Big Data Systems - Map Reduce I PDF 2024

Summary

These lecture notes give an overview of big data systems with a focus on the MapReduce paradigm. They cover the programming model (map, shuffle/sort, reduce), sorting large datasets (merge sort and two-phase multiway merge sort), and the architecture, optimizations, and fault tolerance strategies of the MapReduce framework.

Full Transcript

Martin Boissier
Big Data Systems
Data Engineering Systems
Map Reduce I
Hasso Plattner Institute

Announcements
§ One week left for the first exercise
§ Check out discussions in Moodle

Timeline I
Date               Tuesday                   Wednesday
15.10. / 16.10.    Intro / Organizational    Use Case - Search Engines
22.10. / 23.10.    Performance Management    Intro to GitHub Classroom
29.10. / 30.10.    Map Reduce I              Map Reduce II
5.11. / 6.11.      Map Reduce III            Exercise
12.11. / 13.11.    Data Centers              Cloud
19.11. / 20.11.    File Systems              Exercise
26.11. / 27.11.    Key Value Stores I        Key Value Stores II
3.12. / 4.12.      Key Value Stores III      Exercise
10.12. / 11.12.    Stream Processing I       Stream Processing II
17.12. / 18.12.    ML Systems I              Exercise
Christmas Break

This Lecture
1. Map Reduce Paradigm
2. Sorting large amounts of data
3. MR Architecture

Sources
§ Dean, Ghemawat: MapReduce: Simplified Data Processing on Large Clusters. OSDI'04
  § Paper & Slides: http://research.google.com/archive/mapreduce.html
§ Mone: Beyond Hadoop. CACM, Vol. 56 No. 1
  § Online: http://cacm.acm.org/magazines/2013/1/158759-beyond-hadoop/fulltext

Where are we?
§ First intro to big data processing
§ THE ancestor of all big data processing frameworks
§ Can conceptually be used independently of the complete stack
[Figure: system stack. Application: Application / Query Language / Analytics / Visualization. Big Data Systems: Data Processing, Data Management, File System. Infrastructure: Virtualization / Container, OS / Scheduling, Hardware.]

Basic Web Search Interaction
§ Big data
  § Billions of web pages
§ Non-interactive workloads
  § Mins/hours runtime ok
§ Distributed processing
  § 10s – 1000s of servers
[Figure: Document Store, User Interaction, Index]

Large Scale Data Analysis
§ Google's problem (in 2004)
  § 20+ billion web pages x 20 KB = 400 TB
  § One computer can read 30-35 MB/sec from disk
  § 4 months to read the web
  § 1,000 hard drives just to store the web
§ Same problem with 1,000 machines < 3 hours
§ But distributed and parallel programming is hard!
§ Also, clusters break frequently at scale
  § Remember, Google cluster first year: 1000 node failures, 1000s of hard drive failures

MapReduce
§ Programming model
  § Large scale, distributed data processing
  § Inspired by map and reduce functions in functional languages
§ Framework
  § Simple parallelization model
  § Shared nothing architectures ("commodity hardware")
§ By Google (Jeff Dean and Sanjay Ghemawat)
  § Presented 2004 at OSDI'04
  § Used for Web index and many other data processing jobs
  § Reimplemented by Yahoo as Apache Hadoop
§ We discuss the concepts, not one specific version

MapReduce Framework
§ Part of Google's Big Data Stack
[Figure: Google stack. Apps: Sawzall, Index, …, GMail. Below: MapReduce, BigTable. Below: Scheduling System, GFS, Chubby.]

Open Source Implementation
§ Apache Hadoop (I)
  § Open-source clone of Google Stack
  § Initiated at Yahoo
[Figure: Google stack next to its Hadoop counterparts: PigLatin, Hive; MapReduce, HBase; Zookeeper, HDFS.]

Map – Shuffle/Sort – Reduce

Simple Example
§ Count the word frequencies in 1 petabyte of text
  § Simple problem
  § Becomes cumbersome for large input
  § Can easily be parallelized
§ 3 step solution
  § Split text in words (map phase)
  § Group words (shuffle phase)
  § Count words in groups (reduce phase)
§ Works for surprisingly many problems with the same outline

MapReduce - Conceptual Data Flow
[Figure: three MAP tasks turn input pairs (k, a), (k, b), ... into (a, 1), (b, 1), ...; the MR framework shuffles and sorts them into a -> (1, 1, 1), b -> (1, 1), c -> (1), d -> (1), e -> (1, 1); the REDUCE tasks emit (a, 3), (b, 2), (c, 1), (d, 1), (e, 2).]

Map Example
    map(String key, String value):
      // key: document name
      // value: document contents
      for each word w in value:
        EmitIntermediate(w, "1");
§ Computation per key value pair
§ For each input create list of output values
§ Example:
  § For each word in a sentence: emit a k/v pair indicating one occurrence of the word
  § (key, "hello world") -> ("hello", "1"), ("world", "1")
§ Signature: map(key, value) -> list(key', value')

Reduce Example
    reduce(String key, Iterator values):
      // key: a word
      // values: a list of counts
      int result = 0;
      for each v in values:
        result += ParseInt(v);
      Emit(AsString(result));
§ Aggregation
  § Combine all intermediate values for one key
§ Example
  § Sum up all values for the same key
  § ("Hello", ("1", "1", "1", "1")) -> ("Hello", ("4"))
§ Signature: reduce(key, list(value)) -> list(key', value')

Full Code Example (Hadoop)
    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount {

      // Corresponds to the pseudocode map(String key, String value).
      public static class Map extends MapReduceBase
          implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
          String line = value.toString();
          StringTokenizer tokenizer = new StringTokenizer(line);
          // Emit (word, 1) for every token in the line.
          while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
          }
        }
      }

      // Corresponds to the pseudocode reduce(String key, Iterator values).
      public static class Reduce extends MapReduceBase
          implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
          // Sum up all counts for this word.
          int sum = 0;
          while (values.hasNext()) {
            sum += values.next().get();
          }
          output.collect(key, new IntWritable(sum));
        }
      }

      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // args[0]: input path, args[1]: output path
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
      }
    }

Map Phase – Example Word Count
[Figure: Map turns the input text "na na na na na na na na hey hey hey goodbye" (na na hey hey kiss him goodbye) into the pairs (na, 1) x 8, (hey, 1) x 3, (goodbye, 1).]

Sort Phase – Example Word Count
[Figure: Sort groups the pairs emitted by all map tasks by key: (na, 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1), (hey, 1 1 1), (goodbye, 1), (batman, 1).]

Reduce Phase – Example Word Count
[Figure: Reduce turns (hey, 1 1 1) into (hey, 3).]

Shuffling / Sorting Stage
§ Map Side:
  § Buffer in memory (e.g., 100 MB)
  § Partition
  § Sort (e.g., QuickSort or MergeSort)
  § Spill to disk
[Figure: Input Split -> Map -> Memory Buffer -> Partition -> Sort -> Spill to disk -> Merge]

Shuffling / Sorting Stage II
§ Reducer Side:
  § Fetch partitions
  § Buffer in memory
  § Merge files
  § Reduce
[Figure: Fetch -> Memory Buffer -> Merge -> Reduce -> Output]

Sorting in Detail

Sorting – Merge Sort
§ Main-Memory-Algorithm (Divide-and-Conquer Algorithm)
§ Idea: Merge l ≥ 2 sorted lists into a larger sorted list
  § Select the smallest element of the input lists and move it into the output list

Step   Input 1    Input 2    Output list
1.     1,3,4,9    2,5,7,8    -
2.     3,4,9      2,5,7,8    1
3.     3,4,9      5,7,8      1,2
4.     4,9        5,7,8      1,2,3
5.     9          5,7,8      1,2,3,4
6.     9          7,8        1,2,3,4,5
7.     9          8          1,2,3,4,5,7
8.     9          -          1,2,3,4,5,7,8
9.     -          -          1,2,3,4,5,7,8,9

Merge Sort
§ Recursion
  § Partition a list with more than one element into two lists of identical lengths $L_1$ and $L_2$
  § Sort $L_1$ and $L_2$ recursively
  § Merge $L_1$ and $L_2$ into a sorted list
§ Cost (Input size $|R| = n$)
  § Merging two sorted lists $L_1$, $L_2$: $O(|L_1| + |L_2|) = O(n)$
  § Depth of the recursion: $\log_2 n$
    § Every recursion step reduces each list size by a factor of $\frac{1}{2}$
    § After $i$ recursion steps there are $\frac{n}{2^i}$ elements in a list
  § Thus: $O(n \log n)$
  § Matches the lower bound for comparison-based sorting (proof in a data structures book!)
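
The merge step and the recursion described above can be sketched in a few lines of Python. This is a minimal illustration, not code from the lecture; the function names `merge` and `merge_sort` are chosen here for the example.

```python
def merge(left, right):
    """Merge two sorted lists into one sorted list in O(len(left) + len(right))."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        # Move the smaller head element of the two inputs to the output list.
        if left[i] <= right[j]:
            out.append(left[i])
            i += 1
        else:
            out.append(right[j])
            j += 1
    # One input is exhausted; the remainder of the other is already sorted.
    out.extend(left[i:])
    out.extend(right[j:])
    return out


def merge_sort(items):
    """Sort a list by splitting it in half, sorting both halves, and merging."""
    if len(items) <= 1:
        return list(items)
    mid = len(items) // 2
    return merge(merge_sort(items[:mid]), merge_sort(items[mid:]))


if __name__ == "__main__":
    # Same inputs as the step-by-step merge table above.
    print(merge([1, 3, 4, 9], [2, 5, 7, 8]))
    print(merge_sort([9, 1, 8, 2, 7, 3, 5, 4]))
```

Each call to `merge` touches every element once, and the list halves at every recursion level, which is where the $O(n \log n)$ bound comes from.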

Two-Phase, Multiway Merge-Sort (TPMMS)
§ If data does not fit in memory…
  § Use multiple phases
§ Phase 1:
  § Load as many data items as fit in main memory (actually: sort buffer)
  § Sort lists in main memory (using quicksort or heap sort)
  § Write sorted lists back to disks
  § Result: many sorted list partitions (on disk)
§ Phase 2:
  § Merge all sorted list partitions into a single list

2-Way Sort
§ Phase 1: Read a block, sort it, write it.
  § Only one buffer block is used
§ Phase 2: Merge
  § Three buffer blocks used
[Figure: Disk -> Main Memory Buffer -> Disk]

TPMMS – Phase 1
§ Recursion now starts with more than one or two elements in each list partition!
§ Sorting the list partitions, for instance with basic merge sort
  1. Fill memory with disk blocks from original relation
  2. Sort tuples in main memory
  3. Write tuples into free blocks on disk
  → Result: a single sorted list partition
§ Example
  § 6 400 blocks in main memory; overall 100 000 blocks
  § 16 list partitions (last partition is smaller)
§ Cost: 200 000 I/O operations
  § 100 000 block reads
  § 100 000 block writes
§ Time: on average 11 ms per I/O operation
  → 11 ms · 200 000 = 2 200 s ≈ 37 min
  → CPU time for sorting is negligible; we are I/O-bound

TPMMS – Phase 2
§ Naive idea: merge the l list partitions pairwise in main memory
  § Requires 2 · log2(l) read and write operations for each block (tuple)
  § In the example: one run for 16 list partitions, one run for 8, one run for 4 list partitions, and a final run for 2 list partitions
  § Every block is processed with 8 I/O operations
§ Better idea: only read the first block of every list partition
  1. Search the smallest key among the first tuples of all blocks: linear search (lin.) or priority queue (log.)
  2. Move this tuple into the output block (in main memory)
  3. If the output block is full: write it to disk
  4. If an input block is fully consumed: read the next block from that list partition
§ Cost: 2 I/O operations per block (and tuple): also 37 min
  → Overall cost for TPMMS: 74 min
§ What to do if the number of list partitions is larger than the number of blocks in main memory?
  § For now: add a third phase, which is basically a classical merge sort

Two-Phase Multiway Merge Sort
To sort a file with N blocks using B main memory buffer blocks:
§ Phase 1: use B buffer blocks. Produce ⌈N/B⌉ list partitions consisting of B blocks each (the last one may be smaller).
§ Phase 2: merge up to B − 1 runs.
[Figure: Disk -> Main Memory Buffer -> Disk]

Limitations of TPMMS
§ Notation: block size B bytes, main memory size (for blocks) M bytes, tuple size R bytes
§ M / B blocks fit in main memory
  § In Phase 2 we need space for one output block
  § Phase 1 can generate at most M / B − 1 sorted list partitions (for two phases)
  § That is also how often one can fill memory and sort (in Phase 1)
§ Each sorting can sort M / R tuples
  § At most (M / R) · ((M / B) − 1) ≈ M² / (RB) tuples can be sorted
§ Our example:
  § M = 104 857 600 bytes, B = 16 384 bytes, R = 160 bytes
§ Overall:
  § At most 4.2 billion tuples (ca. 0.67 terabytes)
  § With 16 GB RAM: about 16 PB (M² / B)

Multi-Phase MMS
§ If the relation is even larger
  § Add a third phase
  § Use TPMMS to create sorted list partitions of size M² / (RB)
§ Phase 3: Merge at most M / B − 1 sorted list partitions
§ Thus: M³ / (RB²) tuples can be sorted
§ In our example:
  § At most 27 trillion tuples (ca. 4.3 petabytes)
§ Overall runtime: 11 ms per block, 6 I/Os per block (2 for sorting, 2 per merging phase)
  → (4.3 PB / 16 KB) · 6 I/Os · 11 ms ≈ 562 years

Short Break

MapReduce Architecture

Simplified Architecture
§ Input data is split into chunks (sets of files)
§ Primary assigns map tasks to workers
  § Workers perform map computation
  § Store partitioned, sorted data locally
§ Primary assigns reduce tasks
  § Primary decides where to run each reduction task
  § Workers retrieve, sort, and shuffle data
  § Workers perform reduce task
§ Result is stored at global output data
[Figure: Input Data -> Mapping (Workers) -> Reduction (Workers) -> Output Data, coordinated by the Primary]

MapReduce Execution Stages
§ Scheduling: Assigns workers to map and reduce tasks
§ Data distribution: Moves processes to data (Map)
§ Synchronization: Gathers, sorts, and shuffles intermediate data (Reduce)
§ Errors and faults: Detects worker failures and restarts their tasks

Parallelism
§ map() tasks run in parallel, creating different intermediate values from different input data sets
§ reduce() tasks run in parallel, each working on a different output key
§ All values are processed independently, taking advantage of distribution
§ The problem lies between these two phases
  § Bottleneck: Reduce phase cannot start until map phase is completely finished
  § A single key is not parallelized, potentially resulting in imbalanced load
§ If some workers are slow, they slow down the entire process: straggler problem
  § Start redundant workers and take the result of the fastest one

Parallelism II
[Figure from http://research.google.com/archive/mapreduce-osdi04-slides/index.html]

Additional Optimizations
§ Combiner
  § Do a reduction on the mapping node when we have redundancy (e.g., the repeated "na" pairs in the word count example)
  § Reduces intermediate results and network traffic
§ Partition
  § Partition the data for parallel reduction
§ Locality
  § Primary assigns map tasks local to data
  § Map not network bound

Combiners
§ Often a map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k
  § E.g., popular words in Word Count like ("the", 1)
§ For associative and commutative operations like sum, count, max, can save bandwidth by pre-aggregating at the mapper
  § Decreases size of intermediate data
§ The combiner needs to have the same signature as the reduce task
§ Example: local counting for Word Count:
    def combine(key, values):
      output(key, sum(values))
[Figure: Combine turns (na, 1) x 8 into (na, 8).]

Partitioning Function
§ Inputs to map are created by contiguous splits of input files
§ For reduce, we need to ensure that records with the same intermediate key end up at the same worker
§ System uses a default partition function: hash(key) mod R, where R is the number of reduce tasks
  § Distributes the intermediate keys to reduce workers "randomly"
§ Sometimes useful to override
  § Balance load manually if distribution of keys is known
  § Specific requirement on which key-value pairs should be in the same output files (e.g., global ordering)
§ Signature: partition(key, number of partitions) -> partition id for key

Locality
§ Primary divides up tasks based on location of data:
  § Tries to run map() tasks on the same machine where the physical input data resides (based on GFS/HDFS)
  § At least same rack, if not same machine
§ map() task inputs are divided into 64 MB blocks
  § Same size as GFS chunks
§ Effect: Thousands of machines read input at local disk speed

Other Optimizations
§ Optimization: replicate tasks
  § Close to completion, spawn redundant tasks
  § Reduces latency from slow workers dramatically
§ Skipping "bad" records in input
  § Map/Reduce functions sometimes fail for particular inputs
  § Best solution is to debug & fix
  § But not always possible because problems are in the input data and third-party libraries
  § Solution
    § On fault, notify primary about the record processed before the fault
    § If primary sees 2 failures for the same record, it notifies workers to skip it
  § Effect: Can work around bugs in third-party libraries
§ Other
  § Compression of intermediate data (use some of the remaining cores to compress)

Fault Tolerance
§ Heartbeat messages
  § Detect failures
§ Worker failure / slow worker
  § Re-execute tasks
§ Each intermediate result (output of the mapper) is materialized on disk
  § Very expensive, but makes recovery of lost processes very simple and cheap
§ Inputs are stored in a fault-tolerant way by GFS/HDFS

Summary
§ MapReduce Programming Paradigm
  § Simplifies distributed, parallel data processing
  § Map: perform computation (on data)
  § Reduce: combine multiple data items
§ MapReduce Framework
  § Executes MapReduce jobs
  § Fault tolerant
  § Highly scalable

Next Part
§ Next Lecture: Map Reduce II
[Figure: system stack. Application: Application / Query Language / Analytics / Visualization. Big Data Systems: Data Processing, Data Management, File System. Infrastructure: Virtualization / Container, OS / Scheduling, Hardware.]

Thank you for your attention!
§ Questions?
  § In Moodle
  § Per email: [email protected]
  § In Q&A sessions
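
As a recap of the paradigm summarized in this lecture, the following Python sketch simulates the word count job in a single process: map, combine, partition, shuffle/sort, and reduce. It is only an illustration under stated assumptions and not how Hadoop or Google's framework is implemented. All names (`map_fn`, `combine`, `partition`, `reduce_fn`, `run_job`) are made up for this example, and `zlib.crc32` stands in for the hash in "hash(key) mod R" because it gives the same value on every run.

```python
import zlib
from collections import defaultdict


def map_fn(doc_name, text):
    """Emit (word, 1) for every word in the document contents."""
    return [(word, 1) for word in text.split()]


def combine(pairs):
    """Pre-aggregate counts on the map side to shrink intermediate data."""
    local = defaultdict(int)
    for key, value in pairs:
        local[key] += value
    return list(local.items())


def partition(key, num_reducers):
    """Partition function in the style of hash(key) mod R, using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def reduce_fn(key, values):
    """Sum all (possibly pre-aggregated) counts for one key."""
    return key, sum(values)


def run_job(documents, num_reducers=2):
    # Map phase: one simulated map task per document, followed by a local combine.
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for name, text in documents.items():
        for key, value in combine(map_fn(name, text)):
            # Shuffle: every key is routed to exactly one reducer.
            buckets[partition(key, num_reducers)][key].append(value)
    # Reduce phase: each simulated reducer processes its keys in sorted order.
    result = {}
    for bucket in buckets:
        for key in sorted(bucket):
            word, total = reduce_fn(key, bucket[key])
            result[word] = total
    return result


if __name__ == "__main__":
    docs = {
        "song": "na na na na na na na na hey hey hey goodbye",
        "theme": "na na na na na na na na batman",
    }
    print(run_job(docs))
```

Because summation is associative and commutative, the same logic serves as combiner and reducer here, mirroring `conf.setCombinerClass(Reduce.class)` in the Hadoop example.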
