Summary

These lecture notes cover MapReduce, a programming model for processing and generating large datasets. They introduce the need for large-scale data processing in cloud environments, the MapReduce principle, and its supporting frameworks.

Full Transcript

LINFO2145 — Cloud Computing
Lesson 10: Map/Reduce
Pr. Etienne Rivière
[email protected]

Lecture objectives

Introduce the need for large-scale data processing in Cloud environments.
Present the Map/Reduce principle and supporting frameworks.
Detail some representative applications.

Cloud Computing — E. Rivière

Introduction

Cloud computing allows larger-scale applications.
Large-scale applications attract more users.
Users generate more data.
Applications generate more data (such as logs).
How can we leverage this data to make the application better? This is the "Big Data" phenomenon.

The 4 "V"s of Big Data

[Figure: the 4 "V"s of Big Data (image © IBM)]

The data deluge

A zettabyte is 1,000,000,000,000,000,000,000 (10^21) bytes.
[Figure: estimation of the total data volume generated worldwide (Statista)]
Increasingly more human activities are digitalized, a trend that became stronger with the COVID-19 crisis.
The volume of generated data is growing exponentially.
Many activities and businesses are now data-driven.

Data-supported services

Applications using large volumes of public data:
  Web search and indexing (initially)
  Large Language Models and AI in general
Applications using user-generated data:
  Social networks
  Recommendation engines (Netflix, Spotify, etc.)
  Taxi hailing: predictive models (where will clients be? Where will they want to go?)
Applications using both:
  Web search and indexing (now)
  Cambridge Analytica
  …

Dealing with the first two "V"s

We need specific tools and programming models to deal with:
  (very) large amounts of data;
  a complex environment (replication, distribution);
  faults and slow machines.
Today: how to handle Volume, with the Map/Reduce framework for large static data.
Next week: how to handle Velocity, with stream processing frameworks for dynamic data.
We will not cover Variety (data curation) and Veracity (data authenticity) in this course.
We will also not cover AI model training (covered in specific courses).
Motivating example: Logging

Application front-end (FE) components generate logs about client requests: client information, errors, page generation time, items accessed/purchased.
Each FE sends a PUT request per log entry; log entries are stored as JSON documents in a distributed CouchDB (NoSQL) database.
[Figure: clients → front-end components (FE) → PUT log entry → "logs" partitions of the CouchDB NoSQL database]

Processing logs

We would like to know the average generation time for each type of page.
Types: home, product, cart, checkout.
This information is available by processing the logs, e.g.:
  { "log_type": "page_view", "page_type": "product", "generation_time": 42.6 }
  { "log_type": "page_view", "page_type": "product", "generation_time": 28.2 }
  { "log_type": "page_view", "page_type": "product", "generation_time": 27 }
[Figure: bar chart of the average generation time (ms) per page type: home, product, cart, checkout]

Centralized processing?

Should we collect all log documents (GET /logs/_all_docs) and process them in a dedicated single process?
❌ The amount of logs can be very large: it may not fit in memory at all!
❌ Not all log entries are useful for our operation: wasted bandwidth.
❌ A single process is very slow.
We need parallel, in-place processing.

Processing logs in parallel

[Figure: each "logs" partition computes its own per-page-type bar chart (home, product, cart, checkout); the partial charts are then merged into the global one]

Handling Volume

💡 Split the dataset into multiple partitions and process each partition independently.
🎯 Later, merge the outputs to obtain the final result.
This is difficult to do "manually":
🤔 deploy worker processes close to the data;
🤔 coordinate all the workers;
🤔 handle faults at the workers;
🤔 collect all outputs and process them;
start again …
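To make "process each partition independently, then merge" concrete, here is a minimal sketch in JavaScript, the language of CouchDB's map/reduce functions. The helper names `processPartition` and `mergePartials` and the partition layout are hypothetical. Each partition returns, per page type, a count and a sum of generation times; the merge adds these up rather than averaging the per-partition averages, which would give too much weight to small partitions.

```javascript
// Hypothetical helpers illustrating the "manual" partition / process / merge approach.
// Each partition is an array of log entries (plain objects).

// Process one partition: keep page views only, accumulate count and sum per page type.
function processPartition(entries) {
  const partial = {};
  for (const e of entries) {
    if (e.log_type !== "page_view") continue; // not useful for this query
    const acc = partial[e.page_type] ?? { count: 0, sum: 0 };
    acc.count += 1;
    acc.sum += e.generation_time;
    partial[e.page_type] = acc;
  }
  return partial;
}

// Merge partial results: add counts and sums, then derive one average per page type.
function mergePartials(partials) {
  const total = {};
  for (const p of partials) {
    for (const [type, { count, sum }] of Object.entries(p)) {
      const acc = total[type] ?? { count: 0, sum: 0 };
      acc.count += count;
      acc.sum += sum;
      total[type] = acc;
    }
  }
  const averages = {};
  for (const [type, { count, sum }] of Object.entries(total)) {
    averages[type] = sum / count;
  }
  return averages;
}

// Example: three partitions, as if stored on three database nodes.
const partitions = [
  [{ log_type: "page_view", page_type: "product", generation_time: 42.6 },
   { log_type: "purchase", item_id: 6476, user_id: 726 }],
  [{ log_type: "page_view", page_type: "product", generation_time: 28.2 },
   { log_type: "page_view", page_type: "home", generation_time: 55.3 }],
  [{ log_type: "page_view", page_type: "product", generation_time: 27 }],
];

// In a real deployment, each processPartition call would run next to its data.
const result = mergePartials(partitions.map(processPartition));
console.log(result);
```

Everything that makes this hard at scale (placing the workers, coordinating them, handling their failures) is absent from the sketch, which is precisely what the Map/Reduce framework takes over.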
Map/Reduce

Big Data processing often follows a similar pattern:
PARTITION: iterate in parallel over a large number of records.
MAP: extract information of interest from the records.
SHUFFLE: regroup this information in sets, one for each category.
REDUCE: aggregate each of the sets to obtain the final result(s).
✓ Map/Reduce is a programming model that allows expressing such queries and running them in parallel on many machines.
‣ Key idea: provide an abstraction for the two operations MAP and REDUCE, and make the other two implicit.

Programming model

The programmer specifies two functions:
map(record): gets records from a partition of the source data; can generate key/value pairs with the emit call.
reduce(key, {values}): receives all values for the same key; generates aggregate results, also as key/value pairs.
PARTITION and SHUFFLE are handled transparently.

MAP phase

Input: a set of log entries, e.g.:
  { "log_type": "page_view", "page_type": "product", "generation_time": 42.6 }
  { "log_type": "purchase", "item_id": 6476, "user_id": 726 }
  { "log_type": "page_view", "page_type": "home", "generation_time": 55.3 }
  { "log_type": "access", "item_id": 8921, "user_id": 251 }
  { "log_type": "page_view", "page_type": "product", "generation_time": 67.1 }

Map function:
  map(log_entry) {
    // only page views carry a generation time
    if (log_entry.log_type === "page_view") {
      emit(log_entry.page_type,
           {count: 1, avg_time: log_entry.generation_time})
    }
  }

Output: a set of key/value pairs, e.g.:
  { "key": "product", "value": {"count": 1, "avg_time": 42.6} }
  { "key": "home", "value": {"count": 1, "avg_time": 55.3} }
  { "key": "product", "value": {"count": 1, "avg_time": 67.1} }
[Figure: three mappers (M), one per "logs" partition, each producing its own set of key/value pairs (product, cart, home, checkout, …)]
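The programming model can be emulated in a single process to see how the pieces fit together. The sketch below is a hypothetical in-memory runner (`runMapReduce` is not part of any framework): it calls map on every record of every partition, performs the SHUFFLE by grouping the emitted values per key, and calls reduce once per key. The map function is the one above; the reduce function, an assumption at this point, computes a count-weighted average in the spirit of the REDUCE phase described next.

```javascript
// Hypothetical single-process emulation of the Map/Reduce programming model.
function runMapReduce(partitions, map, reduce) {
  // MAP: run the user map function on every record; emit() collects pairs.
  const intermediate = [];
  for (const partition of partitions) {
    for (const record of partition) {
      map(record, (key, value) => intermediate.push([key, value]));
    }
  }
  // SHUFFLE: regroup the values by key (transparent to the programmer).
  const groups = new Map();
  for (const [key, value] of intermediate) {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  }
  // REDUCE: one call per key; the emitted result is stored under its key.
  const output = {};
  for (const [key, values] of groups) {
    reduce(key, values, (k, v) => { output[k] = v; });
  }
  return output;
}

// Map function of the MAP phase; emit is passed explicitly here.
function mapLog(logEntry, emit) {
  if (logEntry.log_type === "page_view") {
    emit(logEntry.page_type, { count: 1, avg_time: logEntry.generation_time });
  }
}

// Count-weighted average of partial {count, avg_time} values.
function reduceAvg(key, values, emit) {
  const count = values.reduce((acc, v) => acc + v.count, 0);
  const total = values.reduce((acc, v) => acc + v.avg_time * v.count, 0);
  emit(key, { count: count, avg_time: total / count });
}

const logs = [
  [{ log_type: "page_view", page_type: "product", generation_time: 42.6 },
   { log_type: "purchase", item_id: 6476, user_id: 726 }],
  [{ log_type: "page_view", page_type: "home", generation_time: 55.3 },
   { log_type: "access", item_id: 8921, user_id: 251 }],
  [{ log_type: "page_view", page_type: "product", generation_time: 67.1 }],
];

console.log(runMapReduce(logs, mapLog, reduceAvg));
```

Passing `emit` as an argument is a choice made to keep the sketch self-contained; CouchDB instead provides `emit` as a global inside map functions.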
SHUFFLE phase

The map outputs (sets of key/value pairs) are turned into the shuffle output: one set of values for each key (product, home, cart, checkout).
[Figure: the pairs emitted by three mappers (M) are regrouped by key into four sets: product, home, cart, checkout]
Remember that this phase is transparent for the programmer!

REDUCE phase

Reduce function:
  reduce(key, values) {
    // total number of page views for this key
    var count = values.reduce((acc, v) => acc + v.count, 0)
    // weighted average: each partial average weighs as much as its count
    var avg_time = values.reduce((acc, v) => acc + v.avg_time * v.count, 0) / count
    emit(key, {count: count, avg_time: avg_time})
  }

For the key product, the reducer (R) receives the shuffle output
  {"count":1, "avg_time":42.6}, {"count":1, "avg_time":55.3}, {"count":1, "avg_time":67.1},
  {"count":1, "avg_time":48.9}, {"count":1, "avg_time":51.8}, {"count":1, "avg_time":61.0}
and emits {"count": 6, "avg_time": 54.4}. The other reducers emit:
  home: {"count": 3, "avg_time": 37.8}
  cart: {"count": 2, "avg_time": 83.2}
  checkout: {"count": 6, "avg_time": 73.0}
This view is our expected result.

Bandwidth inefficiency

[Figure: map outputs (sets of key/value pairs) → shuffle output (sets of values for each key) → reduce output (the final results above)]
Observation: each mapper generates several pairs for the same key, and each of them is shuffled and sent independently to the corresponding reducer.

Local reduction

💡 Could we locally aggregate all pairs for the same key at the Mapper process, before sending them to the SHUFFLE phase?
The output of a reduce() function may be used as part of its own input list.
➡ Then the local MAP output can be reduced locally before the SHUFFLE phase.
This property holds for many aggregations: max, min, average (as long as the count is kept next to the average, as in our values), …
Sometimes, we need to know whether we are reducing local MAP results or the results of the SHUFFLE phase.
An optional "rereduce" parameter to the reduce() call tells us whether this is a local reduction (false) or the final one (true): reduce(key, {values}, rereduce)
Our reduce function works in both cases, because its output has the same {count, avg_time} shape as its input:
  reduce(key, values) {
    var count = values.reduce((acc, v) => acc + v.count, 0)
    var avg_time = values.reduce((acc, v) => acc + v.avg_time * v.count, 0) / count
    emit(key, {count: count, avg_time: avg_time})
  }

Local reduction applied

[Figure: each mapper's output is first reduced locally (rereduce = false); the shuffle then moves at most one pair per key and mapper to the final reducers (rereduce = true), which produce the same final results: product {"count": 6, "avg_time": 54.4}, home {"count": 3, "avg_time": 37.8}, cart {"count": 2, "avg_time": 83.2}, checkout {"count": 6, "avg_time": 73.0}]

Link with functional programming

Higher-order functions take a function as argument.
Map: a function is applied to every element in a list; the result is a new list; each operation is independent.
Fold: an accumulator is set to an initial value; the function is applied to a list element and the accumulator, and the result is stored in the accumulator; this is repeated for every item in the list; the result is the final value in the accumulator.
Map/Reduce is similar in principle, but with multiple output lists for Map and parallel Fold operations.
[Figure: map applies f to each list element independently; fold chains f from an initial value to a final value]

Examples of applications
wordcount

Count how many times each word appears in a text corpus.
  map(text) {
    for (const word of text.split(' ')) {
      emit(word, {count: 1})
    }
  }
  reduce(key, values) {
    var count = values.reduce((acc, v) => acc + v.count, 0)
    emit(key, {count: count})
  }

wordcount local reducer

A local reducer helps save bandwidth by pre-reducing the results of a map locally and sending fewer intermediate pairs.
Here, we can use the same code as the final reducer: it aggregates all the counts for the same word into a single pair.
This is not always the case!

Distributed grep

Grep reads a file line by line and, if a line matches a pattern (e.g., a regular expression), it outputs the line.
Map function: read a file or set of files; emit each line that matches pattern p, under the fixed key p.
Reduce function: identity (use the intermediate results as the final results).
Do you see an issue with this pattern? And a possible fix?

Top-k page frequency

Input: a log of web page requests.
Output: the top-k accessed webpages.
Map function: parse the log and output pairs; similar to word count, but all pairs go to a single key.
Reduce function (for the single key): aggregate the pairs for individual URLs; sort by decreasing frequency; output the top-k pairs.

Top-k page frequency (continued)

Going to a single key is vastly inefficient: a single worker receives all individual pairs, only to aggregate them and get the top-k URLs.
Use a local reducer, as for wordcount.
Here, we cannot use the same code as in the wordcount example: the union of the local top-k lists is not the global top-k.
We need to select the local top-(k+D), where D depends on the workload.
Sometimes, it is acceptable to do local trimming for an approximate solution.

Why the global top-k code cannot be used as a local reducer

Example with k = 3:

  On Mapper 1        On Mapper 2        On Reducer
  Item  Frequency    Item  Frequency    Item  Frequency
  A     12           A     9            A     21
  B     10           B     8            B     18
  C     8            E     5            C     8
  D     6            D     4

The third entry on the reducer should be D, with a frequency of 10!
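The k = 3 example can be reproduced with a short sketch. It assumes each mapper has already counted its URL accesses locally (summing counts is safe to do locally, exactly as in the wordcount local reducer), and then compares two hypothetical local reducers: one that trims each mapper's counts to its local top-k before the shuffle, and one that sends all local counts. The item names and frequencies are those of the example; `sumCounts` and `topK` are illustrative helpers.

```javascript
// Sum per-item counts coming from several mappers (wordcount-style reduce).
function sumCounts(countMaps) {
  const total = new Map();
  for (const counts of countMaps) {
    for (const [item, n] of counts) {
      total.set(item, (total.get(item) ?? 0) + n);
    }
  }
  return total;
}

// Keep the k most frequent items, by decreasing frequency.
function topK(counts, k) {
  return [...counts.entries()]
    .sort((a, b) => b[1] - a[1])
    .slice(0, k);
}

// Local counts from the example.
const mapper1 = new Map([["A", 12], ["B", 10], ["C", 8], ["D", 6]]);
const mapper2 = new Map([["A", 9], ["B", 8], ["E", 5], ["D", 4]]);
const k = 3;

// Wrong: each mapper trims its counts to its local top-k before the shuffle.
const trimmed = [mapper1, mapper2].map((m) => new Map(topK(m, k)));
const approximate = topK(sumCounts(trimmed), k); // A 21, B 18, C 8

// Exact: mappers send all their (locally summed) counts.
const exact = topK(sumCounts([mapper1, mapper2]), k); // A 21, B 18, D 10

console.log(approximate, exact);
```

D is cut on both mappers (6 and 4 are each below the local top-3), yet its total of 10 beats C. Sending every local count is exact but costs bandwidth; keeping a local top-(k+D) trades some of that bandwidth for accuracy.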
Reverse Web-link graph

Get all the links pointing to some page.
This is the first basis for the PageRank algorithm (the original web ranking algorithm of Google).
Map function: output a ⟨target, source⟩ pair for each link to a target URL found in a page named source.
Reduce function: concatenate the list of all source URLs associated with a given target URL and emit the pair ⟨target, list(source)⟩.

Inverted index

Get all documents containing some particular keyword.
Used by the search mechanisms of Google, Yahoo!, etc.
Second input for PageRank.
Map function: parse each document and emit a set of ⟨word, document ID⟩ pairs.
Reduce function: take all pairs for a given word; sort the document IDs; emit a final ⟨word, list(document ID)⟩ pair.

k-Means computation

A typical data mining / database problem.
Group items into k clusters; the clusters must minimize the distance between the points they contain.
[Figure: points plotted by Age and Expenses]

Approach: k-Means

k-Means principle:
Let m_1, m_2, …, m_k be representative points for each of our k clusters. Goal: obtain m_1, m_2, …, m_k as representative points of the k clusters; specifically, these will be the centroids of the clusters.
Initialize m_1, m_2, …, m_k to random values in the data.
Iterate, for t = 1, 2, …:
  Assign each point to the closest centroid:
  $$S_i^{(t)} = \{\, x_j : \lVert x_j - m_i^{(t)} \rVert \le \lVert x_j - m_{i^*}^{(t)} \rVert,\ \forall\, i^* = 1, \dots, k \,\}$$
  m_i becomes the new centroid for its points:
  $$m_i^{(t+1)} = \frac{1}{\lvert S_i^{(t)} \rvert} \sum_{x_j \in S_i^{(t)}} x_j$$
(Slide material © 2015 M. Canini, Université catholique de Louvain)

Simple example

Points (as labelled on the Age/Expenses plot): (10,10), (15,12), (11,16), (18,20), (20,21), (30,21).
Two initial centers are chosen randomly.
After the first iteration, the centroids are (19.75, 19.5) and (12.5, 11).
After the second iteration, the centroids are (22.67, 20.67) and (12, 12.67): stable!
[Figure: the six points on the Age/Expenses plane, with the centroids of each iteration]

k-Means in Map/Reduce

Initialize: choose the centroids randomly.
MAP (classify phase): map each point to the closest centroid:
  $$S_i^{(t)} = \{\, x_j : \lVert x_j - m_i^{(t)} \rVert \le \lVert x_j - m_{i^*}^{(t)} \rVert,\ \forall\, i^* = 1, \dots, k \,\}$$
REDUCE (recenter phase): m_i becomes the new centroid for its points:
  $$m_i^{(t+1)} = \frac{1}{\lvert S_i^{(t)} \rvert} \sum_{x_j \in S_i^{(t)}} x_j$$
Repeat until there is no change, i.e., the centroids have converged.

Classification step as Map

init: read the global variable centroids from a file (initially, k random points).
map(centroid, point): compute the nearest centroid based on centroids; emit(nearest_centroid, point).
How do we know the centroids?

Recenter step as Reduce

Initialize the global variable centroids = [].
reduce(centroid, points[]):
  recompute the centroid from the points in it;
  for each point in points: emit(centroid, point);
  add the centroid to the global centroids.
cleanup (after all calls to reduce are made):
  save the global centroids to a file;
  check whether they changed since the last iteration.
This does not completely "fit" the Map/Reduce model, which assumes no global shared state, but it is widely used in practice.

Origin and implementation
Origin

Map/Reduce was originally proposed by Google in 2004 (OSDI 2004); the paper was republished in Communications of the ACM in 2008.
It was necessary due to their unprecedented scale.
It is one of the most cited papers in computer science.
Uses: computation of the PageRank for webpages; also extraction of keywords, sanitation, etc.; many other uses such as log parsing, network monitoring, etc.
Proprietary implementation.
Huge and immediate success:
  simple programming model;
  distributed computation complexity left to the framework;
  usable by non-CS majors.
[Figure: first page of the original paper, "MapReduce: Simplified Data Processing on Large Clusters", Jeffrey Dean and Sanjay Ghemawat, Google, Inc.]

Original Map/Reduce

The job is submitted to a master process, which orchestrates its execution.
Each node supports one or more workers.
Each worker can handle a map or reduce job when instructed by the master.
Map jobs get their data from the file system: the Google File System, optimized for storing large append-only files. Input can also come from a NoSQL database: Google BigTable.
Communication is based on key/value pairs.
Output is written to the file system.

Execution overview

(Based on Figure 1, "Execution overview", of the original MapReduce paper.)
[Figure: the user program forks (1) a master and workers; the master assigns (2) map and reduce tasks; map workers read (3) input splits 0 to 4 and write (4) intermediate files to their local disks; reduce workers read them remotely (5) and write (6) output files 0 and 1]

0. The MapReduce library splits the input files into M pieces (typically 16 MB to 64 MB).
1. The library starts many processes on the cluster.
1 bis. One of the processes is special: it is the master, which orchestrates the execution.
2. The master assigns a map job and part of the input to a worker.
3. The map worker reads its input from the file system or database.
4. Periodically, the map worker flushes intermediate pairs to its local disk.
5. The map workers inform the master of the location of fresh data; the master tells each reduce worker where to get the data for the part of the key space it is in charge of.
5b. The reduce workers collect all key/value pairs whose keys are in their responsibility range.
6. Once a reduce worker has ALL the key/value pairs (as instructed by the master), it processes the values and sends the result to the global file system.
NOTE: these output files can be the input splits for another MapReduce job!
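Steps 4 to 5b rely on a partitioning function that tells each map worker in which of R regions (one per reduce worker) to write each pair, so that all pairs with the same key reach the same reducer. The original paper's default partitioning is hash(key) mod R; the sketch below follows that idea, but the 31-based string hash is an illustrative choice (an assumption, not Google's function), and `partitionOutput` and `collectForReducer` are hypothetical helpers.

```javascript
// Illustrative string hash (31-based polynomial, kept in the unsigned 32-bit range).
function hash(key) {
  let h = 0;
  for (const ch of key) {
    h = (Math.imul(h, 31) + ch.codePointAt(0)) >>> 0;
  }
  return h;
}

// hash(key) mod R: index of the region / reduce worker in [0, R).
function partitionOf(key, R) {
  return hash(key) % R;
}

// A map worker splits its intermediate pairs into R local regions (step 4).
function partitionOutput(pairs, R) {
  const regions = Array.from({ length: R }, () => []);
  for (const [key, value] of pairs) {
    regions[partitionOf(key, R)].push([key, value]);
  }
  return regions;
}

// Reduce worker r fetches region r from every map worker (steps 5 and 5b).
function collectForReducer(mapOutputs, r) {
  return mapOutputs.flatMap((regions) => regions[r]);
}

const R = 2;
const mapOutputs = [
  partitionOutput([["product", 1], ["home", 1], ["product", 1]], R),
  partitionOutput([["cart", 1], ["home", 1], ["checkout", 1]], R),
];

for (let r = 0; r < R; r++) {
  console.log(`reducer ${r}:`, collectForReducer(mapOutputs, r));
}
```

Because the region depends only on the key, the two "home" pairs produced by different map workers end up at the same reduce worker without any coordination between mappers.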
Implementation challenges

Failing workers
➡ The master must detect the failure and re-assign the work to another worker.
✓ Partitioning helps: partially processed data is simply discarded.
Slow workers
➡ A worker that is alive but slow can slow down the entire job execution.
Monitor the work and also assign the work of slow workers to fast ones: redundant computations, keeping only the result of the first finisher.
Failing master
Original Map/Reduce: snapshot and retry.
Now: use a coordination kernel!

Original performance measurements

(From the original MapReduce paper.)
Configuration: 1,800 machines, each with two 2 GHz Xeon processors, 4 GB of memory, 2 × 160 GB disks and 1 Gbps Ethernet; 100 to 200 Gbps of aggregate bandwidth at the root of the network.

Distributed grep

Grep scans through 10^10 100-byte records for a three-character pattern (which occurs in 92,337 of them).
[Figure 2 of the paper: data transfer rate over time (input, in MB/s, vs. seconds); the initial ramp-up is annotated "setting up and getting info from GFS"]

Sort (10^10 100-byte records, about 50 lines of user code)

[Figure 3 of the paper: data transfer rates over time (input, shuffle and output, in MB/s, vs. seconds) for different executions of the sort program: (a) normal execution, (b) no backup tasks, (c) 200 tasks killed]

Map/Reduce frameworks
Map/Reduce implementations

The original Google Map/Reduce is proprietary.
Hadoop: an open-source implementation of MapReduce and associated tools (file systems, workflow management).
Do you recognize someone in the figure below?
[Figure]

Map/Reduce in NoSQL databases

Hadoop is heavy machinery, useful for processing vast amounts of unstructured data.
But Cloud application data is stored in NoSQL databases.
Map/Reduce calls are supported by many NoSQL databases: computations are performed directly on top of the data.
Example of CouchDB in the tutorial tomorrow.
Project part 2: build a recommendation engine using CouchDB Map/Reduce and integrate it into your shopping cart application.

Map/Reduce as a service

Amazon Elastic MapReduce (EMR): pre-configured versions of Big Data frameworks, including Hadoop, etc.
Data can be fed in from S3, DynamoDB, etc.
Similar offers exist at other providers.

Conclusions

Big Data processing is necessary at Cloud scale and enables new usages and applications.
Cloud-based web applications collect vast amounts of information about their clients, servers and infrastructure.
This information is processed in the background to derive information useful for the business.
Example in project part 2, with the recommendation service.
