Distributed Computing: Part Two 2024/2025 PDF
Document Details
Uploaded by EuphoricFir974
2024
Sara Caviglia
Tags
Summary
These lecture notes cover distributed computing concepts and applications. The document details the internals and environment of Spark and PySpark, including examples. It also discusses Apache Spark, MLlib, and other related topics in distributed computing.
Full Transcript
Distributed Computing: Part two Sara Caviglia, A.Y. 2024/2025 Lecturer: Prof. Delzanno Giorgio Spark and PySpark: internals, environment and examples Spark Stack Apache Spark is a datacentric engine, at the base of a stack, wh...
Distributed Computing: Part two Sara Caviglia, A.Y. 2024/2025 Lecturer: Prof. Delzanno Giorgio Spark and PySpark: internals, environment and examples Spark Stack Apache Spark is a datacentric engine, at the base of a stack, which is composed with other modules. There are many services and applications that can be used with Spark. Distributed Computing: Part two 1 MLlib is Spark’s machine learning module. ML algorithms: classification, regression, clustering, and collaborative filtering Featurization: feature extraction, transformation, dimensionality reduction, and selection Pipelines: tools for constructing, evaluating, and tuning ML pipelines Persistence: saving and load algorithms, models, and pipelines Utilities: linear algebra, statistics, data handling, etc. Sparks Internals How are the internals of Spark? Distributed Computing: Part two 2 Each executor has to execute the tasks of a particular worker node. Unlike Hadoop, Spark mostly uses RAM. Our focus is not speed, but the handling of large datasets. 💡 In Apache Spark, a cluster refers to a collection of machines (physical or virtual) working together to execute Spark applications. The cluster serves as the computational backbone that enables distributed data processing by dividing the workload across multiple nodes. A Spark cluster enables distributed and parallel processing of large-scale data. By leveraging resources across multiple machines, Spark clusters provide the performance, scalability, and fault tolerance needed for modern big data workloads. The architecture, which includes the driver, executors, and a cluster manager, ensures efficient execution of distributed computations. Each cluster needs a cluster manager: one of the most used is YARN, which is part of the Hadoop ecosystem. Spark is based on the idea of lazy evaluation: you post-pone the computation as far as possible, until it’s really needed. It’s like MapReduce: we have some pre- Distributed Computing: Part two 3 processing, in order to isolate different parts. Another cardinal point of Spark is fault tolerance: Spark data structures track data lineage/provenance information to rebuilt lost data automatically on failure. PySpark PySpark is the Python API for Apache Spark to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data. Distributed Computing: Part two 4 Spark Architecture SparkSession is the entry point to Spark. SparkSession internally creates SparkConfig and SparkContext with the configuration provided with SparkSession. import findspark findspark.init() findspark.find() import pyspark from pyspark.sql import SparkSession spark = SparkSession \.builder \.appName("Our First Spark Example") \.getOrCreate() sc = spark.sparkContext This code has the following output. Distributed Computing: Part two 5 We have an example of configuration. master(String master) : sets the Spark master URL to connect to local[*] : run locally and use as many threads as possible local[K] : run locally with K cores yarn : run on a cluster with YARN as cluster manager Among the resources we have also the processes. 💡 Remember that multiprocessing is not multithreading! Apache Hadoop YARN Apache Hadoop YARN is a cluster manager that splits up the functionalities of resource management and job scheduling/monitoring into separate daemons. A container is the physical instance of a process executed by YARN on a worker node within the cluster. The context of the container defines the executable to run, the environment, arguments to the executable, and other information set by the client. Distributed Computing: Part two 6 What are the other YARN components? NodeManager is the per-machine framework agent who is responsible for container, monitoring the resource usage (CPU, memory, disk, network) and reporting the same to the ResourceManager/Scheduler. The ResourceManager is the authority that arbitrates resources among all the applications in the system. The Scheduler is responsible for allocating resources to the various running applications using the abstract notion of a resource Container which incorporates elements such as memory, CPU, disk, network, etc. The Scheduler applies a policy for partitioning the cluster resources among the various queues, applications, etc. The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks. The ApplicationsManager is responsible for accepting job- submissions, negotiating the first container for executing the application specific and provides the service for restarting the ApplicationMaster container on failure. Where to use PySpark? Distributed Computing: Part two 7 PySpark can be used in many different situations. Local machine PySpark shell Python driver program Google Colab Jupyter Notebook findspark + sparkSession configuration of.zshrc or.bshrc LSC cluster → we have a cluster, a number of machines and HDFS How does Spark work? Distributed Computing: Part two 8 Scheduling Application model for scheduling: Application → Driver code that represents the DAG. Job → Subset of application triggered for execution by an “action” in the DAG. Distributed Computing: Part two 9 Stage → Job sub-divided into stages that have dependencies with each other. Task → Unit of work in a stage that is scheduled on a worker. Worker nodes and executors Worker nodes are machines that run executors. They can host one or multiple workers. We have one JVM (meaning one UNIX process) per worker. Each worker can spawn one or more executors. Executors run tasks, used by one application, for their whole lifetime. They run in child JVM (meaning one UNIX process). They execute one or more tasks using threads in a thread pool. Spark’s scheduler property Distributed Computing: Part two 10 1. FIFO → e.g. in test phase By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into stages (e.g. map and reduce phases), and the first job gets priority on alla available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs a the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly. 2. FAIR → e.g. in production phase The fair scheduler also supports grouping jobs into pools and setting different scheduling options (e.g. weight) for each pool. This can be useful to create a high-priority pool for more important jobs, for example, or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler. Without any intervention, newly submitted jobs go to a default pool, but jobs’ pools can be set by adding the spark.scheduler.pool “local property” to the SparkContext in the thread that’s submitting them. Stages A stage is a collection of tasks that share the same shuffle dependencies, meaning that they must exchange data with one another during execution. When a Spark job is submitted, it is broken down into stages based on the operations defined in the code. Each stage is composed of one or more tasks that can be executed in parallel across multiple nodes in a cluster. Stages are executed sequentially, with the output of one stage becoming the input of the next stage. Distributed Computing: Part two 11 A typical Spark job consists of multiple stages. Each stage is a sequence of transformations and actions on the input data. When a Spark job is submitted, Spark evaluates the execution plan and divides the job into multiple stages based on the dependencies between the transformations. Spark executes each stage in parallel, where each stage can have multiple tasks running on different nodes in the cluster. Narrow stages are stages where the data does not need to be shuffles. Each task in a narrow stage operates on a subset of the partitions of its parent RDD. Narrow stages are executed in parallel and can be pipelined. Wide stages are stages where the data needs to be shuffled across the nodes in the cluster. This is because each task in a wide stage operates on all the partitions of its parent RDD. Wide stages involve a data shuffle and are typically more expensive than narrow stages. From narrow stages and wide stages we can go to narrow dependencies and wide dependencies. What are the optimizations based on the dependencies? We have the benefits of lazy evaluation. The DAG scheduler optimizes stages and tasks before submitting them to the task scheduler. We have the pipelining of narrow dependencies within a stage. Distributed Computing: Part two 12 We have the join of plan selection based on partitioning. We have cache reuse. Let’s see two examples. # Create RDD val data = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6)) # RDD filter val filtered = data.filter(_ % 2 == 0) # map() val mapped = filtered.map(_ * 2) # Collect val result = mapped.collect() val sc = spark.sparkContext val rdd1 = sc.parallelize(Seq(("a", 55), ("b", 56), ("c", 57))) val rdd2 = sc.parallelize(Seq(("a", 60), ("b", 65), ("c", 61))) val joinrdd = rdd1.cartesian(rdd2) joinrdd.saveAsTextFile("/path/to/output") Distributed Computing: Part two 13 Streaming Spark and Structured Streaming Spark Streaming Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs. We have a flow of data, and we want to perform real-time analytics. We use DStream with sockets. Distributed Computing: Part two 14 💡 Recap of how a TCP producer works. import socket from time import sleep host = 'localhost' port = 9999 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((host, port)) s.listen(1) while True: print('\nListening for a client at',host , port) conn, addr = s.accept() print('\nConnected by', addr) try: print('\nReading file...\n') with open('big1.txt') as f: for line in f: out = line.encode('utf-8') print('SENT: ',line) conn.send(out) sleep(2) print('End Of Stream.') except socket.error: print ('Error Occured.\n\nClient disconnected.\n') conn.close() from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with tho working thread and ba sc = SparkContext("local", "NetworkWordCount") ssc = StreamingContext(sc, 1) Distributed Computing: Part two 15 # Create a DStream that will conncet to hostname:port, like loca lines = ssc.socketTextStream("localhost", 9999) Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost ) and port (e.g. 9999 ). This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space into words. # Split each line into words words = lines.flatMap(lambda line: line.split(" ")) # Count each word in each batch pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) # Print the first ten elements of each RDD generated in this DSt wordCounts.pprint() flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the words DStream. Next, we want to count these words. Distributed Computing: Part two 16 Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate # Reduce last 30 seconds of data, every 10 seconds windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + lambda x, y: x - y, 30, 10) Window length is the duration of the window. Sliding interval is the interval at which the window operation is performed. These two parameters must be multiples of the batch interval of the source DStream. Distributed Computing: Part two 17 The function transform returns a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. The function updateStateByKey returns a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) def updateFunc(new_values: Iterable[int], last_sum: Optional[int return sum(new_values) + (last_sum or 0) lines = ssc.socketTestStream("localhost", 9999) running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD = initialStateRDD) Distributed Computing: Part two 18 Structured Streaming Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. The Spark SQL engine takes care of running a program incrementally and continuously and updating the final result as streaming data continues to arrive. It is possible to use the Dataframes instead of RDDs. What is its programming model? Distributed Computing: Part two 19 Here we have an example for incremental word counting. # Split the lines into words, retaining timestamps # split() splits each line into an array, # and explode() turns the array into multiple rows words = lines.select(explode(split(lines.value, ' ')).alias('wor # Group the data by window and word and compute the count of eac windowedCounts = words.groupBy( \ window(words.timestamp, windowDuration, slideDuration), \ words.word).count().orderBy('window') We can also see the window and slide duration. Distributed Computing: Part two 20 MLib PySpark MLib pipelines Main concepts in pipelines MLib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. The pipeline concept is mostly inspired by the scikit-learn project. DataFrame : This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions. Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions. : An Estimator is an algorithm which can be fit on a DataFrame to Estimator produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model. : A Pipeline chains multiple Pipeline Transformers and Estimators together to specify an ML workflow. : All Transformers and Parameter Estimators now share a common API for specifying parameters. Distributed Computing: Part two 21 Transformers A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a method transforms() , which converts one DataFrame into another, generally by appending one or more columns. For example: A feature transformer might take a DataFrame , read a column (e.g. text), map it into a new columns (e.g. feature vectors), and output a new DataFrame with the mapped column appended. A learning model might take a DataFrame , read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column. Each instance of a Transformer or Estimator as a unique ID, which is useful in specifying parameters. Estimators An Estimator abstracts the concept of a learning algorithms or an algorithm that fits or trains on data. Technically, an Estimator implements a method fit() , which accepts a DataFrame and produces a Model , which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimato r, and calling fit() trains a LogisticRegressionModel which is a Model and hence a Transformer. Let’s see an example with Pipeline. Tokenizerand HashingTF are Transformers. HashingTF is not based on a cryptographic function, so it can be inverted. LogisticRegression is an Estimator. A Pipeline is an Estimator: in this case it’s just the organisation of the operations (nothing is executed and no prediction is computed). Thus, after a Pipeline ’s fit() method runs, it produces a PipelineModel , which is a Transformer. This PipelineModel is used at test time. What happens with the test? Distributed Computing: Part two 22 # STAGES AND PIPELINE from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer # Prepare training documents from a list of (id, text, label) tu training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of three stages: # tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputC Distributed Computing: Part two 23 lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) # Fit the pipeline to training documents. model = pipeline.fit(training) # model is now a transformer # PIPELINE FIT AND TEST # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop") ], ["id", "text"]) # Make predictions on test documents and print columns of intere prediction = model.transform(test) # StopWordsRemover from pyspark.ml.feature import StopWordsRemover sentenceData = spark.createDataFrame([ (0, ["I", "saw", "the", "red", "balloon"]), (1, ["Mary", "had", "a", "little", "lamb"]) ], ["id", "raw"]) remover = StopWordsRemover(inputCol="raw", outputCol="filtered") remover.transform(sentenceData).show(truncate=False) # NGram Transformer from pyspark.ml.feature import NGram wordDataFrame = spark.createDataFrame([ (0, ["Hi", "I", "heard", "about", "Spark"]), (1, ["I", "wish", "Java", "could", "use", "case", "classes"] (2, ["Logistic", "regression", "models", "are", "neat"]) ], ["id", "words"]) ngram = NGram(n=2, inputCol="words", outputCol="ngrams") Distributed Computing: Part two 24 ngramDataFrame = ngram.transform(wordDataFrame) ngramDataFrame.select("ngrams").show(truncate=False) # PARAMETER TUNING AND CROSS VALIDATION paramGrid = ParamGridBuilder() \.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \.addGrid(lr.regParam, [0.1, 0.01]) \.build() crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2) # use 3+ folds in practice # Run cross-validation, and choose the best set of parameters. cvModel = crossval.fit(training) # Prepare test documents, which are unlabeled. test = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "mapreduce spark"), (7, "apache hadoop") ], ["id", "text"]) # Make predictions on test documents. cvModel uses the best mode prediction = cvModel.transform(test) selected = prediction.select("id", "text", "probability", "predi for row in selected.collect(): print(row) Distributed Computing: Part two 25 # TRAIN-VALIDATION SPLIT # We use a ParamGridBuilder to construct a grid of parameters to # TrainValidationSplit will try all combinations of values and d # using the evaluator. paramGrid = ParamGridBuilder()\.addGrid(lr.regParam, [0.1, 0.01]) \.addGrid(lr.fitIntercept, [False, True])\.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\.build() # In this case the estimator is simply the linear regression. # A TrainValidationSplit requires an Estimator, a set of Estimat # and an Evaluator. tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(), # 80% of the data will be used for training, 20% for validat trainRatio=0.8) # Run TrainValidationSplit, and choose the best set of parameter model = tvs.fit(train) # Make predictions on test data. model is the model with combina # that performed best. model.transform(test)\ Distributed Computing: Part two 26.select("features", "label", "prediction")\.show() DAG Pipelines A Pipeline ’s stages are specified as an ordered array. It is possible to create non- linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, then the stages must be specified in topological order. A Pipeline ’s stages should be unique instances. E.g., the same instance myHashingTF should not be inserted into the Pipeline twice since Pipeline stages must have unique IDs. However, different instances of myHashingTF1 and myHashingTF2 (both of type HashingTF ) can be put into the same Pipeline since different instances will be created with different IDs. Distributed Computing: Part two 27