Big Data Systems Map Reduce I PDF
Hasso-Plattner-Institut
Martin Boissier
Summary
This document is an introductory lecture on Big Data Systems, specifically covering the MapReduce paradigm. It discusses concepts, architecture, and some related topics like sorting large amounts of data, with supplementary information like announcements and a timeline of important events.
Full Transcript
Big Data Systems: Map Reduce I
Martin Boissier
Data Engineering Systems, Hasso Plattner Institute

Announcements
- One week left for the first exercise
- Check out discussions in Moodle

Timeline I
Date               Tuesday                  Wednesday
15.10. / 16.10.    Intro / Organizational   Use Case - Search Engines
22.10. / 23.10.    Performance Management   Intro to GitHub Classroom
29.10. / 30.10.    Map Reduce I             Map Reduce II
5.11. / 6.11.      Map Reduce III           Exercise
12.11. / 13.11.    Data Centers             Cloud
19.11. / 20.11.    File Systems             Exercise
26.11. / 27.11.    Key Value Stores I       Key Value Stores II
3.12. / 4.12.      Key Value Stores III     Exercise
10.12. / 11.12.    Stream Processing I      Stream Processing II
17.12. / 18.12.    ML Systems I             Exercise
Christmas Break

This Lecture
1. Map Reduce Paradigm
2. Sorting large amounts of data
3. MR Architecture

Sources
- Dean, Ghemawat: MapReduce: Simplified Data Processing on Large Clusters. OSDI'04
  - Paper & Slides: http://research.google.com/archive/mapreduce.html
- Mone: Beyond Hadoop. CACM, Vol. 56 No. 1
  - Online: http://cacm.acm.org/magazines/2013/1/158759-beyond-hadoop/fulltext

Where are we?
- First intro to big data processing
- THE ancestor of all big data processing frameworks
- Can conceptually be used independently of the complete stack
[Figure: system stack. Application layer: Application / Query Language / Analytics / Visualization. Big Data Systems layer: Data Processing, Data Management, File System. Infrastructure layer: Virtualization / Container, OS / Scheduling, Hardware.]

Basic Web Search Interaction
- Big data
  - Billions of web pages
- Non-interactive workloads
  - Mins/hours runtime ok
- Distributed processing
  - 10s – 1000s of servers
[Figure: User, Interaction, Index, Document Store]

Large Scale Data Analysis
- Google's problem (in 2004)
  - 20+ billion web pages x 20 KB = 400 TB
  - One computer can read 30-35 MB/sec from disk
    - 4 months to read the web
  - 1,000 hard drives just to store the web
- Same problem with 1,000 machines: < 3 hours
- But distributed and parallel programming is hard!
- Also, clusters break frequently at scale
  - Remember, Google cluster first year: 1000 node failures, 1000s of hard drive failures

MapReduce
- Programming model
  - Large scale, distributed data processing
  - Inspired by map and reduce functions in functional languages
- Framework
  - Simple parallelization model
  - Shared nothing architectures ("commodity hardware")
- By Google (Jeff Dean and Sanjay Ghemawat)
  - Presented 2004 at OSDI'04
  - Used for Web index and many other data processing jobs
  - Reimplemented by Yahoo as Apache Hadoop
- We discuss the concepts, not one specific version

MapReduce Framework
- Part of Google's Big Data Stack
[Figure: Google stack. Apps (Sawzall, Index, …, GMail) on top of MapReduce and BigTable, on top of the scheduling system, GFS, and Chubby.]

Open Source Implementation
- Apache Hadoop
  - Open-source clone of Google Stack
  - Initiated at Yahoo
[Figure: Google stack next to the Hadoop stack. PigLatin and Hive on top of MapReduce and HBase, on top of Zookeeper and HDFS.]

Map – Shuffle/Sort – Reduce

Simple Example
- Count the word frequencies in 1 petabyte of text
  - Simple problem
  - Becomes cumbersome for large input
  - Can easily be parallelized
- 3 step solution
  - Split text in words (map phase)
  - Group words (shuffle phase)
  - Count words in groups (reduce phase)
- Works for surprisingly many problems with same outline

MapReduce - Conceptual Data Flow
[Figure: MAP tasks turn input records (keys a to e) into (key, 1) pairs; the MR framework shuffles and sorts them by key; REDUCE tasks output one count per key: (a, 3), (b, 2), (c, 1), (d, 1), (e, 2).]

Map Example
    map(String key, String value):
      // key: document name
      // value: document contents
      for each word w in value:
        EmitIntermediate(w, "1");
- Computation per key value pair
- For each input create list of output values
- Example:
  - For each word in a sentence: emit a k/v pair indicating one occurrence of the word
  - (key, "hello world") -> ("hello", "1"), ("world", "1")
- Signature: map(key, value) -> list(key', value')

Reduce Example
    reduce(String key, Iterator values):
      // key: a word
      // values: a list of counts
      int result = 0;
      for each v in values:
        result += ParseInt(v);
      Emit(AsString(result));
- Aggregation
  - Combine all intermediate values for one key
- Example
  - Sum up all values for the same key
  - ("Hello", ("1", "1", "1", "1")) -> ("Hello", ("4"))
- Signature: reduce(key, list(value)) -> list(key', value')

Full Code Example (Hadoop)
    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount {

      public static class Map extends MapReduceBase
          implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // Corresponds to the pseudocode map(key, value):
        // emit (word, 1) for every token of the input line.
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
          String line = value.toString();
          StringTokenizer tokenizer = new StringTokenizer(line);
          while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
          }
        }
      }

      public static class Reduce extends MapReduceBase
          implements Reducer<Text, IntWritable, Text, IntWritable> {
        // Corresponds to the pseudocode reduce(key, values):
        // sum all counts collected for one word.
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
          int sum = 0;
          while (values.hasNext()) {
            sum += values.next().get();
          }
          output.collect(key, new IntWritable(sum));
        }
      }

      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
      }
    }

Map Phase – Example Word Count
[Figure: the input text "na na na na / na na na na / hey hey hey / goodbye" (from "na na hey hey kiss him goodbye") is mapped to one pair per word: eight (na, 1), three (hey, 1), one (goodbye, 1).]

Sort Phase – Example Word Count
[Figure: the pairs of two map tasks are sorted and grouped by key: (na, 1 … 1) with sixteen 1s, (hey, 1 1 1), (goodbye, 1), (batman, 1).]

Reduce Phase – Example Word Count
[Figure: (hey, 1 1 1) -> Reduce -> (hey, 3)]

Shuffling / Sorting Stage
- Map side:
  - Buffer in memory (e.g., 100 MB)
  - Partition
  - Sort (e.g., QuickSort or MergeSort)
  - Spill to disk
[Figure: Input Split -> Map -> Memory Buffer -> Partition -> Sort -> Spill to disk -> Merge]

Shuffling / Sorting Stage II
- Reducer side:
  - Fetch partitions
  - Buffer in memory
  - Merge files
  - Reduce
[Figure: Fetch -> Memory Buffer -> Merge -> Reduce -> Output]

Sorting in Detail

Sorting – Merge Sort
- Main-memory algorithm (divide-and-conquer algorithm)
- Idea: Merge l ≥ 2 sorted lists into a larger sorted list
  - Select the smallest element of the input lists and move it into the output list

Step  Input 1   Input 2   Output list
1.    1,3,4,9   2,5,7,8   -
2.    3,4,9     2,5,7,8   1
3.    3,4,9     5,7,8     1,2
4.    4,9       5,7,8     1,2,3
5.    9         5,7,8     1,2,3,4
6.    9         7,8       1,2,3,4,5
7.    9         8         1,2,3,4,5,7
8.    9         -         1,2,3,4,5,7,8
9.    -         -         1,2,3,4,5,7,8,9

Merge Sort
- Recursion
  - Partition a list with more than one element into two lists of identical lengths $L_1$ and $L_2$
  - Sort $L_1$ and $L_2$ recursively
  - Merge $L_1$ and $L_2$ into a sorted list
- Cost (input size $|R| = n$)
  - Merging two sorted lists $L_1$, $L_2$: $O(|L_1| + |L_2|) = O(n)$
  - Depth of the recursion: $\log_2 n$
    - Every recursion step reduces each list size by a factor of $\frac{1}{2}$
    - After $i$ recursion steps there are $\frac{n}{2^i}$ elements in a list
  - Thus: $O(n \log n)$
  - Matches the lower bound for comparison-based sorting (proof in a data structures book!)
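The merge step and the recursion described on the two merge sort slides can be sketched in a few lines of Python. This is a minimal illustration and not code from the lecture; the function names `merge` and `merge_sort` are chosen here for the example.

```python
def merge(left, right):
    """Merge two sorted lists by repeatedly moving the smaller head element to the output."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            out.append(left[i])
            i += 1
        else:
            out.append(right[j])
            j += 1
    # One input is exhausted; the rest of the other one is already sorted.
    out.extend(left[i:])
    out.extend(right[j:])
    return out


def merge_sort(items):
    """Split the list in two halves, sort each half recursively, then merge them."""
    if len(items) <= 1:
        return list(items)
    mid = len(items) // 2
    return merge(merge_sort(items[:mid]), merge_sort(items[mid:]))


# The two input lists from the merge example on the slide.
print(merge([1, 3, 4, 9], [2, 5, 7, 8]))
```

Each call to `merge` touches every element once, and the list is halved at every level of the recursion, which is where the O(n log n) bound comes from.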
Two-Phase, Multiway Merge-Sort (TPMMS)
- If data does not fit in memory…
  - Use multiple phases
- Phase 1:
  - Load as many data items as fit in main memory (actually: sort buffer)
  - Sort lists in main memory (using quicksort or heap sort)
  - Write sorted lists back to disks
  - Result: many sorted list partitions (on disk)
- Phase 2:
  - Merge all sorted list partitions into a single list

2-Way Sort
- Phase 1: Read a block, sort it, write it.
  - Only one buffer block is used
- Phase 2: Merge
  - Three buffer blocks used
[Figure: Disk -> Main Memory Buffer -> Disk]

TPMMS – Phase 1
- Recursion now starts with more than one or two elements in each list partition!
- Sorting the list partitions, for instance with basic merge sort
  1. Fill memory with disk blocks from original relation
  2. Sort tuples in main memory
  3. Write tuples into free blocks on disk
  → Result: a single sorted list partition
- Example
  - 6400 blocks in main memory; overall 100 000 blocks
  - 16 list partitions (last partition is smaller)
  - Cost: 200 000 I/O operations
    - 100 000 block reads
    - 100 000 block writes
  - Time: on average 11 ms per I/O operation
    → 11 ms · 200 000 = 2 200 s ≈ 37 min
    → CPU time for sorting is negligible; we are bound by I/O

TPMMS – Phase 2
- Naive idea: merge the l list partitions pairwise in main memory
  - Requires 2 · log2 l read and write operations for each block (tuple)
  - In the example: one run for 16 list partitions, one run for 8, one run for 4 list partitions, and a final run for 2 list partitions
  - Every block is processed with 8 I/O operations
- Better idea: only read the first block of every list partition
  1. Search smallest key among the first tuples of all blocks: linear search (lin.), priority queue (log.)
  2. Move this tuple into the output block (in main memory)
  3. If the output block is full: write to disk
  4. If an input block is fully consumed: read next block from that list partition
- Cost: 2 I/O operations per block (and tuple): also 37 min
  → Overall cost for TPMMS: 74 min
- What to do if the number of list partitions is larger than the number of blocks in main memory? (For now: add a third phase that basically performs a classical merge sort.)

Two-Phase Multiway Merge Sort
To sort a file with N blocks using B main memory buffer blocks:
- Phase 1: use B buffer blocks. Produce N/B list partitions consisting of B blocks each.
- Phase 2: merge up to B − 1 runs.
[Figure: Disk -> Main Memory Buffer -> Disk]

Limitations of TPMMS
- Notation: block size B bytes, main memory size (for blocks) M bytes, tuple size R bytes
- M / B blocks fit in main memory
  - In Phase 2 we need space for one output block
  - Phase 1 can generate at most M / B - 1 sorted list partitions (for two phases)
  - That is also how often one can fill memory and sort (in Phase 1)
- Each sorting can sort M / R tuples
  - At most (M / R) * ((M / B) - 1) ≈ M² / (RB) can be sorted
- Our example:
  - M = 104 857 600 Bytes, B = 16 384 Bytes, R = 160 Bytes
- Overall:
  - At most 4.2 billion tuples (ca. 0.67 Terabyte)
  - With 16GB RAM -> 13 PB

Multi-Phase MMS
- If relation is even larger
  - Add third phase
  - Use TPMMS to create sorted list partitions of size M² / (RB)
  - Phase 3: Merge at most M / B – 1 sorted list partitions
  - Thus: M³ / (RB²) can be sorted
- In our example:
  - at most 27 trillion tuples (ca. 4.3 Petabytes)
  - Overall runtime: 11 ms per block, 6 I/Os per block (2 sorting, 2 per merging phase)
    → (4.3 PB / 16 KB) * 6 I/Os * 11 ms ≈ 562 years

Short Break

MapReduce Architecture

Simplified Architecture
- Input data is split into chunks (sets of files)
- Primary assigns map tasks to workers
- Workers perform map computation
  - Store partitioned, sorted data locally
- Primary assigns reduce tasks (the primary decides where to run each reduction task)
- Workers retrieve, sort, and shuffle data
- Workers perform reduce task
- Result is stored at global output data
[Figure: Input Data -> Mapping (workers) -> Reduction (workers) -> Output Data, coordinated by the Primary]

MapReduce Execution Stages
- Scheduling: Assigns workers to map and reduce tasks
- Data distribution: Moves processes to data (Map)
- Synchronization: Gathers, sorts, and shuffles intermediate data (Reduce)
- Errors and faults: Detects worker failures and restarts

Parallelism
- map() tasks run in parallel, creating different intermediate values from different input data sets
- reduce() tasks run in parallel, each working on a different output key
- All values are processed independently, taking advantage of distribution
- Bottleneck: Reduce phase cannot start until map phase is completely finished (the problem lies between those two phases)
- Single key is not parallelized, potentially resulting in imbalanced load
- If some workers are slow, they slow down the entire process: straggler problem
  - Start redundant workers and take result of fastest one

Parallelism II
[Figure from http://research.google.com/archive/mapreduce-osdi04-slides/index.html]

Additional Optimizations
- Combiner
  - Do a reduction on the mapping node (useful when we have redundancy, e.g., the repeated "na" in the word count example)
  - Reduces intermediate results and network traffic
- Partition
  - Partition the data for parallel reduction
- Locality
  - Primary assigns map tasks local to data
  - Map not network bound

Combiners
- Often a map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k
  - E.g., popular words in Word Count like ("the", 1)
- For associative ops. like sum, count, max, can save bandwidth by pre-aggregating at mapper
  - Decreases size of intermediate data
- Needs to have the same signature as the Reduce task
- Example: local counting for Word Count:
    def combine(key, values):
        output(key, sum(values))
[Figure: eight (na, 1) pairs -> Combine -> (na, 8)]

Partitioning Function
- Inputs to map are created by contiguous splits of input files
- For reduce, we need to ensure that records with the same intermediate key end up at the same worker
- System uses a default partition function: hash(key) mod R
  - Distributes the intermediate keys to reduce workers "randomly"
- Sometimes useful to override
  - Balance load manually if distribution of keys known
  - Specific requirement on which key-value pair should be in the same output files (e.g., global ordering)
    def partition(key, number of partitions):
        partition id for key

Locality
- Primary divides up tasks based on location of data:
  - Tries to run map() tasks on same machine where physical input data resides (based on GFS/HDFS)
  - At least same rack, if not same machine
- map() task inputs are divided into 64 MB blocks
  - Same size as GFS chunks
- Effect: Thousands of machines read input at local disk speed

Other Optimizations
- Optimization: replicate tasks
  - Close to completion, spawn redundant tasks
  - Reduces latency from slow workers dramatically
- Skipping "bad" records in input
  - Map/Reduce functions sometimes fail for particular inputs
  - Best solution is to debug & fix
  - But not always possible because problems are in the input data and third party libraries
  - Solution
    - On fault, notify primary about the record processed before fault
    - If primary sees 2 failures for same record, it notifies workers to skip
  - Effect: Can work around bugs in third-party libraries
- Other
  - Compression of intermediate data (use some of the spare cores to compress)

Fault Tolerance
- Heartbeat messages
  - Detect failures
- Worker failure / slow worker
  - Re-execute tasks
- Each intermediate result (output of the mapper) is materialized on disk
  - Very expensive, but makes recovery of lost processes very simple and cheap
- Inputs are stored in a fault tolerant way by the GFS/HDFS

Summary
- MapReduce Programming Paradigm
  - Simplifies distributed, parallel data processing
  - Map: perform computation (on data)
  - Reduce: combine multiple data items
- MapReduce Framework
  - Executes MapReduce jobs
  - Fault tolerant
  - Highly scalable

Next Part
- Next Lecture: Map Reduce II

Thank you for your attention!
- Questions?
  - In Moodle
  - Per email: [email protected]
  - In Q&A sessions

Big Data Systems: Map Reduce II
Prof. Tilmann Rabl
Data Engineering Systems, Hasso Plattner Institute

Announcements
- Less than one week left for the first exercise

This Lecture
1. MR Algorithms
2. Page Rank in MR
3. Hive
4. SQL on MR

Where are we?
- General MR processing

MR Algorithms

MapReduce
- Programming model
  - Large scale, distributed data processing
  - Inspired by map and reduce functions in functional languages
- Framework
  - Simple parallelization model
  - Shared nothing architectures ("commodity hardware")
- By Google (Jeff Dean and Sanjay Ghemawat)
  - Presented 2004 at OSDI'04
  - Used for Web index and many other data processing jobs
  - Reimplemented by Yahoo as Apache Hadoop: https://hadoop.apache.org/

MapReduce Program Design
Map
- Task performed independently on data objects
- Example: transformation, projection, filter
Reduce
- Task combining multiple data objects
- Example: count, sum, min, max, top-k
General
- Often multiple phases of Map and Reduce needed
- The framework always does shuffle and sort between Map and Reduce

Application – Inverted Index
- List of documents that contain a certain term
- Basic structure of modern search engines
- Map process (parser):
  - Tokenize documents
  - Create (term, document) pairs
- Reduce process (inverter)
  - Combine pairs with same term to inverted index
[Figure: example documents containing "cat" and "dog" (e.g., "… likes the dog …") and the index entries Cat and Dog that point to them]

Page Rank
- Order web pages by their importance
- Importance means
  - How many pages link to it
  - How important are these pages
- Idea: How likely will a person randomly surfing the web end up on this page
[Figure: Newsweek cover]

Page Rank in MR
- PageRank: Probability of reaching a certain web page by randomly clicking on links
- PageRank (PR) of page A is the sum of the PageRank of all pages (t1…tn) with inbound links, each divided by the number of its outbound links L(ti), multiplied with a damping factor d:
  $PR(A) = (1 - d) + d \left( \frac{PR(t_1)}{L(t_1)} + \dots + \frac{PR(t_n)}{L(t_n)} \right)$
- Iteratively calculated
Lin J, Dyer C: Data-Intensive Text Processing with MapReduce. Morgan Claypool Synthesis Lectures on Human Language Technologies, 2010.

Page Rank: Basic Example
- Five web pages connected with web links, no dangling pages (no outgoing links), no pages with no ingoing links
- Initialize with equal page rank
- Per node distribute page rank on outgoing edges
- Calculate new page rank based on ingoing edges
[Figure: link graph before and after the first iteration. Initial ranks: n1 to n5 = 0.2 each. After the first iteration: n1 = 0.066, n2 = 0.3, n3 = 0.166, n4 = 0.166, n5 = 0.3.]

Page Rank: Basic Example – Second iteration
- Repeat until converged
[Figure: link graph before and after the second iteration. After the second iteration: n1 = 0.1, n2 = 0.2, n3 = 0.183, n4 = 0.133, n5 = 0.383.]

Page Rank in MapReduce – Phase 1
- Map:
  - Input: DocID A, PageRank (A), list of outgoing links {B1,…}
  - Output: DocID outgoing link B1, PageRank(A)/L(A), DocID A
- Reduce:
  - Input: DocID B, {PageRank (A1) / L(A1), …}
  - Compute PageRank B
  - Output: DocID B, PageRank (B), list of ingoing links {A1,…}
[Figure: example. Map input n1(0.2) with outgoing links n2 and n4 yields 0.1 for n2 and 0.1 for n4. Reduce input for n2 is 0.1 from n1 and 0.2 from n3; the output is n2(0.3) with ingoing links n1 and n3.]

Page Rank in MapReduce – Data Distribution Phase I
[Figure: MAP -> MR Framework Shuffle & Sort -> REDUCE, with the following tuples.
Map input (DocID, PageRank, outgoing links): (N1, 0.2, N2, N4), (N4, 0.2, N3, N5), (N2, 0.2, N5), (N5, 0.2, N1, N3, N4), (N3, 0.2, N2).
Map output and reduce input (DocID, PageRank, ingoing link): (N2, 0.1, N1), (N4, 0.1, N1), (N3, 0.1, N4), (N5, 0.1, N4), (N5, 0.2, N2), (N1, 0.06, N5), (N3, 0.06, N5), (N4, 0.06, N5), (N2, 0.2, N3).
Reduce output (DocID, PageRank, ingoing links): (N1, 0.06, N5), (N4, 0.16, N1, N5), (N2, 0.3, N1, N3), (N5, 0.3, N2, N4), (N3, 0.16, N4, N5).]

Page Rank in MapReduce – Phase II
- Map:
  - Input: DocID B, PageRank (B), list of ingoing links {A1,…}
  - Output: DocID A1, DocID B and DocID B, PageRank (B)
- Reduce:
  - Input: DocID A, outgoing link (B) or DocID A, PageRank (A)
  - Output: DocID A, PageRank (A), list of outgoing links {B1,…}
[Figure: example. Map input n2(0.3) with ingoing links n1 and n3 yields (n1, n2), (n3, n2), and (n2, 0.3). Reduce input for n1 is the outgoing links n2 and n4 plus the rank 0.066; the output is n1(0.066) with outgoing links n2 and n4.]

Page Rank in MapReduce – Data Distribution Phase II
[Figure: MAP -> MR Framework Shuffle & Sort -> REDUCE, with the following tuples.
Map input (DocID, PageRank, ingoing links): (N1, 0.06, N5), (N4, 0.16, N1, N5), (N2, 0.3, N1, N3), (N5, 0.3, N2, N4), (N3, 0.16, N4, N5).
Map output and reduce input (DocID, outgoing link): (N5, N1), (N1, N4), (N5, N4), (N1, N2), (N3, N2), (N2, N5), (N4, N5), (N4, N3), (N5, N3); and (DocID, PageRank): (N1, 0.06), (N4, 0.16), (N2, 0.3), (N5, 0.3), (N3, 0.16).
Reduce output (DocID, PageRank, outgoing links): (N1, 0.06, N2, N4), (N4, 0.16, N3, N5), (N2, 0.3, N5), (N5, 0.3, N1, N3, N4), (N3, 0.16, N2).]

Page Rank cont'd
- Preprocessing
  - Initial PageRank, remove dangling pages (no outgoing links) and pages without ingoing links
- Iterate
  - Until convergence / for a fixed number of iterations
- Postprocessing
  - Add removed pages (with damping factor), sort URLs by PageRank
[Figure: Preprocessing -> Step I -> Step II -> Postprocessing]

Hive

MapReduce Stack
- Apache Hadoop
  - Open-source clone of Google Stack
  - Initiated at Yahoo
- Hive: initiated at Facebook
[Figure: Google stack (Sawzall, Index, …, GMail apps; MapReduce, BigTable; scheduling system, GFS, Chubby) next to the Hadoop stack (PigLatin, Hive; MapReduce, HBase; Zookeeper, HDFS)]

Hive
- Data warehouse on top of Hadoop
  - DWH queries mapped to MR jobs
  - No online queries
- Some additional features for performance
  - Metadata in Derby database
  - Indexes
- SQL like syntax
  - With optional MR tasks

Hive Architecture
- Metastore
  - Metadata of tables, columns, partitions
- Driver
  - Sessions, HiveQL lifecycle
- Query Compiler
  - HiveQL -> MR tasks
- Execution Engine
  - Interaction with MR Engine
- HiveServer
  - Integration with other applications (JDBC, Thrift, …)
- Interfaces
  - SerDe, UDFs
A. Thusoo, et al.: Hive: A Petabyte Scale Data Warehouse Using Hadoop. In ICDE '10:996-1005.

Data Types and Data Access
- Data types
  - Simple: tinyint, …, bigint, boolean, float, string
  - Complex: struct, map, array
- Flexible (de)serialization
  - Different (user defined) formats, e.g., XML, JSON, CSV
  - Different storage: file, ProtocolBuffer

Data Storage
- Tables: HDFS directory
- Partition: subdirectory in table directory
- Bucket: file within table or partition directory
- Prune data based on partitions or buckets
    CREATE TABLE test_part(c1 string, c2 int)
    PARTITIONED BY (ds string, hr int);

    INSERT OVERWRITE TABLE test_part PARTITION(ds='2009-01-01', hr=12);
    -> /user/hive/warehouse/test_part/ds=2009-01-01/hr=12

    SELECT * FROM test_part TABLESAMPLE(2 OUT OF 32);
- Also: external tables (other location in HDFS)

HiveQL
- SQL-like language
- Sub queries
- Join: only support equal predicates
- Group by
- Cartesian product
    SELECT pid1, pid2, COUNT (*) AS cnt
    FROM (
      FROM (
        SELECT s.ss_ticket_number AS oid, s.ss_item_sk AS pid
        FROM store_sales s
        INNER JOIN item i ON s.ss_item_sk = i.i_item_sk
        WHERE i.i_category_id IN (1, 2, 3)
          AND s.ss_store_sk IN (10, 20, 33, 40, 50)
        CLUSTER BY oid
      ) q01_map_output
      REDUCE q01_map_output.oid, q01_map_output.pid
      USING 'java -cp bigbenchqueriesmr.jar:hive-contrib.jar bigbench.queries.q01.Red'
      AS (pid1 BIGINT, pid2 BIGINT)
    ) q01_temp_basket
    GROUP BY pid1, pid2
    HAVING COUNT (pid1) >= 50
    ORDER BY pid1, cnt, pid2;
- Support analysis expressed by map-reduce programs
    FROM (
      MAP doctext USING 'python wc_mapper.py' AS (word, cnt)
      FROM docs
      CLUSTER BY word
    ) a
    REDUCE word, cnt USING 'python wc_reduce.py';

Query Compilation
- Query is translated into a DAG
- Nodes: Operators
  - TableScan
  - Select, Extract
  - Filter
  - Join, MapJoin, Sorted Merge Map Join
  - GroupBy, Limit
  - Union, Collect
  - FileSink, HashTableSink, ReduceSink
  - UDTF
- DAG represents dataflow
- Example query:
    SELECT *
    FROM status_updates
    WHERE status LIKE 'Michael Jackson'

Query Compilation Flow
[Figure: a HiveQL query is parsed into an abstract syntax tree, turned into a query block tree by type checking and semantic analysis, optimized, and translated by physical plan generation into Map/Reduce + HDFS tasks that are submitted for execution. Components shown: HiveServer, Optimization, Execution Engine.]
- Similar to traditional database system

Complex Queries
- Query plans can easily become complex
- Query optimization
  - Project out unnecessary columns
  - Push down filters
  - Prune unused partitions
  - Dynamic join optimization during run time (MapJoin vs Repartition Join)
- Chain folding
  - Collapse MapReduce stages

Hive on other Backends
- Hive suffers from MR performance
- Can use other backends as well
  - Tez, Spark, …
- What remains:
  - Metastore concept
  - Optimizations

Short Break

SQL on MR

Translating SQL to MR
- Select, Project, Aggregate
  - Easy
- Join
  - Different implementations
- Complex queries
  - Multiple MR Stages

Relational Operators in Map Reduce
- Selection / projection / aggregation
- SQL Query:
    SELECT year, SUM(price)
    FROM sales
    WHERE area_code = "US"
    GROUP BY year
- Map/Reduce job:
    map(key, tuple) {
      if (tuple.area_code == "US") {
        int year = YEAR(tuple.date);
        emit(year, {'price' => tuple.price});
      }
    }

    reduce(key, tuples) {
      double sum_price = 0;
      foreach (tuple in tuples) {
        sum_price += tuple.price;
      }
      emit(key, sum_price);
    }

Repartition Join
- Equi-Join: L(A,X) ⋈ R(X,C)
  - Assumption: |L| < |R|
- Mapper
  - Identical processing logic for L and R
  - Evaluate local predicates to filter unneeded tuples (optional)
  - Emits each tuple once
  - The intermediate key is the value of the join key X
- Partition and sort
  - Modulo division of the join key hash value
  - Input is sorted on the join key
  - Output: L(i), R(i) tuples for join key x
- Reduce
  - Buffer all tuples per key
  - Join each R-tuple with matching L-tuples
[Figure: mappers (M) read L and R, tuples are partitioned with h(key) % n, and each reducer builds L and joins it with R to produce L(A,X) ⋈ R(X,C)]

Improved Repartition Join
- Equi-Join: L(A,X) ⋈ R(X,C)
  - Assumption: |L| < |R|
- Mapper
  - Identical processing logic for L and R
  - Evaluate local predicates to filter unneeded tuples (optional)
  - Emits each tuple once
  - The intermediate key is a pair of
    - the value of the actual join key X
    - an annotation identifying the relation the tuple belongs to (L or R)
- Partition and sort
  - Modulo division of the join key hash value
  - Input is sorted primarily on the join key, secondarily on the relation name
  - Output: a sequence of L(i), R(i) blocks of tuples for ascending join key i
- Reduce
  - Collect all L-tuples for the current L(i) block in a hash map
  - Combine them with each R-tuple of the subsequent R(i)-tuple block
[Figure: mappers (M) read L and R, tuples are partitioned with h(key) % n, and each reducer builds L and joins it with R to produce L(A,X) ⋈ R(X,C)]

Broadcast Join
- Equi-Join: L(A,X) ⋈ R(X,C)
- Assumption: |L|