An Introduction to Data Intensive Computing Amir H. Payberah [email protected] 2024-08-27 Course Information 1 / 74 Course Objective ▶ Provide students with a solid foundation for understanding large-scale distributed systems used for storing and processing massive data. ▶ Cover a wide variety of advanced topics in data-intensive computing platforms, i.e., the frameworks to store and process big data. 2 / 74 Intended Learning Outcomes (ILOs) ▶ ILO1: Understand the main concepts of data-intensive computation platforms. ▶ ILO2: Apply the acquired knowledge to store and process massive data. ▶ ILO3: Analyze the technical merits of data-intensive computation platforms. 3 / 74 The Course Assessment ▶ Task1: the review questions. ▶ Task2: the project assignment. ▶ Task3: the essay and the presentation. ▶ Task4: the final exam. ▶ Task1, Task2, and Task3 should be done in groups of two or three students. 4 / 74 How Is Each ILO Assessed? Task1 Task2 Task3 Task4 ILO1 x x ILO2 x ILO3 x 5 / 74 Task1: The Review Questions (P/F) ▶ Five sets of review questions, one set for each week. 6 / 74 Task2: The Project Assignment (P/F) ▶ Proposed by students and confirmed by the teacher. ▶ Source code and oral presentation. 7 / 74 Task3: The Essay and The Presentation (A-F) ▶ One module for each group: writing an essay and presenting it to their opponents (another group).
▶ Grading of this task has the following parts: E : Essay (5 points) P: Presentation (2 points) Q: Reviewing essay and asking questions (2 points) A: Answering questions (1 point) 8 / 74 Task3: The Essay and The Presentation (A-F) ▶ One module for each group: writing an essay and presenting it to their opponents (another group). ▶ Grading of this task has the following parts: E : Essay (5 points) P: Presentation (2 points) Q: Reviewing essay and asking questions (2 points) A: Answering questions (1 point) ▶ The final grade: A: 10, B: 9, C: 8, D: 7, E: 6, F: 1990 AND m.released < 2000 RETURN m; 63 / 70 Cypher Example (2/2) // Return nodes with label Person and name property equals ’Tom Hanks’ MATCH (p:Person) WHERE p.name = ’Tom Hanks’ RETURN p; // Return nodes with label Movie, released property is between 1991 and 1999 MATCH (m:Movie) WHERE m.released > 1990 AND m.released < 2000 RETURN m; // Find all the movies Tom Hanks acted in MATCH (:Person {name:’Tom Hanks’})-[:ACTED_IN]->(m:Movie) RETURN m.title; 63 / 70 Summary 64 / 70 Summary ▶ NoSQL data models: key-value, column-oriented, document-oriented, graph-based ▶ CAP (Consistency vs. Availability) 65 / 70 Summary ▶ BigTable ▶ Column-oriented ▶ Main components: master, tablet server, client library ▶ Basic components: GFS, SSTable, Chubby ▶ CP 66 / 70 Summary ▶ Cassandra ▶ Column-oriented (similar to BigTable) ▶ Consistency hashing ▶ Gossip-based membership ▶ AP 67 / 70 Summary ▶ Neo4j ▶ Graph-based ▶ Cypher ▶ CA 68 / 70 References ▶ F. Chang et al., Bigtable: A distributed storage system for structured data, ACM Transactions on Computer Systems (TOCS) 26.2, 2008. ▶ A. Lakshman et al., Cassandra: a decentralized structured storage system, ACM SIGOPS Operating Systems Review 44.2, 2010. ▶ I. Robinson et al., Graph Databases (2nd ed.), O’Reilly Media, 2015. 69 / 70 Questions? Acknowledgements Some content of the Neo4j slides were derived from Ljubica Lazarevic’s slides. 70 / 70 A Crash Course on Scala Amir H. Payberah [email protected] 2024-09-03 Introduction ▶ Scala: scalable language ▶ A blend of object-oriented and functional programming. ▶ Runs on the Java Virtual Machine. ▶ Designed by Martin Odersky at EPFL. 1 / 70 2 / 70 The “Hello, world!” Program object HelloWorld { def main(args: Array[String]) { println("Hello, world!") } } 3 / 70 The “Hello, world!” Program object HelloWorld { def main(args: Array[String]) { println("Hello, world!") } } // Compile it! > scalac HelloWorld.scala // Execute it! > scala HelloWorld 3 / 70 Run in Jupyter-Notebook ▶ Apache toree, polyglot,... 4 / 70 Outline ▶ Scala basics ▶ Functions ▶ Collections ▶ Classes and objects ▶ SBT 5 / 70 Outline ▶ Scala basics ▶ Functions ▶ Collections ▶ Classes and objects ▶ SBT 5 / 70 Scala Variables ▶ Values: immutable ▶ Variables: mutable ▶ Always use immutable values by default, unless you know for certain they need to be mutable. var myVar: Int = 0 val myVal: Int = 1 // Scala figures out the type of variables based on the assigned values var myVar = 0 val myVal = 1 // If the initial values are not assigned, it cannot figure out the type var myVar: Int val myVal: Int 6 / 70 Scala Data Types ▶ Boolean: true or false ▶ Byte: 8 bit signed value ▶ Short: 16 bit signed value ▶ Char: 16 bit unsigned Unicode character ▶ Int: 32 bit signed value ▶ Long: 64 bit signed value ▶ Float: 32 bit IEEE 754 single-precision float ▶ Double: 64 bit IEEE 754 double-precision float ▶ String: A sequence of characters var myInt: Int var myString: String 7 / 70 If... 
Else var x = 30; if (x == 10) { println("Value of X is 10"); } else if (x == 20) { println("Value of X is 20"); } else { println("This is else statement"); } 8 / 70 Loops (1/3) var a = 10 // do-while do { println(s"Value of a: $a") a = a + 1 } while(a < 20) // while loop execution while(a < 20) { println(s"Value of a: $a") a = a + 1 } 9 / 70 Loops (2/3) var a = 0 var b = 0 for (a Int, a: Int, b: Int): Int = if (a > b) 0 else f(a) + sum(f, a + 1, b) def id(x: Int): Int = x def square(x: Int): Int = x * x 23 / 70 Hands-on Exercises (2/2) ▶ Assume the following methods def sum(f: Int => Int, a: Int, b: Int): Int = if (a > b) 0 else f(a) + sum(f, a + 1, b) def id(x: Int): Int = x def square(x: Int): Int = x * x ▶ Reimplement the previous methods using higher-order functions. 23 / 70 Hands-on Exercises (2/2) ▶ Assume the following methods def sum(f: Int => Int, a: Int, b: Int): Int = if (a > b) 0 else f(a) + sum(f, a + 1, b) def id(x: Int): Int = x def square(x: Int): Int = x * x ▶ Reimplement the previous methods using higher-order functions. def sumInts(a: Int, b: Int): Int = sum(id, a, b) def sumSquares(a: Int, b: Int): Int = sum(square, a, b) 23 / 70 Outline ▶ Scala basics ▶ Functions ▶ Collections ▶ Classes and objects ▶ SBT 24 / 70 Collections ▶ Scala collections can be mutable and immutable collections. ▶ Mutable collections can be updated or extended in place. ▶ Immutable collections never change: additions, removals, or updates operators return a new collection and leave the old collection unchanged. 25 / 70 Collections ▶ Arrays ▶ Lists ▶ Sets ▶ Maps 26 / 70 Collections - Arrays ▶ A fixed-size sequential collection of elements of the same type ▶ Mutable // Array definition val t: Array[String] = new Array[String](3) val t = new Array[String](3) 27 / 70 Collections - Arrays ▶ A fixed-size sequential collection of elements of the same type ▶ Mutable // Array definition val t: Array[String] = new Array[String](3) val t = new Array[String](3) // Assign values or get access to individual elements t(0) = "zero"; t(1) = "one"; t(2) = "two" 27 / 70 Collections - Arrays ▶ A fixed-size sequential collection of elements of the same type ▶ Mutable // Array definition val t: Array[String] = new Array[String](3) val t = new Array[String](3) // Assign values or get access to individual elements t(0) = "zero"; t(1) = "one"; t(2) = "two" // There is one more way of defining an array val t = Array("zero", "one", "two") 27 / 70 Collections - Lists ▶ A sequential collection of elements of the same type ▶ Immutable ▶ Lists represent a linked list // List definition val l1 = List(1, 2, 3) val l1 = 1 :: 2 :: 3 :: Nil 28 / 70 Collections - Lists ▶ A sequential collection of elements of the same type ▶ Immutable ▶ Lists represent a linked list // List definition val l1 = List(1, 2, 3) val l1 = 1 :: 2 :: 3 :: Nil // Adding an element to the head of a list val l2 = 0 :: l1 28 / 70 Collections - Lists ▶ A sequential collection of elements of the same type ▶ Immutable ▶ Lists represent a linked list // List definition val l1 = List(1, 2, 3) val l1 = 1 :: 2 :: 3 :: Nil // Adding an element to the head of a list val l2 = 0 :: l1 // Adding an element to the tail of a list val l3 = l1 :+ 4 28 / 70 Collections - Lists ▶ A sequential collection of elements of the same type ▶ Immutable ▶ Lists represent a linked list // List definition val l1 = List(1, 2, 3) val l1 = 1 :: 2 :: 3 :: Nil // Adding an element to the head of a list val l2 = 0 :: l1 // Adding an element to the tail of a list val l3 = l1 :+ 4 // Concatenating 
lists val t3 = List(4, 5) val t4 = l1 ::: t3 28 / 70 Collections - Sets ▶ A sequential collection of elements of the same type ▶ Immutable and mutable ▶ No duplicates. // Set definition val s = Set(1, 2, 3) 29 / 70 Collections - Sets ▶ A sequential collection of elements of the same type ▶ Immutable and mutable ▶ No duplicates. // Set definition val s = Set(1, 2, 3) // Add a new element to the set val s2 = s + 0 29 / 70 Collections - Sets ▶ A sequential collection of elements of the same type ▶ Immutable and mutable ▶ No duplicates. // Set definition val s = Set(1, 2, 3) // Add a new element to the set val s2 = s + 0 // Remove an element from the set val s3 = s2 - 2 29 / 70 Collections - Sets ▶ A sequential collection of elements of the same type ▶ Immutable and mutable ▶ No duplicates. // Set definition val s = Set(1, 2, 3) // Add a new element to the set val s2 = s + 0 // Remove an element from the set val s3 = s2 - 2 // Test the membership s.contains(2) 29 / 70 Collections - Maps ▶ A collection of key/value pairs ▶ Immutable and mutable // Map definition var m1: Map[Char, Int] = Map() val m2 = Map(1 -> "Carbon", 2 -> "Hydrogen") 30 / 70 Collections - Maps ▶ A collection of key/value pairs ▶ Immutable and mutable // Map definition var m1: Map[Char, Int] = Map() val m2 = Map(1 -> "Carbon", 2 -> "Hydrogen") // Finding the element associated to a key in a map m2(1) 30 / 70 Collections - Maps ▶ A collection of key/value pairs ▶ Immutable and mutable // Map definition var m1: Map[Char, Int] = Map() val m2 = Map(1 -> "Carbon", 2 -> "Hydrogen") // Finding the element associated to a key in a map m2(1) // Adding an association in a map val m3 = m2 + (3 -> "Oxygen") 30 / 70 Collections - Maps ▶ A collection of key/value pairs ▶ Immutable and mutable // Map definition var m1: Map[Char, Int] = Map() val m2 = Map(1 -> "Carbon", 2 -> "Hydrogen") // Finding the element associated to a key in a map m2(1) // Adding an association in a map val m3 = m2 + (3 -> "Oxygen") // Returns an iterable containing each key (or values) in the map m2.keys m2.values 30 / 70 Colletion Methods ▶ map ▶ foreach ▶ filter ▶ zip ▶ partition ▶ find ▶ drop and dropWhile ▶ foldRight and foldLeft ▶ flatten ▶ flatMap 31 / 70 Functional Combinators - map ▶ Evaluates a function over each element in the list, returning a list with the same number of elements. val numbers = List(1, 2, 3, 4) // numbers: List[Int] = List(1, 2, 3, 4) numbers.map((i: Int) => i * 2) // res0: List[Int] = List(2, 4, 6, 8) 32 / 70 Functional Combinators - map ▶ Evaluates a function over each element in the list, returning a list with the same number of elements. val numbers = List(1, 2, 3, 4) // numbers: List[Int] = List(1, 2, 3, 4) numbers.map((i: Int) => i * 2) // res0: List[Int] = List(2, 4, 6, 8) def timesTwo(i: Int): Int = i * 2 // timesTwo: (i: Int)Int numbers.map(timesTwo _) // or numbers.map(timesTwo) // res1: List[Int] = List(2, 4, 6, 8) 32 / 70 Functional Combinators - foreach ▶ It is like map, but returns nothing. val numbers = List(1, 2, 3, 4) // numbers: List[Int] = List(1, 2, 3, 4) val doubled = numbers.foreach((i: Int) => i * 2) // doubled: Unit = () numbers.foreach(print) // 1234 33 / 70 Functional Combinators - filter ▶ Removes any elements where the function you pass in evaluates to false. val numbers = List(1, 2, 3, 4) // numbers: List[Int] = List(1, 2, 3, 4) numbers.filter((i: Int) => i % 2 == 0) // res0: List[Int] = List(2, 4) 34 / 70 Functional Combinators - filter ▶ Removes any elements where the function you pass in evaluates to false. 
val numbers = List(1, 2, 3, 4) // numbers: List[Int] = List(1, 2, 3, 4) numbers.filter((i: Int) => i % 2 == 0) // res0: List[Int] = List(2, 4) def isEven(i: Int): Boolean = i % 2 == 0 // isEven: (i: Int)Boolean numbers.filter(isEven) // res2: List[Int] = List(2, 4) 34 / 70 Functional Combinators - zip ▶ Aggregates the contents of two lists into a single list of pairs. val numbers = List(1, 2, 3, 4) // numbers: List[Int] = List(1, 2, 3, 4) val chars = List("a", "b", "c") // chars: List[String] = List(a, b, c) numbers.zip(chars) // res0: List[(Int, String)] = List((1, a), (2, b), (3, c)) 35 / 70 Functional Combinators - partition ▶ Splits a list based on where it falls with respect to a predicate function. val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) numbers.partition(_ % 2 == 0) // res0: (List[Int], List[Int]) = (List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9)) 36 / 70 Functional Combinators - find ▶ Returns the first element of a collection that matches a predicate function. val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) numbers.find(i => i > 5) // res0: Option[Int] = Some(6) 37 / 70 Functional Combinators - drop and dropWhile ▶ drop drops the first i elements. ▶ dropWhile removes the first elements that match a predicate function. val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) numbers.drop(5) // res0: List[Int] = List(6, 7, 8, 9, 10) numbers.dropWhile(_ % 3 != 0) // res1: List[Int] = List(3, 4, 5, 6, 7, 8, 9, 10) 38 / 70 Functional Combinators - foldLeft ▶ Takes an associative binary operator function and uses it to collapse elements from the collection. ▶ It goes through the whole List, from head (left) to tail (right). val numbers = List(1, 2, 3, 4, 5) numbers.foldLeft(0) { (acc, i) => println("i: " + i + " acc: " + acc) i + acc } 39 / 70 Functional Combinators - foldRight ▶ It is the same as foldLeft except it runs in the opposite direction. val numbers = List(1, 2, 3, 4, 5) numbers.foldRight(0) { (i, acc) => println("i: " + i + " acc: " + acc) i + acc } 40 / 70 Functional Combinators - flatten ▶ It collapses one level of nested structure. List(List(1, 2), List(3, 4)).flatten // res0: List[Int] = List(1, 2, 3, 4) List(Some(1), None, Some(3)).flatten // res0: List[Int] = List(1, 3) 41 / 70 Functional Combinators - flatMap ▶ It takes a function that works on the nested lists and then concatenates the results back together. val nestedNumbers = List(List(1, 2), List(3, 4)) // nestedNumbers: List[List[Int]] = List(List(1, 2), List(3, 4)) nestedNumbers.flatMap(x => x.map(_ * 2)) // res0: List[Int] = List(2, 4, 6, 8) 42 / 70 43 / 70 Hands-on Exercises (1/3) ▶ Declare a list of integers as a variable called myNumbers. 44 / 70 Hands-on Exercises (1/3) ▶ Declare a list of integers as a variable called myNumbers. val myNumbers = List(1, 2, 5, 4, 7, 3) 44 / 70 Hands-on Exercises (1/3) ▶ Declare a list of integers as a variable called myNumbers. val myNumbers = List(1, 2, 5, 4, 7, 3) ▶ Declare a function, pow, that computes the second power of an int. 44 / 70 Hands-on Exercises (1/3) ▶ Declare a list of integers as a variable called myNumbers. val myNumbers = List(1, 2, 5, 4, 7, 3) ▶ Declare a function, pow, that computes the second power of an int. def pow(a: Int): Int = a * a 44 / 70 Hands-on Exercises (2/3) ▶ Apply the function to myNumbers using the map function. 
45 / 70 Hands-on Exercises (2/3) ▶ Apply the function to myNumbers using the map function. myNumbers.map(x => pow(x)) // or myNumbers.map(pow(_)) // or myNumbers.map(pow) 45 / 70 Hands-on Exercises (2/3) ▶ Apply the function to myNumbers using the map function. myNumbers.map(x => pow(x)) // or myNumbers.map(pow(_)) // or myNumbers.map(pow) ▶ Write the pow function inline in a map call, using closure notation. 45 / 70 Hands-on Exercises (2/3) ▶ Apply the function to myNumbers using the map function. myNumbers.map(x => pow(x)) // or myNumbers.map(pow(_)) // or myNumbers.map(pow) ▶ Write the pow function inline in a map call, using closure notation. myNumbers.map(x => x * x) 45 / 70 Hands-on Exercises (2/3) ▶ Apply the function to myNumbers using the map function. myNumbers.map(x => pow(x)) // or myNumbers.map(pow(_)) // or myNumbers.map(pow) ▶ Write the pow function inline in a map call, using closure notation. myNumbers.map(x => x * x) ▶ Iterate through myNumbers and print out its items. 45 / 70 Hands-on Exercises (2/3) ▶ Apply the function to myNumbers using the map function. myNumbers.map(x => pow(x)) // or myNumbers.map(pow(_)) // or myNumbers.map(pow) ▶ Write the pow function inline in a map call, using closure notation. myNumbers.map(x => x * x) ▶ Iterate through myNumbers and print out its items. myNumbers.foreach(println) 45 / 70 Hands-on Exercises (3/3) ▶ Declare a list of pair of string and integers as a variable called myList. 46 / 70 Hands-on Exercises (3/3) ▶ Declare a list of pair of string and integers as a variable called myList. val myList = List[(String, Int)](("a", 1), ("b", 2), ("c", 3)) 46 / 70 Hands-on Exercises (3/3) ▶ Declare a list of pair of string and integers as a variable called myList. val myList = List[(String, Int)](("a", 1), ("b", 2), ("c", 3)) ▶ Write an inline function to increment the integer values of the list myList. 46 / 70 Hands-on Exercises (3/3) ▶ Declare a list of pair of string and integers as a variable called myList. val myList = List[(String, Int)](("a", 1), ("b", 2), ("c", 3)) ▶ Write an inline function to increment the integer values of the list myList. val x = v.map { case (name, age) => age + 1 } // or val x = v.map(i => i._2 + 1) // or val x = v.map(_._2 + 1) 46 / 70 Common Other Types ▶ Tuples ▶ Option 47 / 70 Common Data Types - Tuples ▶ A fixed number of items of different types together ▶ Immutable // Tuple definition val t2 = (1 -> "hello") // special pair constructor val t3 = (1, "hello", Console) val t3 = new Tuple3(1, "hello", 20) // Tuple getters t3._1 t3._2 t3._3 48 / 70 Common Data Types - Option (1/2) ▶ Sometimes you might or might not have a value. ▶ Java typically returns the value null to indicate nothing found. You may get a NullPointerException, if you don’t check it. ▶ Scala has a null value in order to communicate with Java. You should use it only for this purpose. ▶ Everyplace else, you should use Option. 
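The Option bullets above can be made concrete with a short sketch. This is a minimal illustration, not from the course material: the capitalOf function and its data are made up, and the point is simply that pattern matching forces both the Some and None cases to be handled, so there is no null check to forget.

// A lookup that may or may not find a value, so it returns an Option.
def capitalOf(country: String): Option[String] =
  Map("Sweden" -> "Stockholm", "Norway" -> "Oslo").get(country)

// Pattern matching handles both outcomes explicitly.
capitalOf("Sweden") match {
  case Some(city) => println(s"Found: $city")
  case None => println("Not found")
}

// getOrElse is a shorter alternative when a default value is enough.
println(capitalOf("Finland").getOrElse("unknown"))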
49 / 70 Common Data Types - Option (2/2) val numbers = Map(1 -> "one", 2 -> "two") // numbers: scala.collection.immutable.Map[Int, String] = Map((1, one), (2, two)) 50 / 70 Common Data Types - Option (2/2) val numbers = Map(1 -> "one", 2 -> "two") // numbers: scala.collection.immutable.Map[Int, String] = Map((1, one), (2, two)) numbers.get(2) // res0: Option[String] = Some(two) 50 / 70 Common Data Types - Option (2/2) val numbers = Map(1 -> "one", 2 -> "two") // numbers: scala.collection.immutable.Map[Int, String] = Map((1, one), (2, two)) numbers.get(2) // res0: Option[String] = Some(two) numbers.get(3) // res1: Option[String] = None 50 / 70 Common Data Types - Option (2/2) val numbers = Map(1 -> "one", 2 -> "two") // numbers: scala.collection.immutable.Map[Int, String] = Map((1, one), (2, two)) numbers.get(2) // res0: Option[String] = Some(two) numbers.get(3) // res1: Option[String] = None // Check if an Option value is defined (isDefined and isEmpty). val result = numbers.get(3).isDefined // result: Boolean = false 50 / 70 Common Data Types - Option (2/2) val numbers = Map(1 -> "one", 2 -> "two") // numbers: scala.collection.immutable.Map[Int, String] = Map((1, one), (2, two)) numbers.get(2) // res0: Option[String] = Some(two) numbers.get(3) // res1: Option[String] = None // Check if an Option value is defined (isDefined and isEmpty). val result = numbers.get(3).isDefined // result: Boolean = false // Extract the value of an Option. val result = numbers.get(3).getOrElse("zero") // result: String = zero 50 / 70 Outline ▶ Scala basics ▶ Functions ▶ Collections ▶ Classes and objects ▶ SBT 51 / 70 Everything is an Object ▶ Scala is a pure object-oriented language. ▶ Everything is an object, including numbers. 1 + 2 * 3 / x (1).+(((2).*(3))./(x)) ▶ Functions are also objects, so it is possible to pass functions as arguments, to store them in variables, and to return them from other functions. 52 / 70 Classes and Objects // constructor parameters can be declared as fields and can have default values class Calculator(val brand = "HP") { // an instance method def add(m: Int, n: Int): Int = m + n } val calc = new Calculator calc.add(1, 2) println(calc.brand) // HP 53 / 70 Inheritance and Overloading Methods ▶ Scala allows the inheritance from just one class only. class SciCalculator(_brand: String) extends Calculator(_brand) { def log(m: Double, base: Double) = math.log(m) / math.log(base) } class MoreSciCalculator(_brand: String) extends SciCalculator(_brand) { def log(m: Int): Double = log(m, math.exp(1)) } 54 / 70 Singleton Objects ▶ A singleton is a class that can have only one instance. class Point(val x: Int, val y: Int) { def printPoint { println(s"Point x location: $x"); println(s"Point y location: $y"); } } object SpecialPoint extends Point(10, 20) SpecialPoint.printPoint 55 / 70 Abstract Classes abstract class Shape { // subclass should define this def getArea(): Int } class Circle(r: Int) extends Shape { override def getArea(): Int = { r * r * 3 } } val s = new Shape // error: class Shape is abstract val c = new Circle(2) c.getArea // 12 56 / 70 Traits ▶ A class can mix in any number of traits. trait Car { val brand: String } trait Shiny { val shineRefraction: Int } class BMW extends Car with Shiny { val brand = "BMW" val shineRefraction = 12 } 57 / 70 Case Classes and Pattern Matching ▶ Case classes are used to store and match on the contents of a class. ▶ They are designed to be used with pattern matching. ▶ You can construct them without using new. 
case class Calculator(brand: String, model: String) val hp20b = Calculator("hp", "20B") def calcType(calc: Calculator) = calc match { case Calculator("hp", "20B") => "financial" case Calculator("hp", "48G") => "scientific" case Calculator("hp", "30B") => "business" case _ => "Calculator of unknown type" } calcType(hp20b) 58 / 70 Outline ▶ Scala basics ▶ Functions ▶ Collections ▶ Classes and objects ▶ SBT 59 / 70 Simple Build Tool (SBT) ▶ An open source build tool for Scala and Java projects. ▶ Similar to Java’s Maven or Ant. ▶ It is written in Scala. 60 / 70 SBT - Hello World! $ mkdir hello $ cd hello $ cp /HelloWorld.scala. $ sbt... > run 61 / 70 Running SBT ▶ Interactive mode $ sbt > compile > run ▶ Batch mode $ sbt clean run ▶ Continuous build and test: automatically recompile or run tests whenever you save a source file. $ sbt > ~ compile 62 / 70 Common Commands ▶ clean: deletes all generated files (in target). ▶ compile: compiles the main sources (in src/main/scala). ▶ test: compiles and runs all tests. ▶ console: starts the Scala interpreter. ▶ run *: run the main class. ▶ package: creates a jar file containing the files in src/main/resources and the classes compiled from src/main/scala. ▶ help : displays detailed help for the specified command. ▶ reload: reloads the build definition (build.sbt, project/*.scala, project/*.sbt files). 63 / 70 Create a Simple Project ▶ Create project directory. ▶ Create src/main/scala directory. ▶ Create build.sbt in the project root. 64 / 70 build.sbt ▶ A list of Scala expressions, separated by blank lines. ▶ Located in the project’s base directory. $ cat build.sbt name := "hello" version := "1.0" scalaVersion := "2.12.8" 65 / 70 Add Dependencies ▶ Add in build.sbt. ▶ Module ID format: "groupID" %% "artifact" % "version" % "configuration" libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0" // multiple dependencies libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "3.3.0", "org.apache.spark" % "spark-streaming_2.12" % "3.3.0", "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.3.0", "org.apache.spark" % "spark-streaming-kafka-0-10_2.12" % "3.3.0" ) 66 / 70 Summary 67 / 70 Summary ▶ Scala basics ▶ Functions ▶ Collections ▶ Classes and objects ▶ SBT 68 / 70 References ▶ M. Odersky, Scala by example, 2011. 69 / 70 Questions? 70 / 70 Parallel Processing - MapReduce Amir H. Payberah [email protected] 2024-09-10 Where Are We? 1 / 79 What do we do when there is too much data to process? 2 / 79 Scale Up vs. Scale Out ▶ Scale up or scale vertically: adding resources to a single node in a system. ▶ Scale out or scale horizontally: adding more nodes to a system. 3 / 79 Taxonomy of Parallel Architectures DeWitt, D. and Gray, J. “Parallel database systems: the future of high performance database systems”. ACM Communications, 35(6), 85-98, 1992. 4 / 79 MapReduce ▶ A shared nothing architecture for processing large data sets with a parallel/distributed algorithm on clusters of commodity hardware. 5 / 79 Challenges ▶ How to distribute computation? ▶ How can we make it easy to write distributed programs? ▶ Machines failure. 6 / 79 Simplicity ▶ MapReduce takes care of parallelization, fault tolerance, and data distribution. ▶ Hide system-level details from programmers. [http://www.johnlund.com/page/8358/elephant-on-a-scooter.asp] 7 / 79 MapReduce Definition ▶ A programming model: to batch process large data sets (inspired by functional pro- gramming). 
▶ An execution framework: to run parallel algorithms on clusters of commodity hard- ware. 8 / 79 Programming Model 9 / 79 10 / 79 11 / 79 12 / 79 13 / 79 14 / 79 15 / 79 Word Count ▶ Count the number of times each distinct word appears in the file ▶ If the file fits in memory: words(doc.txt) | sort | uniq -c ▶ If not? 16 / 79 Data-Parallel Processing (1/2) ▶ Parallelize the data and process. 17 / 79 Data-Parallel Processing (2/2) ▶ MapReduce 18 / 79 MapReduce Stages - Map ▶ Each Map task (typically) operates on a single HDFS block. ▶ Map tasks (usually) run on the node where the block is stored. ▶ Each Map task generates a set of intermediate key/value pairs. 19 / 79 MapReduce Stages - Shuffle and Sort ▶ Sorts and consolidates intermediate data from all mappers. ▶ Happens after all Map tasks are complete and before Reduce tasks start. 20 / 79 MapReduce Stages - Reduce ▶ Each Reduce task operates on all intermediate values associated with the same in- termediate key. ▶ Produces the final output. 21 / 79 MapReduce Data Flow (1/5) 22 / 79 MapReduce Data Flow (2/5) 23 / 79 MapReduce Data Flow (3/5) 24 / 79 MapReduce Data Flow (4/5) 25 / 79 MapReduce Data Flow (5/5) 26 / 79 Word Count in MapReduce ▶ Consider doing a word count of the following file using MapReduce 27 / 79 Word Count in MapReduce - Map (1/2) 28 / 79 Word Count in MapReduce - Map (2/2) 29 / 79 Word Count in MapReduce - Shuffle and Sort (1/3) 30 / 79 Word Count in MapReduce - Shuffle and Sort (2/3) 31 / 79 Word Count in MapReduce - Shuffle and Sort (3/3) 32 / 79 Word Count in MapReduce - Reduce (1/2) 33 / 79 Word Count in MapReduce - Reduce (2/2) 34 / 79 Mapper 35 / 79 The Mapper ▶ Input: (key, value) pairs ▶ Output: a list of (key, value) pairs ▶ The Mapper may use or completely ignore the input key. ▶ A standard pattern is to read one line of a file at a time. Key: the byte offset Value: the content of the line map(in_key, in_value) -> list of (inter_key, inter_value) (in key, in value) ⇒ map() ⇒ (inter key1, inter value1) (inter key2, inter value2) (inter key3, inter value3) (inter key4, inter value4) ··· 36 / 79 The Mapper Example (1/3) ▶ Turn input into upper case map(k, v) = emit (k.to_upper, v.to_upper) (kth, this is the course id2221) ⇒ map() ⇒ (KTH, THIS IS THE COURSE ID2221) 37 / 79 The Mapper Example (2/3) ▶ Count the number of characters in the input map(k, v) = emit (k, v.length) (kth, this is the course id2221) ⇒ map() ⇒ (kth, 26) 38 / 79 The Mapper Example (3/3) ▶ Turn each word in the input into pair of (word, 1) map(k, v) = foreach w in v emit (w, 1) (21, Hello Hadoop Goodbye Hadoop) ⇒ map() ⇒ (Hello, 1) (Hadoop, 1) (Goodbye, 1) (Hadoop, 1) 39 / 79 Reducer 40 / 79 Shuffle and Sort ▶ After the Map phase, all intermediate (key, value) pairs are grouped by the interme- diate keys. ▶ Each (key, list of values) is passed to a Reducer. 
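The shuffle-and-sort step described above can be sketched in plain Scala to show what the framework hands to each Reducer. This is an illustrative sketch only, not framework code; the intermediate pairs are taken from the word-count example used in these slides.

// Intermediate (key, value) pairs as several map tasks might emit them.
val intermediate = List(("Hello", 1), ("Hadoop", 1), ("Goodbye", 1), ("Hadoop", 1))

// Group by key and keep only the values: each Reducer then receives
// one (key, list of values) pair, e.g. (Hadoop, List(1, 1)).
val grouped: Map[String, List[Int]] =
  intermediate.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }

// A word-count reducer simply sums the values of each key.
val counts: Map[String, Int] = grouped.map { case (k, vs) => (k, vs.sum) }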
41 / 79 The Reducer ▶ Input: (key, list of values) pairs ▶ Output: a (key, value) pair or list of (key, value) pairs ▶ The Reducer outputs zero or more final (key, value) pairs reduce(inter_key, [inter_value1, inter_value2,...]) -> (out_key, out_value) (inter k, [inter v1, inter v2, · · · ]) ⇒ reduce() ⇒ (out k, out v) or (inter k, [inter v1, inter v2, · · · ]) ⇒ reduce() ⇒ (out k, out v1) (out k, out v2) ··· 42 / 79 The Reducer Example (1/3) ▶ Add up all the values associated with each intermediate key reduce(k, vals) = { sum = 0 foreach v in vals sum += v emit (k, sum) } (Hello, [1, 1, 1]) ⇒ reduce() ⇒ (Hello, 3) (Bye, ) ⇒ reduce() ⇒ (Bye, 1) 43 / 79 The Reducer Example (2/3) ▶ Get the maximum value of each intermediate key reduce(k, vals) = emit (k, max(vals)) (KTH, [5, 1, 12, 7]) ⇒ reduce() ⇒ (KTH, 12) 44 / 79 The Reducer Example (3/3) ▶ Identify reducer reduce(k, vals) = foreach v in vals emit (k, v)) (KTH, [5, 1, 12, 7]) ⇒ reduce() ⇒ (KTH, 5) (KTH, 1) (KTH, 12) (KTH, 7) 45 / 79 Example: Word Count - map public static class MyMap extends Mapper { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } 46 / 79 Example: Word Count - reduce public static class MyReduce extends Reducer { public void reduce(Text key, Iterator values, Context context) throws IOException, InterruptedException { int sum = 0; while (values.hasNext()) sum += values.next().get(); context.write(key, new IntWritable(sum)); } } 47 / 79 Example: Word Count - driver public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(MyMap.class); job.setCombinerClass(MyReduce.class); job.setReducerClass(MyReduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args)); FileOutputFormat.setOutputPath(job, new Path(args)); job.waitForCompletion(true); } 48 / 79 MapReduce Algorithm Design 49 / 79 MapReduce Algorithm Design ▶ Local aggregation ▶ Joining ▶ Sorting 50 / 79 MapReduce Algorithm Design ▶ Local aggregation ▶ Joining ▶ Sorting 51 / 79 Local Aggregation - In-Map Combiner (1/2) ▶ In some cases, there is significant repetition in the intermediate keys produced by each map task, and the reduce function is commutative and associative. 52 / 79 Local Aggregation - In-Map Combiner (2/2) ▶ Merge partially data before it is sent over the network to the reducer. ▶ Typically the same code for the combiner and the reduce function. 53 / 79 MapReduce Algorithm Design ▶ Local aggregation ▶ Joining ▶ Sorting 54 / 79 Joins ▶ Joins are relational constructs you use to combine relations together. ▶ In MapReduce joins are applicable in situations where you have two or more datasets you want to combine. 55 / 79 Joins - Two Strategies ▶ Reduce-side join Repartition join When joining two or more large datasets together ▶ Map-side join Replication join When one of the datasets is small enough to cache 56 / 79 Joins - Reduce-Side Join [M. Donald et al., MapReduce design patterns, O’Reilly, 2012.] 57 / 79 Joins - Map-Side Join [M. 
Donald et al., MapReduce design patterns, O’Reilly, 2012.] 58 / 79 MapReduce Algorithm Design ▶ Local aggregation ▶ Joining ▶ Sorting 59 / 79 Sort (1/3) ▶ Assume you want to have your job output in total sort order. ▶ Trivial with a single Reducer. Keys are passed to the Reducer in sorted order. 60 / 79 Sort (2/3) ▶ What if we have multiple Reducer? 61 / 79 Sort (3/3) ▶ For multiple Reducers we need to choose a partitioning function key1 < key2 ⇒ partition(key1) ≤ partition(key2) 62 / 79 63 / 79 Implementation 64 / 79 Architecture 65 / 79 MapReduce Execution (1/7) ▶ The user program divides the input files into M splits. A typical size of a split is the size of a HDFS block (64 MB). Converts them to key/value pairs. ▶ It starts up many copies of the program on a cluster of machines. 66 / 79 MapReduce Execution (2/7) ▶ One of the copies of the program is master, and the rest are workers. ▶ The master assigns works to the workers. It picks idle workers and assigns each one a map task or a reduce task. 67 / 79 MapReduce Execution (3/7) ▶ A map worker reads the contents of the corresponding input splits. ▶ It parses key/value pairs out of the input data and passes each pair to the user defined map function. ▶ The key/value pairs produced by the map function are buffered in memory. 68 / 79 MapReduce Execution (4/7) ▶ The buffered pairs are periodically written to local disk. They are partitioned into R regions (hash(key) mod R). ▶ The locations of the buffered pairs on the local disk are passed back to the master. ▶ The master forwards these locations to the reduce workers. 69 / 79 MapReduce Execution (5/7) ▶ A reduce worker reads the buffered data from the local disks of the map workers. ▶ When a reduce worker has read all intermediate data, it sorts it by the intermediate keys. 70 / 79 MapReduce Execution (6/7) ▶ The reduce worker iterates over the intermediate data. ▶ For each unique intermediate key, it passes the key and the corresponding set of intermediate values to the user defined reduce function. ▶ The output of the reduce function is appended to a final output file for this reduce partition. 71 / 79 MapReduce Execution (7/7) ▶ When all map tasks and reduce tasks have been completed, the master wakes up the user program. 72 / 79 Hadoop MapReduce and HDFS 73 / 79 Fault Tolerance - Worker ▶ Detect failure via periodic heartbeats. ▶ Re-execute in-progress map and reduce tasks. ▶ Re-execute completed map tasks: their output is stored on the local disk of the failed machine and is therefore inaccessible. ▶ Completed reduce tasks do not need to be re-executed since their output is stored in a global filesystem. 74 / 79 Fault Tolerance - Master ▶ State is periodically checkpointed: a new copy of master starts from the last checkpoint state. 75 / 79 Summary 76 / 79 Summary ▶ Scaling out: shared nothing architecture ▶ MapReduce Programming model: Map and Reduce Execution framework 77 / 79 References ▶ J. Dean et al., ”MapReduce: simplified data processing on large clusters”, Commu- nications of the ACM, 2008. ▶ J. Lin et al., ”Data-intensive text processing with MapReduce”, Synthesis Lectures on Human Language Technologies, 2010. 78 / 79 Questions? 79 / 79 Parallel Processing - Spark Amir H. Payberah [email protected] 2024-10-11 Where Are We? 1 / 57 MapReduce Reminder 2 / 57 Motivation (1/2) ▶ Acyclic data flow from stable storage to stable storage. 3 / 57 Motivation (2/2) ▶ MapReduce is expensive (slow), i.e., always goes to disk and HDFS. 4 / 57 So, Let’s Use Spark 5 / 57 Spark vs. 
MapReduce (1/2) 6 / 57 Spark vs. MapReduce (2/2) 7 / 57 Spark Application 8 / 57 Spark Applications Architecture ▶ Spark applications consist of A driver process A set of executor processes [M. Zaharia et al., Spark: The Definitive Guide, O’Reilly Media, 2018] 9 / 57 Driver Process ▶ The heart of a Spark application ▶ Runs the main() function ▶ Responsible for three things: Maintaining information about the Spark application Responding to a user’s program or input Analyzing, distributing, and scheduling work across the executors 10 / 57 Executors ▶ Executing code assigned to it by the driver ▶ Reporting the state of the computation on that executor back to the driver 11 / 57 SparkSession ▶ A driver process that controls a Spark application. ▶ A one-to-one correspondence between a SparkSession and a Spark application. ▶ Available in console shell as spark. SparkSession.builder.master(master).appName(appName).getOrCreate() 12 / 57 SparkContext ▶ The entry point for low-level API functionality. ▶ You access it through the SparkSession. ▶ Available in console shell as sc. val conf = new SparkConf().setMaster(master).setAppName(appName) new SparkContext(conf) 13 / 57 SparkSession vs. SparkContext ▶ Prior to Spark 2.0.0, a the spark driver program uses SparkContext to connect to the cluster. ▶ In order to use APIs of SQL, Hive and streaming, separate SparkContexts should to be created. ▶ SparkSession provides access to all the spark functionalities that SparkContext does, e.g., SQL, Hive and streaming. ▶ SparkSession internally has a SparkContext for actual computation. 14 / 57 Programming Model 15 / 57 Spark Programming Model ▶ Job is described based on directed acyclic graphs (DAG) data flow. ▶ A data flow is composed of any number of data sources, operators, and data sinks by connecting their inputs and outputs. ▶ Parallelizable operators 16 / 57 Resilient Distributed Datasets (RDD) (1/3) ▶ A distributed memory abstraction. ▶ Immutable collections of objects spread across a cluster. Like a LinkedList 17 / 57 Resilient Distributed Datasets (RDD) (2/3) ▶ An RDD is divided into a number of partitions, which are atomic pieces of information. ▶ Partitions of an RDD can be stored on different nodes of a cluster. 18 / 57 Resilient Distributed Datasets (RDD) (3/3) ▶ RDDs were the primary API in the Spark 1.x series. ▶ They are not commonly used in the Spark 2.x series. ▶ Virtually all Spark code you run, compiles down to an RDD. 19 / 57 Types of RDDs ▶ Two types of RDDs: Generic RDD Key-value RDD ▶ Both represent a collection of objects. ▶ Key-value RDDs have special operations, such as aggregation, and a concept of custom partitioning by key. 20 / 57 Creating RDDs 21 / 57 Creating RDDs - Parallelized Collections ▶ Use the parallelize method on a SparkContext. ▶ This turns a single node collection into a parallel collection. ▶ You can also explicitly state the number of partitions. ▶ In the console shell, you can either use sc or spark.sparkContext val numsCollection = Array(1, 2, 3) val nums = sc.parallelize(numsCollection) val wordsCollection = "take it easy, this is a test".split(" ") val words = spark.sparkContext.parallelize(wordsCollection, 2) 22 / 57 Creating RDDs - External Datasets ▶ Create RDD from an external storage. E.g., local file system, HDFS, Cassandra, HBase, Amazon S3, etc. ▶ Text file RDDs can be created using textFile method. 
val myFile1 = sc.textFile("file.txt") val myFile2 = sc.textFile("hdfs://namenode:9000/path/file") 23 / 57 RDD Operations 24 / 57 RDD Operations ▶ RDDs support two types of operations: Transformations: allow us to build the logical plan Actions: allow us to trigger the computation 25 / 57 Transformations 26 / 57 Transformations ▶ Create a new RDD from an existing one. ▶ All transformations are lazy. Not compute their results right away. Remember the transformations applied to the base dataset. They are only computed when an action requires a result to be returned to the driver program. 27 / 57 Generic RDD Transformations ▶ map applies a given function on each RDD record independently. val nums = sc.parallelize(Array(1, 2, 3)) val squares = nums.map(x => x * x) // 1, 4, 9 28 / 57 Lineage ▶ Lineage: transformations used to build an RDD. ▶ RDDs are stored as a chain of objects cap- turing the lineage of each RDD. val file = sc.textFile("hdfs://...") val sics = file.filter(_.contains("SICS")) val cachedSics = sics.cache() val ones = cachedSics.map(_ => 1) val count = ones.reduce(_+_) 29 / 57 Key-Value RDD Transformations ▶ In a (k, v) pairs, k is is the key, and v is the value. ▶ To make a key-value RDD: map over your current RDD to a basic key-value structure. val words = sc.parallelize("take it easy, this is a test".split(" ")) val keyword1 = words.map(word => (word, 1)) // (take,1), (it,1), (easy,,1), (this,1), (is,1), (a,1), (test,1) 30 / 57 Key-Value RDD Transformations - Aggregation ▶ Aggregate the values associated with each key. def addFunc(a:Int, b:Int) = a + b val kvChars =... // (t,1), (a,1), (k,1), (e,1), (i,1), (t,1), (e,1), (a,1), (s,1), (y,1), (,,1),... val grpChar = kvChars.groupByKey().map(row => (row._1, row._2.reduce(addFunc))) // (t,5), (h,1), (,,1), (e,3), (a,3), (i,3), (y,1), (s,4), (k,1)) val redChar = kvChars.reduceByKey(addFunc) // (t,5), (h,1), (,,1), (e,3), (a,3), (i,3), (y,1), (s,4), (k,1)) 31 / 57 Key-Value RDD Transformations - Join ▶ join performs an inner-join on the key. ▶ fullOtherJoin, leftOuterJoin, rightOuterJoin, and cartesian. val keyedChars =... // (t,4), (h,6), (,,9), (e,8), (a,3), (i,5), (y,2), (s,7), (k,0) val kvChars =... // (t,1), (a,1), (k,1), (e,1), (i,1), (t,1), (e,1), (a,1), (s,1), (y,1), (,,1),... val joinedChars = kvChars.join(keyedChars) // (t,(1,4)), (t,(1,4)), (t,(1,4)), (t,(1,4)), (t,(1,4)), (h,(1,6)), (,,(1,9)), (e,(1,8)),... 32 / 57 Actions 33 / 57 Actions ▶ Transformations allow us to build up our logical transformation plan (lineage graph). ▶ We run an action to trigger the computation. Instructs Spark to compute a result from a series of transformations. 34 / 57 RDD Actions (1/3) ▶ collect returns all the elements of the RDD as an array at the driver. val nums = sc.parallelize(Array(1, 2, 3)) nums.collect() // Array(1, 2, 3) 35 / 57 RDD Actions (2/3) ▶ reduce aggregates the elements of the dataset using a given function. ▶ The given function should be commutative and associative so that it can be computed correctly in parallel. sc.parallelize(1 to 20).reduce(_ + _) // 210 36 / 57 RDD Actions (3/3) ▶ saveAsTextFile writes the elements of an RDD as a text file. Local filesystem, HDFS or any other Hadoop-supported file system. ▶ saveAsObjectFile explicitly writes key-value pairs. 
val words = sc.parallelize("take it easy, this is a test".split(" ")) words.saveAsTextFile("file:/tmp/words") 37 / 57 Spark Word-Count val textFile = sc.textFile("hdfs://...") val words = textFile.flatMap(line => line.split(" ")) val ones = words.map(word => (word, 1)) val counts = ones.reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...") 38 / 57 Cache and Checkpoints 39 / 57 Caching ▶ When you cache an RDD, each node stores any partitions of it that it computes in memory. ▶ An RDD that is not cached is re-evaluated each time an action is invoked on that RDD. ▶ A node reuses the cached RDD in other actions on that dataset. ▶ There are two functions for caching an RDD: cache caches the RDD into memory persist(level) can cache in memory, on disk, or off-heap memory val words = sc.parallelize("take it easy, this is a test".split(" ")) words.cache() 40 / 57 Checkpointing ▶ checkpoint saves an RDD to disk. ▶ Checkpointed data is not removed after SparkContext is destroyed. ▶ When we reference a checkpointed RDD, it will derive from the checkpoint instead of the source data. val words = sc.parallelize("take it easy, this is a test".split(" ")) sc.setCheckpointDir("/path/checkpointing") words.checkpoint() 41 / 57 Execution Engine 42 / 57 More About Lineage ▶ A DAG representing the computations done on the RDD is called lineage graph. val rdd = sc.textFile(...) val filtered = rdd.map(...).filter(...).persist() val count = filtered.count() val reduced = filtered.reduce() [https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies] 43 / 57 Dependencies ▶ RDD dependencies encode when data must move across network. [https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies] 44 / 57 Two Types of Dependencies (1/2) ▶ Narrow transformations (dependencies) Each input partition will contribute to only one output partition. With narrow transformations, Spark can perform a pipelining [https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies] 45 / 57 Two Types of Dependencies (2/2) ▶ Wide transformations (dependencies) Each input partition will contribute to many output partition. Usually referred to as a shuffle [https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies] 46 / 57 Example [https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies] 47 / 57 The Anatomy of a Spark Job [H. Karau et al., High Performance Spark, O’Reilly Media, 2017] 48 / 57 Jobs ▶ A Spark job is the highest element of Spark’s execution hierarchy. Each Spark job corresponds to one action. Each action is called by the driver program of a Spark application. [H. Karau et al., High Performance Spark, O’Reilly Media, 2017] 49 / 57 Stages ▶ Each job breaks down into a series of stages. Stages in Spark represent groups of tasks that can be executed together. Wide transformations define the breakdown of jobs into stages. [H. Karau et al., High Performance Spark, O’Reilly Media, 2017] 50 / 57 Tasks ▶ A stage consists of tasks, which are the smallest execution unit. Each task represents one local computation. All of the tasks in one stage execute the same code on a different piece of the data. [H. Karau et al., High Performance Spark, O’Reilly Media, 2017] 51 / 57 Lineages and Fault Tolerance (1/2) ▶ No replication. ▶ Lineages are the key to fault tolerance in Spark. ▶ Recompute only the lost partitions of an RDD. 52 / 57 Lineages and Fault Tolerance (2/2) ▶ Assume one of the partitions fails. ▶ We only have to recompute the data shown below to get back on track. 
[https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies] 53 / 57 Summary 54 / 57 Summary ▶ RDD: a distributed memory abstraction ▶ Two types of operations: transformations and actions ▶ Lineage graph ▶ Caching ▶ Jobs, stages, and tasks ▶ Wide vs. narrow dependencies 55 / 57 References ▶ M. Zaharia et al., “Spark: The Definitive Guide”, O’Reilly Media, 2018 - Chapters 2, 12, 13, and 14 ▶ M. Zaharia et al., “Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing”, USENIX NSDI, 2012. 56 / 57 Questions? 57 / 57 Structured Data Processing - Spark SQL Amir H. Payberah [email protected] 2024-09-16 Where Are We? 1 / 58 Motivation 2 / 58 Spark and Spark SQL 3 / 58 Structured Data vs. RDD (1/2) ▶ case class Account(name: String, balance: Double, risk: Boolean) 4 / 58 Structured Data vs. RDD (1/2) ▶ case class Account(name: String, balance: Double, risk: Boolean) ▶ RDD[Account] 4 / 58 Structured Data vs. RDD (1/2) ▶ case class Account(name: String, balance: Double, risk: Boolean) ▶ RDD[Account] ▶ RDDs don’t know anything about the schema of the data it’s dealing with. 4 / 58 Structured Data vs. RDD (2/2) ▶ case class Account(name: String, balance: Double, risk: Boolean) ▶ RDD[Account] ▶ A database/Hive sees it as a columns of named and typed values. 5 / 58 DataFrames and DataSets ▶ Spark has two notions of structured collections: DataFrames Datasets ▶ They are distributed table-like collections with well-defined rows and columns. 6 / 58 DataFrames and DataSets ▶ Spark has two notions of structured collections: DataFrames Datasets ▶ They are distributed table-like collections with well-defined rows and columns. ▶ They represent immutable lazily evaluated plans. ▶ When an action is performed on them, Spark performs the actual transformations and return the result. 6 / 58 DataFrame 7 / 58 DataFrame ▶ Consists of a series of rows and a number of columns. ▶ Equivalent to a table in a relational database. ▶ Spark + RDD: functional transformations on partitioned collections of objects. ▶ SQL + DataFrame: declarative transformations on partitioned collections of tuples. 8 / 58 Schema ▶ Defines the column names and types of a DataFrame. ▶ Assume people.json file as an input: {"name":"Michael", "age":15, "id":12} {"name":"Andy", "age":30, "id":15} {"name":"Justin", "age":19, "id":20} {"name":"Andy", "age":12, "id":15} {"name":"Jim", "age":19, "id":20} {"name":"Andy", "age":12, "id":10} 9 / 58 Schema ▶ Defines the column names and types of a DataFrame. ▶ Assume people.json file as an input: {"name":"Michael", "age":15, "id":12} {"name":"Andy", "age":30, "id":15} {"name":"Justin", "age":19, "id":20} {"name":"Andy", "age":12, "id":15} {"name":"Jim", "age":19, "id":20} {"name":"Andy", "age":12, "id":10} val people = spark.read.format("json").load("people.json") people.schema // returns: StructType(StructField(age,LongType,true), StructField(id,LongType,true), StructField(name,StringType,true)) 9 / 58 Column ▶ They are like columns in a table. ▶ col returns a reference to a column. ▶ columns returns all columns on a DataFrame val people = spark.read.format("json").load("people.json") col("age") people.columns // returns: Array[String] = Array(age, id, name) 10 / 58 Row ▶ A row is a record of data. ▶ They are of type Row. import org.apache.spark.sql.Row val myRow = Row("Seif", 65, 0) 11 / 58 Row ▶ A row is a record of data. ▶ They are of type Row. ▶ Rows do not have schemas. 
import org.apache.spark.sql.Row val myRow = Row("Seif", 65, 0) 11 / 58 Row ▶ A row is a record of data. ▶ They are of type Row. ▶ Rows do not have schemas. ▶ To access data in rows, you need to specify the position that you would like. import org.apache.spark.sql.Row val myRow = Row("Seif", 65, 0) myRow(0) // type Any myRow(0).asInstanceOf[String] // String myRow.getString(0) // String myRow.getInt(1) // Int 11 / 58 Creating a DataFrame ▶ Two ways to create a DataFrame: 1. From an RDD 2. From raw data sources 12 / 58 Creating a DataFrame - From an RDD ▶ You can use toDF to convert an RDD to DataFrame. ▶ The schema automatically inferred. val tupleRDD = sc.parallelize(Array(("seif", 65, 0), ("amir", 40, 1))) val tupleDF = tupleRDD.toDF("name", "age", "id") 13 / 58 Creating a DataFrame - From an RDD ▶ You can use toDF to convert an RDD to DataFrame. ▶ The schema automatically inferred. val tupleRDD = sc.parallelize(Array(("seif", 65, 0), ("amir", 40, 1))) val tupleDF = tupleRDD.toDF("name", "age", "id") 13 / 58 Creating a DataFrame - From an RDD ▶ You can use toDF to convert an RDD to DataFrame. ▶ The schema automatically inferred. val tupleRDD = sc.parallelize(Array(("seif", 65, 0), ("amir", 40, 1))) val tupleDF = tupleRDD.toDF("name", "age", "id") ▶ If RDD contains case class instances, Spark infers the attributes from it. case class Person(name: String, age: Int, id: Int) val peopleRDD = sc.parallelize(Array(Person("seif", 65, 0), Person("amir", 40, 1))) val peopleDF = peopleRDD.toDF 13 / 58 Creating a DataFrame - From Data Source ▶ Data sources supported by Spark. val peopleJson = spark.read.format("json").load("people.json") val peopleCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("people.csv") 14 / 58 Creating a DataFrame - From Data Source ▶ Data sources supported by Spark. CSV, JSON, Parquet, ORC, JDBC/ODBC connections, Plain-text files val peopleJson = spark.read.format("json").load("people.json") val peopleCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("people.csv") 14 / 58 Creating a DataFrame - From Data Source ▶ Data sources supported by Spark. CSV, JSON, Parquet, ORC, JDBC/ODBC connections, Plain-text files Cassandra, HBase, MongoDB, AWS Redshift, XML, etc. val peopleJson = spark.read.format("json").load("people.json") val peopleCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("people.csv") 14 / 58 DataFrame Transformations (1/5) ▶ Add and remove rows or columns ▶ Transform a row into a column (or vice versa) ▶ Change the order of rows based on the values in columns [M. Zaharia et al., Spark: The Definitive Guide, O’Reilly Media, 2018] 15 / 58 DataFrame Transformations (2/5) ▶ select and selectExpr allow to do the DataFrame equivalent of SQL queries on a table of data. // select people.select("name", "age", "id").show(2) 16 / 58 DataFrame Transformations (2/5) ▶ select and selectExpr allow to do the DataFrame equivalent of SQL queries on a table of data. // select people.select("name", "age", "id").show(2) // selectExpr people.selectExpr("*", "(age < 20) as teenager").show() people.selectExpr("avg(age)", "count(distinct(name))", "sum(id)").show() 16 / 58 DataFrame Transformations (3/5) ▶ filter and where both filter rows. ▶ distinct can be used to extract unique rows. 
people.filter("age < 20").show() people.where("age < 20").show() people.select("name").distinct().show() 17 / 58 What is the output? people.selectExpr("avg(age)", "count(distinct(name)) as distinct").show() +---+---+-------+ |age| id| name| +---+---+-------+ | 15| 12|Michael| | 30| 15| Andy| | 19| 20| Andy| +---+---+-------+ 18 / 58 What is the output? people.selectExpr("avg(age)", "count(distinct(name)) as distinct").show() +---+---+-------+ |age| id| name| +---+---+-------+ | 15| 12|Michael| | 30| 15| Andy| | 19| 20| Andy| +---+---+-------+ +--------+--------+ |avg(age)|distinct| +--------+--------+ | 21.333| 2| +--------+--------+ 18 / 58 DataFrame Transformations (4/5) ▶ withColumn adds a new column to a DataFrame. people.withColumn("teenager", expr("age < 20")).show() 19 / 58 DataFrame Transformations (4/5) ▶ withColumn adds a new column to a DataFrame. ▶ withColumnRenamed renames a column. people.withColumn("teenager", expr("age < 20")).show() people.withColumnRenamed("name", "username").columns 19 / 58 DataFrame Transformations (4/5) ▶ withColumn adds a new column to a DataFrame. ▶ withColumnRenamed renames a column. ▶ drop removes a column. people.withColumn("teenager", expr("age < 20")).show() people.withColumnRenamed("name", "username").columns people.drop("name").columns 19 / 58 What is the output? people.withColumn("teenager", expr("age < 20")).show() +---+---+-------+ |age| id| name| +---+---+-------+ | 15| 12|Michael| | 30| 15| Andy| | 19| 20| Justin| +---+---+-------+ 20 / 58 What is the output? people.withColumn("teenager", expr("age < 20")).show() +---+---+-------+ |age| id| name| +---+---+-------+ | 15| 12|Michael| | 30| 15| Andy| | 19| 20| Justin| +---+---+-------+ Option 1 Option 2 +---+---+-------+--------+ +---+---+-------+--------+ |age| id| name|teenager| |age| id| name|teenager| +---+---+-------+--------+ +---+---+-------+--------+ | 15| 12|Michael| true| | 15| 12|Michael| true| | 30| 15| Andy| false| | 19| 20| Justin| true| | 19| 20| Justin| true| +---+---+-------+--------+ +---+---+-------+--------+ 20 / 58 DataFrame Transformations (5/5) ▶ You can use udf to define new column-based functions. import org.apache.spark.sql.functions.{col, udf} val df = spark.createDataFrame(Seq((0, "hello"), (1, "world"))).toDF("id", "text") val upper: String => String = _.toUpperCase val upperUDF = spark.udf.register("upper", upper) df.withColumn("upper", upperUDF(col("text"))).show 21 / 58 DataFrame Actions ▶ Like RDDs, DataFrames also have their own set of actions. ▶ collect: returns an array that contains all of rows in this DataFrame. ▶ count: returns the number of rows in this DataFrame. ▶ first and head: returns the first row of the DataFrame. ▶ show: displays the top 20 rows of the DataFrame in a tabular form. ▶ take: returns the first n rows of the DataFrame. 22 / 58 Aggregation 23 / 58 Aggregation ▶ In an aggregation you specify A key or grouping An aggregation function ▶ The given function must produce one result for each group. 24 / 58 Grouping Types ▶ Summarizing a complete DataFrame ▶ Group by ▶ Windowing 25 / 58 Grouping Types ▶ Summarizing a complete DataFrame ▶ Group by ▶ Windowing 26 / 58 Summarizing a Complete DataFrame Functions ▶ count returns the total number of values. people.select(count("age")).show() 27 / 58 Summarizing a Complete DataFrame Functions ▶ count returns the total number of values. ▶ countDistinct returns the number of unique groups. 
Aggregation
23 / 58

Aggregation
▶ In an aggregation you specify:
  A key or grouping
  An aggregation function
▶ The given function must produce one result for each group.
24 / 58

Grouping Types
▶ Summarizing a complete DataFrame
▶ Group by
▶ Windowing
25 / 58

Grouping Types
▶ Summarizing a complete DataFrame
▶ Group by
▶ Windowing
26 / 58

Summarizing a Complete DataFrame Functions
▶ count returns the total number of values.
▶ countDistinct returns the number of unique groups.
▶ first, last, min, max, sum, avg

people.select(count("age")).show()

people.select(countDistinct("name")).show()

people.select(first("name"), last("age"), min("age"), max("age"), sum("age"), avg("age")).show()
27 / 58

Grouping Types
▶ Summarizing a complete DataFrame
▶ Group by
▶ Windowing
28 / 58

Group By (1/2)
▶ Perform aggregations on groups in the data.
▶ Typically on categorical data.
▶ We do this grouping in two phases:
  1. Specify the column(s) on which we would like to group.
  2. Specify the aggregation(s) using the agg operation.
29 / 58

Group By (2/2)
▶ Grouping with expressions

people.groupBy("name").agg(count("age").alias("ageagg")).show()

▶ Specifying transformations as a series of Maps

people.groupBy("name").agg("age" -> "count", "age" -> "avg", "id" -> "max").show()
30 / 58

What is the output?

people.groupBy("name").agg("age" -> "count", "age" -> "avg", "id" -> "max").show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20|   Andy|
+---+---+-------+

Option 1
+-------+----------+--------+-------+
|   name|count(age)|avg(age)|max(id)|
+-------+----------+--------+-------+
|Michael|         1|    15.0|     12|
|   Andy|         2|    24.5|     20|
+-------+----------+--------+-------+

Option 2
+-------+----------+--------+-------+
|   name|count(age)|avg(age)|max(id)|
+-------+----------+--------+-------+
|Michael|         1|   21.33|     20|
|   Andy|         2|   21.33|     20|
+-------+----------+--------+-------+
31 / 58

Grouping Types
▶ Summarizing a complete DataFrame
▶ Group by
▶ Windowing
32 / 58

Windowing (1/2)
▶ Computing some aggregation on a specific window of data.
▶ The window determines which rows will be passed in to this function.
▶ You define them by using a reference to the current data.

[M. Zaharia et al., Spark: The Definitive Guide, O'Reilly Media, 2018]
33 / 58

Windowing (2/2)
▶ Unlike grouping, here each row can fall into one or more frames.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col}

val people = spark.read.format("json").load("people.json")

val windowSpec = Window.rowsBetween(-1, 1)
val avgAge = avg(col("age")).over(windowSpec)

people.select(col("name"), col("age"), avgAge.alias("avg_age")).show
34 / 58
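The slide above uses a purely row-based frame. As a small extension, a sketch (under the assumption that the same people DataFrame is used) of a partitioned window, where the aggregation is computed per group while every input row is kept:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col}

// hypothetical variant: average age per name, attached to every row with that name
val byName = Window.partitionBy("name")
people.select(col("name"), col("age"), avg(col("age")).over(byName).alias("avg_age_per_name")).show()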
What is the output?

val windowSpec = Window.rowsBetween(-1, 1)
val avgAge = avg(col("age")).over(windowSpec)

people.select(col("name"), col("age"), avgAge.alias("avg_age")).show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20|   Andy|
+---+---+-------+

Option 1
+-------+---+--------+
|   name|age| avg_age|
+-------+---+--------+
|Michael| 15|    22.5|
|   Andy| 30|   21.33|
|   Andy| 19|    24.5|
+-------+---+--------+

Option 2
+-------+---+--------+
|   name|age| avg_age|
+-------+---+--------+
|Michael| 15|     7.5|
|   Andy| 30|    22.5|
|   Andy| 19|   21.33|
+-------+---+--------+
35 / 58

Joins
36 / 58

Joins
▶ Joins are relational constructs you use to combine relations.
▶ Different join types: inner join, outer join, left outer join, right outer join, left semi join, left anti join, cross join
37 / 58

Joins Example

val person = Seq((0, "Seif", 0), (1, "Amir", 1), (2, "Sarunas", 1)).toDF("id", "name", "group_id")

val group = Seq((0, "SICS/KTH"), (1, "KTH"), (2, "SICS")).toDF("id", "department")
38 / 58

Joins Example - Inner

val joinExpression = person.col("group_id") === group.col("id")
var joinType = "inner"

person.join(group, joinExpression, joinType).show()

+---+-------+--------+---+----------+
| id|   name|group_id| id|department|
+---+-------+--------+---+----------+
|  0|   Seif|       0|  0|  SICS/KTH|
|  1|   Amir|       1|  1|       KTH|
|  2|Sarunas|       1|  1|       KTH|
+---+-------+--------+---+----------+
39 / 58

Joins Example - Outer

val joinExpression = person.col("group_id") === group.col("id")
var joinType = "outer"

person.join(group, joinExpression, joinType).show()

+----+-------+--------+---+----------+
|  id|   name|group_id| id|department|
+----+-------+--------+---+----------+
|   1|   Amir|       1|  1|       KTH|
|   2|Sarunas|       1|  1|       KTH|
|null|   null|    null|  2|      SICS|
|   0|   Seif|       0|  0|  SICS/KTH|
+----+-------+--------+---+----------+
40 / 58

Joins Communication Strategies
▶ Two different communication strategies during joins:
  Shuffle join: big table to big table
  Broadcast join: big table to small table
41 / 58

Shuffle Join
▶ Every node talks to every other node.
▶ They share data according to which node has a certain key or set of keys.

[M. Zaharia et al., Spark: The Definitive Guide, O'Reilly Media, 2018]
42 / 58

Broadcast Join
▶ Used when the table is small enough to fit into the memory of a single worker node.

[M. Zaharia et al., Spark: The Definitive Guide, O'Reilly Media, 2018]
43 / 58

SQL
44 / 58

SQL
▶ You can run SQL queries on views/tables via the method sql on the SparkSession object.

spark.sql("SELECT * from people_view").show()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 15| 12|Michael|
| 30| 15|   Andy|
| 19| 20| Justin|
| 12| 15|   Andy|
| 19| 20|    Jim|
| 12| 10|   Andy|
+---+---+-------+
45 / 58

Temporary View
▶ createOrReplaceTempView creates (or replaces) a lazily evaluated view.
▶ You can use it like a table in Spark SQL.

people.createOrReplaceTempView("people_view")

val teenagersDF = spark.sql("SELECT name, age FROM people_view WHERE age BETWEEN 13 AND 19")
46 / 58
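To tie the join and SQL slides together, a small sketch (assuming the person and group DataFrames from the joins example above) of the same inner join expressed as a SQL query over temporary views:

person.createOrReplaceTempView("person_view")
group.createOrReplaceTempView("group_view")

// the inner join from the earlier slide, written in SQL
spark.sql("""
  SELECT p.name, g.department
  FROM person_view p
  JOIN group_view g ON p.group_id = g.id
""").show()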
DataSet
47 / 58

Untyped API with DataFrame
▶ DataFrame elements are Rows, which are generic untyped JVM objects.
▶ The Scala compiler cannot type check Spark SQL schemas in DataFrames.
▶ The following code compiles, but you get a runtime exception:
  id_num is not in the DataFrame columns [name, age, id]

// people columns: ("name", "age", "id")
val people = spark.read.format("json").load("people.json")

people.filter("id_num < 20") // runtime exception
48 / 58

Why DataSet?
▶ Assume the following example:

case class Person(name: String, age: BigInt, id: BigInt)
val peopleRDD = sc.parallelize(Array(Person("seif", 65, 0), Person("amir", 40, 1)))
val peopleDF = peopleRDD.toDF

▶ Now, let's use collect to bring it back to the master.

val collectedPeople = peopleDF.collect()
// collectedPeople: Array[org.apache.spark.sql.Row]

▶ What is in Row?
49 / 58

Why DataSet?
▶ To be able to work with the collected values, we should cast the Rows. How many columns? What types?

// Person(name: String, age: BigInt, id: BigInt)
val collectedList = collectedPeople.map { row =>
  (row(0).asInstanceOf[String], row(1).asInstanceOf[Int], row(2).asInstanceOf[Int])
}

▶ But, what if we cast the types wrong?
▶ Wouldn't it be nice if we could have both Spark SQL optimizations and typesafety?
50 / 58

DataSet
▶ Datasets can be thought of as typed distributed collections of data.
▶ The Dataset API unifies the DataFrame and RDD APIs.
▶ You can consider a DataFrame as an alias for Dataset[Row], where a Row is a generic untyped JVM object.

type DataFrame = Dataset[Row]

[http://why-not-learn-something.blogspot.com/2016/07/apache-spark-rdd-vs-dataframe-vs-dataset.html]
51 / 58

Structured APIs in Spark

[J.S. Damji et al., Learning Spark - Lightning-Fast Data Analytics]
52 / 58

Creating DataSets
▶ To convert a sequence or an RDD to a Dataset, we can use toDS().
▶ You can call as[SomeCaseClass] to convert the DataFrame to a Dataset.

case class Person(name: String, age: BigInt, id: BigInt)
val personSeq = Seq(Person("Max", 33, 0), Person("Adam", 32, 1))

val ds1 = sc.parallelize(personSeq).toDS

val ds2 = spark.read.format("json").load("people.json").as[Person]
53 / 58
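As a contrast with the Row casting shown on the "Why DataSet?" slides, a minimal sketch (assuming ds2 from the slide above) of collecting a Dataset: the result is already typed, so no casting is needed.

// collect on a Dataset[Person] returns Array[Person], not Array[Row]
val collected: Array[Person] = ds2.collect()
collected.foreach(p => println(s"${p.name} is ${p.age} years old"))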
DataSet Transformations
▶ Transformations on Datasets are the same as those that we had on DataFrames.
▶ Datasets allow us to specify more complex and strongly typed transformations.

case class Person(name: String, age: BigInt, id: BigInt)

val people = spark.read.format("json").load("people.json").as[Person]

people.filter(x => x.age < 40).show()

people.map(x => (x.name, x.age + 5, x.id)).show()
54 / 58

Summary
55 / 58

Summary
▶ RDD vs. DataFrame vs. DataSet
▶ Transformations and Actions
▶ Aggregation
▶ Join
▶ SQL queries
56 / 58

References
▶ M. Zaharia et al., "Spark: The Definitive Guide", O'Reilly Media, 2018 - Chapters 4-11.
▶ M. Armbrust et al., "Spark SQL: Relational Data Processing in Spark", ACM SIGMOD, 2015.
57 / 58

Questions?
58 / 58

Introduction to Data Stream Processing
Amir H. Payberah
[email protected]
2024-09-18

Where Are We?
1 / 50

Stream Processing (1/3)
▶ Stream processing is the act of continuously incorporating new data to compute a result.
2 / 50

Stream Processing (2/3)
▶ The input data is unbounded.
  A series of events, with no predetermined beginning or end.
  E.g., credit card transactions, clicks on a website, or sensor readings from IoT devices.
3 / 50

Stream Processing (3/3)
▶ Database Management Systems (DBMS): data-at-rest analytics
  Store and index data before processing it.
  Process data only when explicitly asked by the users.
▶ Stream Processing Systems (SPS): data-in-motion analytics
  Process information as it flows, without storing it persistently.
4 / 50

Streaming Data
▶ A data stream is unbounded data, which is broken into a sequence of individual tuples.
▶ A data tuple is the atomic data item in a data stream.
▶ Tuples can be structured, semi-structured, or unstructured.
5 / 50

Streaming Processing Patterns
▶ Micro-batch systems
  Batch engines
  Slicing up the unbounded data into sets of bounded data, then processing each batch.