Summary

This document outlines basic Spark concepts, focusing on Resilient Distributed Datasets (RDDs). It explains RDD characteristics, such as immutability and distributed storage across cluster nodes. The document details how RDDs are split into partitions and managed by the Spark framework for parallel processing. It also briefly touches upon Spark programming paradigms and the structure of Spark applications.

Full Transcript

RDDs are the primary abstraction in Spark RDDs are distributed collections of objects spread across the nodes of a clusters They are split in partitions Each node of the cluster that is running an application contains at least one partition of the RDD(s) that is (are) defined in the application...

RDDs are the primary abstraction in Spark RDDs are distributed collections of objects spread across the nodes of a clusters They are split in partitions Each node of the cluster that is running an application contains at least one partition of the RDD(s) that is (are) defined in the application RDDs Are stored in the main memory of the executors running in the nodes of the cluster (when it is possible) or in the local disk of the nodes if there is not enough main memory Allow executing in parallel the code invoked on them Each executor of a worker node runs the specified code on its partition of the RDD Example of an RDD split in 3 partitions Worker node Item 1 Item 1 Item 2 Item 2 Executor Item 3 Item 3 Item 4 Item 4 Item 5 Worker node Item 6 Item 5 Item 7 Item 6 Executor Item 7 Item 8 Item 8 Item 9 Worker node Item 10 Item 9 Item 11 Item 10 Executor Item 11 Item 12 Item 12 Example of an RDD split in 3 partitions Worker node Item 1 Item 1 Item 2 Item 2 Executor Item 3 Item 3 Item 4 Item 4 Item 5 Worker node more partitions Item 6 Item 5 Item 6 Executor = Item 7 Item 7 more parallelism Item 8 Item 8 Item 9 Worker node Item 10 Item 9 Item 11 Item 10 Executor Item 11 Item 12 Item 12 RDDs Are immutable once constructed i.e., the content of an RDD cannot be modified Spark tracks lineage information to efficiently recompute lost data (due to failures of some executors) i.e., for each RDD, Spark knows how it has been constructed and can rebuilt it if a failure occurs This information is represented by means of a DAG (Direct Acyclic Graph) connecting input data and RDDs 33 RDDs can be created by parallelizing existing collections of the hosting programming language (e.g., collections and lists of Scala, Java, Pyhton, or R) In this case the number of partition is specified by the user from (large) files stored in HDFS In this case there is one partition per HDFS block from files stored in many traditional file systems or databases by transforming an existing RDDs The number of partitions depends on the type of transformation 34 Spark programs are written in terms of operations on resilient distributed data sets Transformations Actions Spark Manages scheduling and synchronization of the jobs Manages the split of RDDs in partitions and cluster Hides complexities of fault-tolerance and slow machines RDDs are automatically rebuilt in case of machine failures Spark supports many programming languages Scala The same language that is used to develop the Spark framework and all its components (Spark Core, Sparl SQL, Spark Streaming, MLlib, GraphX) Java Python We will use Python R 39 The Driver program Contains the main method Accesses Spark through the SparkContext object The SparkContext object represents a connection to the cluster Defines Resilient Distributed Datasets (RDDs) that Invokes parallel operations on RDDs 40 The Driver program defines Local variables The standard variables of the Python programs RDDs The SparkContext object allows Creating RDDs parallel specific operations on RDDs Transformations and Actions 41 The worker nodes of the cluster are used to run your application by means of executors Each executor runs on its partition of the RDD(s) the operations that are specified in the driver 42 RDDs are distributed across Driver program executors (each RDD is split SparkContext in partitions that are spread across the available executors) Worker node Worker node Worker node Executor Executor Executor Cache Cache.. Cache Task Task Task Task Task Task HDFS, Amazon S3, or other file system 44 Spark programs can also be executed locally Local threads are used to parallelize the execution of the application on RDDs on a single PC - It is useful to develop and test the applications before deploying them on the cluster A local scheduler is launched to run Spark programs locally 45 Single PC Driver program SparkContext Executor Cache Executor Cache.. Executor Cache Task Task Task Task Task Task Local file system 46 Application User program built on Spark It consists of a driver program and executors on the cluster Driver program The process running the main() function of the application and creating the SparkContext Based on http://spark.apache.org/docs/latest/cluster-overview.html 47 Cluster manager An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN) Deploy mode Distinguishes where the driver process runs In "cluster" mode, the framework launches the driver inside of the cluster In "client" mode, the submitter launches the driver outside of the cluster Worker node Any node of the cluster that can run application code in the cluster 48 Executor A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them Each application has its own executors Task A unit of work that will be sent to one executor Job A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect) 49 Stage Each job gets divided into smaller sets of tasks called stages The output of one stage is the input of the next stage(s) Except the stages that compute (part of) the final result (i.e., the stages without output edges in the graph representing the workflow of the application) The outputs of those stages is stored in HDFS or a database The shuffle operation is always executed between two stages Data must be grouped/repartitioned based on a grouping criteria that is different with respect to the one used in the previous stage Similar to the shuffle operation between the map and the reduce phases in MapReduce Shuffle is a heavy operation 50 Count the number of lines of the input file Print the results on the standard output 52 from pyspark import SparkConf, SparkContext if __name__ == "__main__": #Create a configuration object and #set the name of the application conf = SparkConf().setAppName("Spark Line Count") # Create a Spark Context object sc = SparkContext(conf=conf) # Store the path of the input file in inputfile inputFile= "myfile.txt" 53 # Build an RDD of Strings from the input textual file # Each element of the RDD is a line of the input file linesRDD = sc.textFile(inputFile) # Count the number of lines in the input file # Store the returned value in the local variable numLines numLines = linesRDD.count() # Print the output in the standard output print("NumLines:", numLines) # Close the Spark Context object sc.stop() 54 from pyspark import SparkConf, SparkContext if __name__ == "__main__": #Create a configuration object and #set the name of the application Local Python variables. conf = SparkConf().setAppName("Spark They are allocatedLine in the Count") main memory of the same process instancing the Driver # Create a Spark Context object sc = SparkContext(conf=conf) # Store the path of the input file in inputfile inputFile= "myfile.txt" 55 # Build an RDD of Strings from the input textual file # Each element of the RDD is a line of the input file linesRDD = sc.textFile(inputFile) RDD. It is allocated/stored in the main memory # Count the number of lines in the or in theinput fileof the executors of the local disk worker nodes # Store the returned value in the local variable numLines numLines = linesRDD.count() # Print the output in the standard output print("NumLines:", numLines) Local Python variable. # Close the Spark Context objectin the main memory It is allocated sc.stop() of the same process instancing the Driver 56 Local variables The maximum size is equal to the main memory of the process associated with the Driver RDDs objects/data in the nodes of the cluster In the main memory of the worker nodes, when it is possible In the local disks of the worker nodes, when it is necessary 57 Word Count implemented by means of Spark The name of the input file is specified by using a command line parameter (i.e., argv) The output of the application (i.e., the pairs (word, num. of occurrences) is stored in an output folder (i.e., argv) Note: Do not worry about details 58 from pyspark import SparkConf, SparkContext import sys if __name__ == "__main__": """ Word count example """ inputFile= sys.argv outputPath = sys.argv #Create a configuration object and #set the name of the application conf = SparkConf().setAppName("Spark Word Count") # Create a Spark Context object sc = SparkContext(conf=conf) 59 # Build an RDD of Strings from the input textual file # Each element of the RDD is a line of the input file lines = sc.textFile(inputFile) # Split/transform the content of lines in a # list of words and store them in the words RDD words = lines.flatMap(lambda line: line.split(sep=' ')) #Map/transform each word in the words RDD #to a pair/tuple (word,1) and store the result in the words_one RDD words_one = words.map(lambda word: (word, 1)) 60 # Count the num. of occurrences of each word. # Reduce by key the pairs of the words_one RDD and store # the result (the list of pairs (word, num. of occurrences) # in the counts RDD counts = words_one.reduceByKey(lambda c1, c2: c1 + c2) # Store the result in the output folder counts.saveAsTextFile(outputPath) # Close/Stop the Spark Context object sc.stop() 61 is based on the Spark Context object In Python the name of the class is SparkContext The Spark Context is built by means of the constructor of the SparkContext class The only parameter is a configuration object 3 Example #Create a configuration object and #set the name of the application conf = SparkConf().setAppName("Application name") # Create a Spark Context object sc = SparkContext(conf=conf) 4 The Spark Context object can be obtained also by using the SparkContext.getOrCreate(conf) method The only parameter is a configuration object If the SparkContext object already exists for this application the current SparkContext object is returned Otherwise, a new SparkContext object is returned There is always one single SparkContext object for each application 5 Example #Create a configuration object and #set the name of the application conf = SparkConf().setAppName("Application name") # Retrieve the current SparkContext object or # create a new one sc = SparkContext.getOrCreate(conf=conf) 6 A Spark RDD is an immutable distributed collection of objects Each RDD is split in partitions This choice allows parallelizing the code based on RDDs Code is executed on each partition in isolation RDDs can contain any type of Scala, Java, and Python objects Including user-defined classes RDDs can be created By loading an external dataset (e.g., the content of a folder, a single file, a database table, etc.) By parallelizing a local collection of objects created in the Driver (e.g., a Java collection) 10 An RDD can be built from an input textual file It is based on the textFile(name) method of the SparkContext class The returned RDD is an RDD of Strings associated with the content of the name textual file Each line of the input file is associated with an object (a string) of the instantiated RDD By default, if the input file is an HDFS file the number of partitions of the created RDD is equal to the number of HDFS blocks used to store the file To support data locality 11 Example # Build an RDD of strings from the input textual file # myfile.txt # Each element of the RDD is a line of the input file inputFile = "myfile.txt" lines = sc.textFile(inputFile) 12 Example # Build an RDD of strings from the input textual file # myfile.txt # Each element of the RDD is a line of the input file inputFile = "myfile.txt" lines = sc.textFile(inputFile) No computation occurs when sc.textFile() is invoked Spark only records how to create the RDD The data is lazily read from the input file only when the data is needed (i.e., when an action is applied on lines, or on one 13 An RDD can be built from a folder containing textual files It is based on the textFile(name) method of the SparkContext class If name is the path of a folder all files inside that folder are considered The returned RDD contains one string for each line of the files contained on the name folder 14 Example # Build an RDD of strings from all the files stored in # myfolder # Each element of the RDD is a line of the input files inputFolder = "myfolder/" lines = sc.textFile(inputFolder) 15 Example # Build an RDD of strings from all the files stored in # myfolder # Each element of the RDD is a line of the input files inputFolder = "myfolder/" lines = sc.textFile(inputFolder) Pay attention that all files inside myfolder are considered. Also those without suffix or with a suffix different from.txt 16 The developer can manually set the (minimum) number of partitions In this case the textFile(name, minPartitions) method of the SparkContext class is used This option can be used to increase the parallelization of the submitted application For the HDFS files, the number of partitions minPartitions must be greater than the number of blocks/chunks 17 Example # Build an RDD of strings from the input textual file # myfile.txt # The number of partitions is manually set to 4 # Each element of the RDD is a line of the input file inputFile = lines = sc.textFile(inputFile, 4) 18 collection/list of local python objects It is based on the parallelize(c) method of the SparkContext class The created RDD is an RDD of objects of the same type of objects of the input python collection c In the created RDD, there is one object for each element of the input collection Spark tries to set the number of partitions automatically 19 Example # Create a local python list inputList = ['First element', 'Second element', 'Third element'] # Build an RDD of Strings from the local list. # The number of partitions is set automatically by Spark # There is one element of the RDD for each element # of the local list distRDDList = sc.parallelize(inputList) 20 Example # Create a local python list inputList = ['First element', 'Second element', 'Third element'] No computation occurs when sc.parallelize() is invoked # Build an RDD of Strings from the local list. Spark only records how to create the RDD # TheThenumber of partitions data is lazily isinput read from the set automatically by Spark Python list only when the data # There is one is needed (i.e.,element of the when an action RDD for is applied each element on distRDDList or on one of # of the local list distRDDList = sc.parallelize(inputList) 21 When the parallelize(c) is invoked Spark tries to set the number of partitions characteristics The developer can set the number of partition by using the method parallelize(c, numSlices) of the SparkContext class 22 Example # Create a local python list inputList = ['First element', 'Second element', 'Third element'] # Build an RDD of Strings from the local list. # The number of partitions is set to 3 # There is one element of the RDD for each element # of the local list distRDDList = sc.parallelize(inputList, 3) 23 An RDD can be easily stored in textual (HDFS) files It is based on the saveAsTextFile(path) method of the RDD class path is the path of a folder The method is invoked on the RDD that we want to store in the output folder Each object of the RDD on which the saveAsTextFile method is invoked is stored in one line of the output files stored in the output folder There is one output file for each partition of the input RDD 24 Example # Store the content of linesRDD in the output folder # Each element of the RDD is stored in one line # of the textual files of the output folder outputPath="risFolder/" linesRDD.saveAsTextFile(outputPath); 25 Example # Store the content of linesRDD in the output folder # Each element of the RDD is stored in one line # of the textual files of the output folder outputPath="risFolder/" linesRDD.saveAsTextFile(outputPath); saveAsTextFile() is an action. Hence Spark computes the content associated with linesRDD when saveAsTextFile() is invoked. Spark computes the content of an RDD only when that content is needed. 26 Example # Store the content of linesRDD in the output folder # Each element of the RDD is stored in one line # of the textual files of the output folder outputPath="risFolder/" linesRDD.saveAsTextFile(outputPath); Note that the output folder contains one textual file for each partition of linesRDD. Each output file contains the elements of one partition. 27 The content of an RDD can be retrieved from local python variable of the Driver It is based on the collect() method of the RDD class 28 The collect() method of the RDD class Returns a local python list of objects containing the same objects of the considered RDD Pay attention to the size of the RDD Large RDD cannot be stored in a local variable of the Driver 29 Example # Retrieve the content of the linesRDD and store it # in a local python list # The local python list contains a copy of each # element of linesRDD contentOfLines=linesRDD.collect(); 30 Example # Retrieve the content of the linesRDD and store it # in a local python list # The local python list contains a copy of each # element of linesRDD contentOfLines=linesRDD.collect(); Local python variable. RDD of strings. It is allocated in the main memory It is distributed across of the Driver process/task the nodes of the cluster 31 RDD support two types of operations Transformations Actions 33 Transformations Are operations on RDDs that return a new RDD Apply a transformation on the elements of the input RDD(s) and the result of the transformation Remember that RDDs are immutable Hence, you cannot change the content of an already existing RDD You can only apply a transformation on the content of an RDD 34 Transformations Are computed lazily when an action is applied on the RDDs generated by the transformation operations When a transformation is invoked Spark keeps only track of the dependency between the input RDD and the new RDD returned by the transformation The content of the new RDD is not computed 35 The graph of dependencies between RDDs represents the information about which RDDs are used to create a new RDD This is called lineage graph It is represented as a DAG (Directed Acyclic Graph) It is needed to compute the content of an RDD the first time an action is invoked on it Or to compute again the content of an RDD (or some of its partitions) when failures occur 36 The lineage graph is also useful for optimization purposes When the content of an RDD is needed, Spark can consider the chain of transformations that are applied to compute the content of the needed RDD and potentially decide how to execute the chain of transformations Spark can potentially change the order of some transformations or merge some of them based on its optimization engine 37 Actions Are operations that Return results to the Driver program i.e., return local (python) variables Pay attention to the size of the returned results because they must be stored in the main memory of the Driver program Or write the result in the storage (output file/folder) The size of the result can be large in this case since it is directly stored in the (distributed) file system 38 Consider the following code from pyspark import SparkConf, SparkContext import sys if __name__ == "__main__": conf = SparkConf().setAppName("SparkApplication") sc = SparkContext(conf=conf) # Read the content of a log file inputRDD = sc.textFile("log.txt") 39 # Select the rows containing error errorsRDD = inputRDD.filter(lambda line: line.find('error')>=0) # Select the rows containing warning warningRDD = inputRDD.filter(lambda line: line.find('warning')>=0) # Union of errorsRDD and warningRDD # The result is associated with a new RDD: badLinesRDD badLinesRDD = errorsRDD.union(warningRDD) 40 # Remove duplicates lines (i.e., those lines containing # both error warning uniqueBadLinesRDD = badLinesRDD.distinct() # Count the number of bad lines by applying # the count() action numBadLines = uniqueBadLinesRDD.count() # Print the result on the standard output of the driver print("Lines with problems:", numBadLines) 41 inputRDD filter filter errorsRDD warningsRDD union badLinesRDD distinct uniqueBadLinesRDD 42 The application reads the input log file only when the count() action is invoked It is the first action of the program filter(), union(), and distinct() are transformations They are computed lazily Also textFile() is computed lazily However, it is not a transformation because it is not applied on an RDD 43 Spark, similarly to an SQL optimizer, can transformations For instance, in this case the two filters + union + distinct can be potentially optimized and transformed in one single filter applying the constraint This optimization improves the efficiency of the application Spark can performs this kind of optimizations only on particular types of RDDs: Datasets and DataFrames 44 Many transformations (and some actions) are based on user provided functions that specify For example the filter() transformation selects the elements of an RDD satisfying a user specified constraint The user specified constraint is a Boolean function 46 Each language has its own solution to pass actions In python, we can use Lambda functions/expressions Simple functions that can be written as one single expression defs For multi-statement functions or statements that do not return a value 47 Create an RDD from a log file Create a new RDD containing only the lines of The filter() transformation applies the filter constraint on each element of the input RDD The filter constraint is specified by means of a Boolean function that returns true for the elements satisfying the constraint and false for the others 48 # Read the content of a log file inputRDD = sc.textFile("log.txt") # Select the rows containing error errorsRDD = inputRDD.filter(lambda l: l.find('error')>=0) 49 This part of the code, which is based on a lambda expression, defines on the fly # Read the content of a log file the function that we want to apply. This part of the code is applied on each inputRDD = sc.textFile("log.txt") object of inputRDD new errorsRDD RDD. Otherwise the input object is discarded # Select the rows containing error errorsRDD = inputRDD.filter(lambda l: l.find('error')>=0) 50 # Define the content of the Boolean function that is applied # to select the elements of interest def myFunction(l): if l.find('error')>=0: return True else: return False # Read the content of a log file inputRDD = sc.textFile("log.txt") # Select the rows containing error errorsRDD = inputRDD.filter(myFunction) 51 # Define the content of the Boolean function that is applied # to select the elements of interest def myFunction(l): if l.find('error')>=0: return True else: return False #When Readit isthe content invoked, of a log this function file the value of the parameter line analyzes inputRDD = sc.textFile("log.txt") it returns false. # Select the rows containing error errorsRDD = inputRDD.filter(myFunction) 52 # Define the content of the Boolean function that is applied # to select the elements of interest def myFunction(l): if l.find('error')>=0: return True else: return False #Apply Read thethe content filter() of a log transformation file on inputRDD. inputRDD = sc.textFile("log.txt") The filter transformation selects the elements of inputRDD satisfying the constraint specified in myFunction. # Select the rows containing error errorsRDD = inputRDD.filter(myFunction) 53 # Define the content of the Boolean function that is applied # to select the elements of interest def myFunction(l): if l.find('error')>=0: return True else: return False #ForRead the content each object of athe o in inputRDD logmyFunction file function is automatically invoked. inputRDD If myFunction= sc.textFile("log.txt") errorsRDD. Otherwise o is discarded # Select the rows containing error errorsRDD = inputRDD.filter(myFunction) 54 # Define the content of the Boolean function that is applied # to select the elements of interest def myFunction(l): return l.find('error')>=0 #This Readpart the of thecontent code is theofsame a log file used in the lambda-based inputRDD version. = sc.textFile("log.txt") # Select the rows containing error errorsRDD = inputRDD.filter(myFunction) 55 The two solutions are more or less equivalent in terms of efficiency Lambda function-based code More concise More readable But multi-statement functions or statements that do not return a value are not supported defs Multi-statement functions or statements that do not return a value are supported Code can be reused Some functions are used in several applications 56 Some basic transformations analyze the content of one single RDD and return a new RDD E.g., filter(), map(), flatMap(), distinct(), sample() Some other transformations analyze the content of two (input) RDDs and return a new RDD E.g., union(), intersection(), substract(), cartesian() 58 Goal The filter transformation is applied on one single RDD and returns a new RDD containing only the specified condition 60 Method The filter transformation is based on the filter(f) method of the RDD class A function f returning a Boolean value is passed to the filter method It contains the code associated with the condition that we want to apply on each element e If the condition is satisfied then the call method returns true and the input element e is selected Otherwise, it returns false and the e element is discarded 61 Create an RDD from a log file Create a new RDD containing only the lines of 62 # Read the content of a log file inputRDD = sc.textFile("log.txt") errorsRDD = inputRDD.filter(lambda e: e.find('error')>=0) 63 # Read the content of a log file inputRDD = sc.textFile("log.txt") errorsRDD = inputRDD.filter(lambda e: e.find('error')>=0) We are working with an input RDD containing strings. Hence, the implemented lambda function is applied on one string at a time and returns a Boolean value 64 Create an RDD of integers containing the values [1, 2, 3, 3] Create a new RDD containing only the values greater than 2 65 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList); # Select the values greater than 2 greaterRDD = inputRDD.filter(lambda num : num>2) 66 We are working with an input RDD of integers. Hence, the implemented lambda function is applied on one integer at a time and returns a Boolean value # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList); # Select the values greater than 2 greaterRDD = inputRDD.filter(lambda num : num>2) 67 # Define the function to be applied in the filter transformation def greaterThan2(num): return num>2 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList); # Select the values greater than 2 greaterRDD = inputRDD.filter(greaterThan2) 68 # Define the function to be applied in the filter transformation def greaterThan2(num): return num>2 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD The function we want to apply is inputList = [1, 2, 3, 3] defined by using def and then is passed inputRDD = sc.parallelize(inputList);to the filter transformation # Select the values greater than 2 greaterRDD = inputRDD.filter(greaterThan2) 69 Goal The map transformation is used to create a new RDD by applying a function f on each element of the The new RDD contains exactly one element y for each element x The value of y is obtained by applying a user defined function f on x y= f(x) The data type of y can be different from the data type of x 71 Method The map transformation is based on the RDD map(f) method of the RDD class A function f implementing the transformation is passed to the map method It contains the code that is applied over each element of RDD For each input element exactly one single new element is returned by f 72 Create an RDD from a textual file containing the surnames of a list of users Each line of the file contains one surname Create a new RDD containing the length of each surname 73 # Read the content of the input textual file inputRDD = sc.textFile("usernames.txt") # Compute the lengths of the input surnames lenghtsRDD = inputRDD.map(lambda line: len(line)) 74 The input RDD is an RDD of strings. Hence also the input of the lambda function is a String # Read the content of the input textual file inputRDD = sc.textFile("usernames.txt") # Compute the lengths of the input surnames lenghtsRDD = inputRDD.map(lambda line: len(line)) 75 The new RDD is an RDD of Integers. The lambda function returns a new Integer for each input element # Read the content of the input textual file inputRDD = sc.textFile("usernames.txt") # Compute the lengths of the input surnames lenghtsRDD = inputRDD.map(lambda line: len(line)) 76 Create an RDD of integers containing the values [1, 2, 3, 3] Create a new RDD containing the square of each input element 77 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList) # Compute the square of each input element squaresRDD = inputRDD.map(lambda element: element*element) 78 Goal The flatMap transformation is used to create a new RDD by applying a function f on each element of the The new RDD contains a list of elements obtained by applying f on each element x The function f applied on an element x RDD returns a list of values [y] [y]= f(x) [y] can be the empty list 80 The final result is the concatenation of the list of values obtained by applying f over all the lists obtained by applying f over all the elements of the input RDD Duplicates are not removed The data type of y can be different from the data type of x 81 Method The flatMap transformation is based on the flatMap(f) method of the RDD class A function f implementing the transformation is passed to the flatMap method It contains the code that is applied on each element of be included in the new returned RDD For each element of the RDD a list of new elements is returned by f The returned list can be empty 82 Create an RDD from a textual file containing a generic text Each line of the input file can contain many words Create a new RDD containing the list of words, with repetitions, occurring in the input textual document Each element of the returned RDD is one of the words occurring in the input textual file The words occurring multiple times in the input file appear multiple times, as distinct elements, also in the returned RDD 83 # Read the content of the input textual file inputRDD = sc.textFile("document.txt") # Compute/identify the list of words occurring in document.txt listOfWordsRDD = inputRDD.flatMap(lambda l: l.split(' ')) 84 # Read the content of the input textual file inputRDD = sc.textFile("document.txt") # Compute/identify the list of words occurring in document.txt listOfWordsRDD = inputRDD.flatMap(lambda l: l.split(' ')) values for each input element 85 # Read the content of the input textual file inputRDD = sc.textFile("document.txt") # Compute/identify the list of words occurring in document.txt listOfWordsRDD = inputRDD.flatMap(lambda l: l.split(' ')) lists obtained by applying the lambda function over all the elements of inputRDD 86 # Read the content of the input textual file inputRDD = sc.textFile("document.txt") # Compute/identify the list of words occurring in document.txt listOfWordsRDD = inputRDD.flatMap(lambda l: l.split(' ')) The new RDD is an RDD of strings and not an RDD of lists of strings 87 Goal The distinct transformation is applied on one single RDD and returns a new RDD containing the RDD Method The distinct transformation is based on the distinct() method of the RDD class No functions are needed in this case 89 Shuffle A shuffle operation is executed for computing the result of the distinct transformation Data from different input partitions must be compared to remove duplicates The shuffle operation is used to repartition the input data All the repetitions of the same input element are associated with the same output partition (in which one single copy of the element is stored) A hash function assigns each input element to one of the new partitions 90 Create an RDD from a textual file containing the names of a list of users Each line of the input file contains one name Create a new RDD containing the list of distinct names occurring in the input file The type of the new RDD is the same of the 91 # Read the content of a textual input file inputRDD = sc.textFile("names.txt") # Select the distinct names occurring in inputRDD distinctNamesRDD = inputRDD.distinct() 92 Create an RDD of integers containing the values [1, 2, 3, 3] Create a new RDD containing only the 93 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList) # Compute the set of distinct words occurring in inputRDD distinctIntRDD = inputRDD.distinct() 94 Goal The sortBy transformation is applied on one RDD and returns a new RDD containing the same content of the input RDD sorted in ascending order Method The sortBy transformation is based on the sortBy(keyfunc) method of the RDD class Each element of the input RDD is initially mapped to a new value by applying the specified function keyfunc The input elements are sorted by considering the values returned by the invocation of keyfunc on the input values 96 The sortBy(keyfunc, ascending) method of the RDD class allows specifying if the values in the returned RDD are sorted in ascending or descending order by using the Boolean parameter ascending ascending set to True = ascending ascending set to False = descending 97 Create an RDD from a textual file containing the names of a list of users Each line of the input file contains one name Create a new RDD containing the list of users sorted by name (based on the alphabetic order) 98 # Read the content of a textual input file inputRDD = sc.textFile("names.txt") # Sort the content of the input RDD by name. # Store the sorted result in a new RDD sortedNamesRDD = inputRDD.sortBy(lambda name: name) 99 # Read the content of a textual input file inputRDD = sc.textFile("names.txt") # Sort the content of the input RDD by name. # Store the sorted result in a new RDD sortedNamesRDD = inputRDD.sortBy(lambda name: name) Each input element is a string. We are interested in sorting the input names (strings) in alphabetic order, which is the standard sort order for strings. For this reason the lambda function returns the input strings without modifying them. 100 Create an RDD from a textual file containing the names of a list of users Each line of the input file contains one name Create a new RDD containing the list of users sorted by the length of their name (i.e., the sort order is based on len(name)) 101 # Read the content of a textual input file inputRDD = sc.textFile("names.txt") # Sort the content of the input RDD by name. # Store the sorted result in a new RDD sortedNamesLenRDD = inputRDD.sortBy(lambda name: len(name)) 102 # Read the content of a textual input file inputRDD = sc.textFile("names.txt") # Sort the content of the input RDD by name. # Store the sorted result in a new RDD sortedNamesLenRDD = inputRDD.sortBy(lambda name: len(name)) Each input element is a string but we are interested in sorting the input names (strings) by length (integer), which is not the standard sort order for strings. For this reason the lambda function returns the length of each input string. The sort operation is performed on the returned integer values (the lengths of the input names). 103 Goal The sample transformation is applied on one single RDD and returns a new RDD containing a random Method The sample transformation is based on the sample(withReplacement, fraction) method of RDD class withReplacement specifies if the random sample is with replacement (true) or not (false) fraction specifies the expected size of the sample as a fraction 105 Create an RDD from a textual file containing a set of sentences Each line of the file contains one sentence Create a new RDD containing a random sample of sentences Set fraction to 0.2 (i.e., 20%) 106 # Read the content of a textual input file inputRDD = sc.textFile("sentences.txt") # Create a random sample of sentences randomSentencesRDD = inputRDD.sample(False,0.2) 107 Create an RDD of integers containing the values [1, 2, 3, 3] Create a new RDD containing a random sample of the input values Set fraction to 0.2 108 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList) # Create a sample of the inputRDD randomSentencesRDD = inputRDD.sample(True,0.2) 109 Spark provides also a set of transformations that operate on two input RDDs and return a new RDD Some of them implement standard set transformations Union Intersection Subtract Cartesian 111 All these transformations have Two input RDDs One is the RDD on which the method is invoked The other RDD is passed as parameter to the method One output RDD All the involved RDDs have the same data type when union, intersection, or subtract are used cartesian transformation 112 The union transformation is based on the union(other) method of the RDD class other is the second RDD we want to use It returns a new RDD containing the union (with duplicates) of the elements of the two input RDDs Duplicates elements are not removed This choice is related to optimization reasons Removing duplicates means having a global view of the whole content of the two input RDDs Since each RDD is split in partitions that are stored in different nodes remove duplicates Computational costly operation The shuffle operation is not needed in this case 113 If you really need to union two RDDs and remove duplicates you can apply the distinct() transformation on the output of the union() transformation But pay attention that distinct() is a computational costly operation It is associated with a shuffle operation Use distinct() if and only if duplicate removal is indispensable for your application 114 The intersection transformation is based on the intersection(other) method of the RDD class other is the second RDD we want to use It returns a new RDD containing the elements (without duplicates) of the elements occurring in both input RDDs A shuffle operation is executed for computing the result of intersection Elements from different input partitions must be compared to find common elements 115 The subtract transformation is based on the subtract(other) method of the RDD class other is the second RDD we want to use The result contains the elements appearing only in the RDD on which the subtract method is invoked In this transformation the two input RDDs play different roles Duplicates are not removed A shuffle operation is executed for computing the result of subtract Elements from different input partitions must be compared 116 The cartesian transformation is based on the cartesian(other) method of the RDD class RDDs can be different The returned RDD is an RDD of pairs (tuples) containing all the combinations composed of one element of the first input RDD and one element of the second input RDD We will see later what an RDD of pairs is 117 A large amount of data is sent on the network Elements from different input partitions must be combined to compute the returned pairs The elements of the two input RDDs are stored in different partitions, which could be in different servers 118 Create two RDDs of integers inputRDD1 contains the values [1, 2, 2, 3, 3] inputRDD2 contains the values [3, 4, 5] Create three new RDDs outputUnionRDD contains the union of inputRDD1 and inputRDD2 outputIntersectionRDD contains the intersection of inputRDD1 and inputRDD2 outputSubtractRDD contains the result of inputRDD1 \ inputRDD2 119 # Create two RDD of integers inputList1 = [1, 2, 2, 3, 3] inputRDD1 = sc.parallelize(inputList1) inputList2 = [3, 4, 5] inputRDD2 = sc.parallelize(inputList2) # Create three new RDDs by using union, intersection, and subtract outputUnionRDD = inputRDD1.union(inputRDD2) outputIntersectionRDD = inputRDD1.intersection(inputRDD2) outputSubtractRDD = inputRDD1.subtract(inputRDD2) 120 Create two RDDs of integers inputRDD1 contains the values [1, 2, 2, 3, 3] inputRDD2 contains the values [3, 4, 5] Create a new RDD containing the cartesian product of inputRDD1 and inputRDD2 121 # Create two RDD of integers inputList1 = [1, 2, 2, 3, 3] inputRDD1 = sc.parallelize(inputList1) inputList2 = [3, 4, 5] inputRDD2 = sc.parallelize(inputList2) # Compute the cartesian product outputCartesianRDD = inputRDD1.cartesian(inputRDD2) 122 # Create two RDD of integers inputList1 = [1, 2, 2, 3, 3] inputRDD1 = sc.parallelize(inputList1) inputList2 = [3, 4, 5] inputRDD2 = sc.parallelize(inputList2) # Compute the cartesian product outputCartesianRDD = inputRDD1.cartesian(inputRDD2) Each element of the returned RDD is a pair (tuple) of integer elements 123 Create two RDDs inputRDD1 contains the Integer values [1, 2, 3] inputRDD2 contains the String values ["A", "B"] Create a new RDD containing the cartesian product of inputRDD1 and inputRDD2 124 # Create an RDD of Integers and an RDD of Strings inputList1 = [1, 2, 3] inputRDD1 = sc.parallelize(inputList1) inputList2 = ["A", "B"] inputRDD2 = sc.parallelize(inputList2) # Compute the cartesian product outputCartesianRDD = inputRDD1.cartesian(inputRDD2) 125 # Create an RDD of Integers and an RDD of Strings inputList1 = [1, 2, 3] inputRDD1 = sc.parallelize(inputList1) inputList2 = ["A", "B"] inputRDD2 = sc.parallelize(inputList2) # Compute the cartesian product outputCartesianRDD = inputRDD1.cartesian(inputRDD2) Each element of the returned RDD is a pair (tuple) containing an integer and string 126 All the examples reported in the following tables are applied on an RDD of integers containing the following elements (i.e., values) [1, 2, 3, 3] 128 Transformation Purpose Example of applied Result function filter(f) Return an RDD consisting filter(lambda x: x != 1) [2,3,3] only of the elements of the condition passed to filter(). RDD have the same data type. map(f) Apply a function to each map(lambda x: x+1) [2,3,4,4] element in the RDD and return an RDD of the result. For each input The applied function return element x, the one element for each element with value x+1 is included in the new RDD RDD can have a different data type. 129 Transformation Purpose Example of applied Result function flatMap(f) Apply a function to each flatMap(lambda x: [1,2,3,2, element in the RDD and list(range(x,4)) 3,3,3] return an RDD of the result. The applied function return a For each input set of elements (from 0 to element x, the set of many) for each element of elements with values from x to 3 are returned RDD can have a different data type. 130 Transformation Purpose Example of applied Result function distinct() Remove duplicates distinct() [1, 2, 3] sortBy(keyfunc) Return a new RDD sortBy(lambda v: v) [1, 2, 3, 3] containing the same values of the input RDD sorted in Sort the input integer ascending order values in ascending order by using the standard integer sort order sample(withReplacement, Sample the content of the sample(True, 0.2) Non fraction) RDD, with or determini without replacement and stic return the selected sample. new RDD have the same data type. 131 All the examples reported in the following tables are applied on the following two RDDs of integers inputRDD1 [1, 2, 2, 3, 3] inputRDD2 [3, 4, 5] 132 Transformation Purpose Example Result union(other) Return a new RDD containing inputRDD1.union [1, 2, 2, 3, the union of the elements of (inputRDD2) 3, 3, 4, 5] elements of the one passed as parameter to union(). Duplicate values are not removed. All the RDDs have the same data type. intersection(other) Return a new RDD containing inputRDD1.intersection the intersection of the (inputRDD2) elements and the elements of the one passed as parameter to intersection(). All the RDDs have the same data type. 133 Transformation Purpose Example Result subtract(other) Return a new RDD the inputRDD1.subtract [1, 2, 2] elements appearing only in (inputRDD2) the one passed as parameter to subtract(). All the RDDs have the same data type. cartesian(other) Return a new RDD containing inputRDD1.cartesian(in [(1, 3), (1, the cartesian product of the putRDD2) 4), elements and the elements of the one (3,5)] passed as parameter to cartesian(). All the RDDs have the same data type. 134 Spark actions can retrieve the content of an RDD or the result of a function applied on an RDD and Python variable of the Driver program Pay attention to the size of the returned value Pay attentions that date are sent on the network from the nodes containing the content of RDDs and the executor running the Driver Or store the content of an RDD in an output folder or database 136 The spark actions that return a result that is stored in local (Python) variables of the Driver 1. Are executed locally on each node containing partitions of the RDD on which the action is invoked Local results are generated in each node 2. Local results are sent on the network to the Driver that computes the final result and store it in local variables of the Driver The basic actions returning (Python) objects to the Driver are collect(), count(), countByValue(), take(), top(), takeSample(), reduce(), fold(), aggregate(), foreach() 137 Goal The collect action returns a local Python list of objects containing the same objects of the considered RDD Pay attention to the size of the RDD Large RDD cannot be memorized in a local variable of the Driver Method The collect action is based on the collect() method of the RDD class 139 Create an RDD of integers containing the values [1, 2, 3, 3] Retrieve the values of the created RDD and store them in a local python list that is instantiated in the Driver 140 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList) # Retrieve the elements of the inputRDD and store them in # a local python list retrievedValues = inputRDD.collect() 141 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList) # Retrieve the elements of the inputRDD and store them in # a local python list retrievedValues = inputRDD.collect() inputRDD is distributed across the nodes of the cluster. It can be large and it is stored in the local disks of the nodes if it is needed 142 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputList = [1, 2, 3, 3] inputRDD = sc.parallelize(inputList) # Retrieve the elements of the inputRDD and store them in # a local python list retrievedValues = inputRDD.collect() retrievedValues is a local python variable. It can only be stored in the main memory of the process/task associated with the Driver. Pay attention to the size of the list. Use the collect() action if and only if you are sure that the list is small. Otherwise, store the content of the RDD in a file by using the saveAsTextFile method 143 Goal Count the number of elements of an RDD Method The count action is based on the count() method of the RDD class It returns the number of elements of the input RDD 145 Print the name of the file with more lines 146 # Read the content of the two input textual files inputRDD1 = sc.textFile("document1.txt") inputRDD2 = sc.textFile("document2.txt") # Count the number of lines of the two files = number of elements # of the two RDDs numLinesDoc1 = inputRDD1.count() numLinesDoc2 = inputRDD2.count() if numLinesDoc1> numLinesDoc2: print("document1.txt") elif numLinesDoc2> numLinesDoc1: print("document2.txt") else: print("Same number of lines") 147 Goal The countByValue action returns a local python dictionary containing the information about the number of times each element occurs in the RDD The keys of the dictionary are associated with the input elements The values are the frequencies of the elements Method The countByValue action is based on the countByValue() method of the RDD class The amount of used main memory in the Driver is related to the number of distinct elements/keys 149 Create an RDD from a textual file containing the first names of a list of users Each line contain one name Compute the number of occurrences of each variable of the Driver 150 # Read the content of the input textual file namesRDD = sc.textFile("names.txt") # Compute the number of occurrencies of each name namesOccurrences = namesRDD.countByValue() 151 # Read the content of the input textual file namesRDD = sc.textFile("names.txt") # Compute the number of occurrencies of each name namesOccurrences = namesRDD.countByValue() Also in this case, pay attention to the size of the returned dictionary (that is related to the number of distinct names in this case). Use the countByValue() action if and only if you are sure that the returned dictionary is small. transformations and write the final result in a file by using the saveAsTextFile method. 152 Goal The take(num) action returns a local python list of objects containing the first num elements of the considered RDD The order of the elements in an RDD is consistent with the order of the elements in the file or collection that has been used to create the RDD Method The take action is based on the take(num) method of the RDD class 154 Create an RDD of integers containing the values [1, 5, 3, 3, 2] Retrieve the first two values of the created RDD and store them in a local python list that is instantiated in the Driver 155 # Create an RDD of integers. Load the values 1, 5, 3, 3,2 in this RDD inputList = [1, 5, 3, 3, 2] inputRDD = sc.parallelize(inputList) # Retrieve the first two elements of the inputRDD and store them in # a local python list retrievedValues = inputRDD.take(2) 156 Goal The first() action returns a local python object containing the first element of the considered RDD The order of the elements in an RDD is consistent with the order of the elements in the file or collection that has been used to create the RDD Method The first action is based on the first() method of the RDD class 158 The only difference between first() and take(1) is given by the fact that first() returns a single element The returned element is the first element of the RDD take(1) returns a list of elements containing one single element The only element of the returned list is the first element of the RDD 159 Goal The top(num) action returns a local python list of objects containing the top num (largest) elements of the considered RDD The ordering is the default one of class associated with the objects stored in the RDD The descending order is used Method The top action is based on the top(num) method of the RDD class 161 Create an RDD of integers containing the values [1, 5, 3, 4, 2] Retrieve the top-2 greatest values of the created RDD and store them in a local python list that is instantiated in the Driver 162 # Create an RDD of integers. Load the values 1, 5, 3, 4,2 in this RDD inputList = [1, 5, 3, 4, 2] inputRDD = sc.parallelize(inputList) # Retrieve the top-2 elements of the inputRDD and store them in # a local python list retrievedValues = inputRDD.top(2) 163 Goal The top(num, key) action returns a local python list of objects containing the num largest elements of the considered RDD sorted by considering a user specified Method The top action is based on the top(num, key) method of the RDD class num is the number of elements to be selected key is a function that is applied on each input element before comparing them The comparison between elements is based on the values returned by the invocations of this function 164 Create an RDD of strings containing the values ['Paolo', 'Giovanni', 'Luca'] Retrieve the 2 longest names (longest strings) of the created RDD and store them in a local python list that is instantiated in the Driver 165 # Create an RDD of strings. Load the values 'Paolo', 'Giovanni', 'Luca'] # in the RDD inputList = ['Paolo', 'Giovanni', 'Luca'] inputRDD = sc.parallelize(inputList) # Retrieve the 2 longest names of the inputRDD and store them in # a local python list retrievedValues = inputRDD.top(2,lambda s:len(s)) 166 Goal The takeOrdered(num) action returns a local python list of objects containing the num smallest elements of the considered RDD The ordering is the default one of class associated with the objects stored in the RDD The ascending order is used Method The takeOrdered action is based on the takeOrdered (num) method of the RDD class 168 Create an RDD of integers containing the values [1, 5, 3, 4, 2] Retrieve the 2 smallest values of the created RDD and store them in a local python list that is instantiated in the Driver 169 # Create an RDD of integers. Load the values 1, 5, 3, 4,2 in this RDD inputList = [1, 5, 3, 4, 2] inputRDD = sc.parallelize(inputList) # Retrieve the 2 smallest elements of the inputRDD and store them in # a local python list retrievedValues = inputRDD.takeOrdered(2) 170 Goal The takeOrdered(num, key) action returns a local python list of objects containing the num smallest elements of the considered RDD sorted by Method The takeOrdered action is based on the takeOrdered (num, key) method of the RDD class num is the number of elements to be selected key is a function that is applied on each input element before comparing them The comparison between elements is based on the values returned by the invocations of this function 171 Create an RDD of strings containing the values ['Paolo', 'Giovanni', 'Luca'] Retrieve the 2 shortest names (shortest strings) of the created RDD and store them in a local python list that is instantiated in the Driver 172 # Create an RDD of strings. Load the values 'Paolo', 'Giovanni', 'Luca'] # in the RDD inputList = ['Paolo', 'Giovanni', 'Luca'] inputRDD = sc.parallelize(inputList) # Retrieve the 2 shortest names of the inputRDD and store them in # a local python list retrievedValues = inputRDD.takeOrdered(2,lambda s:len(s)) 173 Goal The takeSample(withReplacement, num) action returns a local python list of objects containing num random elements of the considered RDD Method The takeSampleaction is based on the takeSample(withReplacement, num) method of the RDD class withReplacement specifies if the random sample is with replacement (True) or not (False) 175 Method The takeSample(withReplacement, num, seed) method of the RDD class is used when we want to set the seed 176 Create an RDD of integers containing the values [1, 5, 3, 3, 2] Retrieve randomly, without replacement, 2 values from the created RDD and store them in a local python list that is instantiated in the Driver 177 # Create an RDD of integers. Load the values 1, 5, 3, 3,2 in this RDD inputList = [1, 5, 3, 3, 2] inputRDD = sc.parallelize(inputList) # Retrieve randomly two elements of the inputRDD and store them in # a local python list randomValues= inputRDD.takeSample(True, 2) 178 Goal Return a single python object obtained by combining all the objects of the input RDD by associative and commutative otherwise the result depends on the content of the partitions and The all instances of the same data type/class 180 Method The reduce action is based on the reduce(f) method of the RDD class A function f is passed to the reduce method Given two arbitrary input elements, f is used to combine them in one single value f is recursively invoked over the elements of the input value 181 Suppose L contains the list of elements of the To compute the final element/value, the reduce action operates as follows 1. elements e1 and e2 occurring in L and obtain a new element enew 2. e1 and e2 from L and then insert the element enew in L 3. If L contains only one value then return it as final result of the reduce action. Otherwise, return to step 1 182 f must be associative and commutative The computation of the reduce action can be performed in parallel without problems 183 commutative The computation of the reduce action can be performed in parallel without problems Otherwise the result depends on how the input RDD is partitioned i.e., for the functions that are not associative and commutative the output depends on how the RDD is split in partitions and how the content of each partition is analyzed 184 Create an RDD of integers containing the values [1, 2, 3, 3] Compute the sum of the values occurring in python integer variable in the Driver 185 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListReduce = [1, 2, 3, 3] inputRDDReduce = sc.parallelize(inputListReduce) # Compute the sum of the values sumValues = inputRDDReduce.reduce(lambda e1, e2: e1+e2) 186 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListReduce = [1, 2, 3, 3] inputRDDReduce = sc.parallelize(inputListReduce) # Compute the sum of the values sumValues = inputRDDReduce.reduce(lambda e1, e2: e1+e2) This lambda function combines two input integer elements at a time and returns theirs sum 187 Create an RDD of integers containing the values [1, 2, 3, 3] Compute the maximum value occurring in the python integer variable in the Driver 188 # Define the function for the reduce action def computeMax(v1,v2): if v1>v2: return v1 else: return v2 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListReduce = [1, 2, 3, 3] inputRDDReduce = sc.parallelize(inputListReduce) # Compute the maximum value maxValue = inputRDDReduce.reduce(computeMax) 189 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListReduce = [1, 2, 3, 3] inputRDDReduce = sc.parallelize(inputListReduce) # Compute the maximum value maxValue = inputRDDReduce.reduce(lambda e1, e2: max(e1, e2)) 190 Goal Return a single python object obtained by combining all the objects of the input RDD and a Must be associative Otherwise the result depends on how the RDD is partitioned It is not required to be commutative An initial 192 Method The fold action is based on the fold(zeroValue, op) method of the RDD class A function op is passed to the fold method Given two arbitrary input elements, op is used to combine them in one single value op op is recursively invoked over the elements of the input RDD until The is the neutral value for the used function op v by using op is equal to v 193 Create an RDD of strings containing the values ['This ', 'is ', 'a ', 'test'] Compute the concatenation of the values occurring in the RDD (from left to right) and python string variable in the Driver 194 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListFold = ['This ', 'is ', 'a ', 'test'] inputRDDFold = sc.parallelize(inputListFold) # Concatenate the input strings finalString = inputRDDFold.fold('', lambda s1, s2: s1+s2) 195 Fold value Fold can be used to parallelize functions that are associative but non-commutative E.g., concatenation of a list of strings 196 Goal Return a single python object obtained by combining the objects of the RDD and an initial associative Otherwise the result depends on how the RDD is partitioned The returned objects and the can be instances of different classes This is the main difference with respect to reduce () and fold( ) 198 Method The aggregate action is based on the aggregate(zeroValue, seqOp, combOp) method of the RDD class returned object is of type U (T!=U) with an element of type U to return a new element of type U It is used to merge the elements of the input RDD and the accumulator of each partition to return a new element of type U It is used to merge two elements of type U obtained as partial results generated by two different partitions 199 The seqOp function contains the code that is applied to combine the accumulator value (one accumulator for each partition) with the elements of each partition applying seqOp The combOp function contains the code that is applied to combine two elements of type U returned as partial results by two different partitions The global final result is computed by recursively applying combOp 200 Suppose that L RDD and this RDD is split in a set of partitions, i.e., a set of lists {L1,.., Ln} The aggregate action computes a partial result in each partition and then combines/merges the results. It operates as follows 1. Aggregate the partial results in each partition, obtaining a set of partial results (of type U) P= {p1,.., pn} 2. Apply the combOp function on a pair of elements p1 and p2 in P and obtain a new element pnew 3. p1 and p2 from P and then insert the element pnew in P 4. If P contains only one value then return it as final result of the aggregate action. Otherwise, return to step 2 201 Suppose that Li is the list of elements on the i-th RDD And zeroValue is the initial zero value To compute the partial result over the elements in Li the aggregate action operates as follows 1. Set accumulator to zeroValue (accumulator=zeroValue) 2. Apply the seqOp function on accumulator and an elements ej in Li and update accumulator with the value returned by seqOp 3. ej from Li 4. If Li is empty return accumulator as (final) partial result pi of the i-th partition. Otherwise, return to step 2 202 Create an RDD of integers containing the values [1, 2, 3, 3] Compute both the sum of the values occurring in the input RDD and the number of elements of the input RDD Finally, python variable of the Driver the average computed over the values of the input RDD 203 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListAggr = [1, 2, 3, 3] inRDD = sc.parallelize(inputListAggr) # Instantiate the zero value # We use a tuple containing two values: # (sum, number of represented elements) zeroValue = (0, 0) # Compute the sum of the elements in inputRDDAggr and count them sumCount = inRDD.aggregate(zeroValue, \ lambda acc, e: (acc+e, acc+1), \ lambda p1, p2: (p1+p2, p1+p2)) 204 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListAggr = [1, 2, 3, 3] inRDD = sc.parallelize(inputListAggr) # Instantiate the zero value # We use a tuple containing Instantiate thetwo zerovalues: value # (sum, number of represented elements) zeroValue = (0, 0) # Compute the sum of the elements in inputRDDAggr and count them sumCount = inRDD.aggregate(zeroValue, \ lambda acc, e: (acc+e, acc+1), \ lambda p1, p2: (p1+p2, p1+p2)) 205 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListAggr = [1, 2, 3, 3] inRDD = sc.parallelize(inputListAggr) # Instantiate the zero value #Given We use a tuplepcontaining a partition of the input two RDD,values: this is the function that is used to combine #the elements of partition (sum, number p with the accumulator of represented elements) of partition p. acc is a tuple zeroValue object = (0, 0) (it is initially initialized to the zero value) e is an integer # Compute the sum of the elements in inputRDDAggr and count them sumCount = inRDD.aggregate(zeroValue, \ lambda acc, e: (acc+e, acc+1), \ lambda p1, p2: (p1+p2, p1+p2)) 206 # Create an RDD of integers. Load the values 1, 2, 3, 3 in this RDD inputListAggr = [1, 2, 3, 3] inRDD = sc.parallelize(inputListAggr) # Instantiate the zero value # We use a tuple containing two values: #This (sum, numberthat is the function of represented elements) is used to combine the partial results emitted by the zeroValue = (0, 0) p1 and p2 are tuple objects # Compute the sum of the elements in inputRDDAggr and count them sumCount = inRDD.aggregate(zeroValue, \ lambda acc, e: (acc+e, acc+1), \ lambda p1, p2: (p1+p2, p1+p2)) 207 # Compute the average value myAvg = sumCount/sumCount # Print the average on the standard output of the driver print('Average:', myAvg) 208 inRDD = [1, 2, 3, 3] Suppose inRDD is split in the following two partitions [1, 2] and [3, 3] 209 Partition #1 Partition #2 [1, 2] acc=(0,0) [3, 3] acc=(0,0) (1,1) (3,1) (3,2) (6,2) sumCount=(9,4) 214 All the examples reported in the following tables are applied on inputRDD that is an RDD of integers containing the following elements (i.e., values) [1, 2, 3, 3] 216 Action Purpose Example Result collect() Return a python list inputRDD.collect() [1,2,3,3] containing all the elements of the RDD on which it is applied. The objects of the RDD and objects of the returned list are objects of the same class. count() Return the number of inputRDD.count() 4 elements of the RDD countByValue() Return a Map object inputRDD. [(1, 1), containing the information countByValue() (2, 1), about the number of times (3, 2)] each element occurs in the RDD. 217 Action Purpose Example Result take(num) Return a Python list containing inputRDD.take(2) [1,2] the first num elements of the RDD. The objects of the RDD and objects of the returned list are objects of the same class. first() Return the first element of the first() 1 RDD top(num) Return a Python list containing inputRDD.top(2) [3,3] the top num elements of the RDD based on the default sort order/comparator of the objects. The objects of the RDD and objects of the returned list are objects of the same class. 218 Action Purpose Example Result takeSample(withReplace Return a (Python) List inputRDD. Nondet ment, num) containing a random sample takeSample erminis of size n of the RDD. (False, 1) tic takeSample(withReplace The objects of the RDD and ment, num, seed) objects of the returned list are objects of the same class. reduce(f) Return a single Python object inputRDD. 9 obtained by combining the reduce(lambda e1, values of the objects of the e2: e1+e2) RDD by using a user provide. The provided The passed associative and commutative The object returned by the method and the objects of the RDD belong to the same class. 219 Action Purpose Example Resul t fold(zeroValue, op) Same as reduce inputRDD. 9 but with the fold(0, lambda v1, v2: v1+v2) provided zero value. The and the passed zeroValue is 0 Aggregate(zeroValue, Similar to inputRDD.aggregate (9, 4) seqOp, combOp) reduce() but used (zeroValue, to return a lambda acc, e: (acc+e, acc+1), different type. lambda p1, p2: (p1+p2, p1+p2)) Compute a pair of integers where the first one is the sum of the values of the RDD and the second the number of elements 220

Use Quizgecko on...
Browser
Browser