Spark Structured API
Summary
This document provides an overview of Spark's structured APIs, comparing RDDs with the DataFrame, Dataset, and SQL APIs for big data processing, and explores the advantages and use cases of each approach.
Full Transcript
Spark Structured API

Problem with Coding Spark Jobs with RDDs

Introduction
Remember, RDDs are low-level objects that are created in order to track lineage. RDDs are like low-level code in Scala and other languages – a Python RDD API is also available. There is both an understanding problem and an efficiency problem with using RDDs to code Spark jobs.

Problem written in Scala RDD
What are we trying to achieve with this code?

  val dataRDD = sc.parallelize(List(("Finance", 60000), ("IT", 80000), ("IT", 90000)))
  dataRDD.map(r => (r._1, (r._2, 1)))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .map(r => (r._1, r._2._1 / r._2._2))
    .collect()

Problem with RDDs
The previous code is very hard to read. Beyond that, it will also be executed exactly as written – this can lead to performance problems.

Another problem written in Scala RDDs
Take a look at this piece of code:

  val dataRDD = sc.parallelize(List(("Finance", 60000), ("IT", 80000), ("IT", 90000)))
  dataRDD.map(r => (r._1, (r._2, 1)))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .filter(r => r._1 == "IT")
    .map(r => (r._1, r._2._1 / r._2._2))
    .collect()

Spark SQL Module
What is a query language? A query language uses high-level instructions to perform typical data analytics. A query instruction is translated into multiple lines of procedural code. Example in SQL or HiveQL:

  SELECT dept, avg(salary) FROM department WHERE dept = 'IT' GROUP BY dept

A few benefits:
– Better readability
– Type checking
– Faster execution

What's the catch with query languages? A query language is very coarse grained when it comes to manipulating data. If you want more granular operations, you should use procedural programming.

Spark SQL
A module in Spark that aims to add structure to the data. It facilitates the creation of three different high-level APIs:
– DataFrames
– Datasets
– SQL
All the benefits you get from a database system are then obtained if one of the above is used.

Spark SQL – DataFrames
DataFrames are tables with columns. They let us load our dataset and operate on it as a table. DataFrames are popular in Python and R; Spark borrowed the concept, including the name, but in a distributed sense.

DataFrames code
Same code with DataFrames:

  val dataDF = dataRDD.toDF("dept", "salary")
  val result = dataDF.groupBy($"dept").agg(avg($"salary")).filter($"dept" === "IT")
  result.collect
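For readers following along outside the spark-shell, here is a minimal, self-contained sketch of the same DataFrame example. The explicit SparkSession, the imports, and the application name are assumptions on my part, since the transcript relies on the shell providing sc, spark, and the implicits.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.avg

  object AvgSalaryByDept {
    def main(args: Array[String]): Unit = {
      // In the spark-shell these are provided for you; in a standalone
      // application you build the session yourself (app name is illustrative).
      val spark = SparkSession.builder().appName("AvgSalaryByDept").getOrCreate()
      import spark.implicits._   // enables the $"col" syntax and toDF

      val dataRDD = spark.sparkContext.parallelize(
        List(("Finance", 60000), ("IT", 80000), ("IT", 90000)))

      val dataDF = dataRDD.toDF("dept", "salary")
      val result = dataDF.groupBy($"dept").agg(avg($"salary")).filter($"dept" === "IT")

      // For the sample data this yields a single row: (IT, 85000.0)
      result.show()

      spark.stop()
    }
  }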
Interview Question
What is the difference between DataFrames in Python and Spark?
– A DataFrame in Python is local to the system where it is executed
– A DataFrame in Spark is distributed across the cluster

Behind the scenes
This is the Spark SQL module: DataFrames, Datasets, and SQL sit on top of the Catalyst optimizer, which in turn sits on top of RDDs. The Catalyst optimizer looks at the code and comes up with an optimized version. In Spark SQL, the API for working with RDDs is called the low-level API; the API for working with DataFrames, Datasets, and SQL is called the structured API. In the end, everything is turned into RDDs. Spark encourages the use of the structured API.

Spark SQL
There are three structured APIs:
– DataFrames
– Datasets
– SQL
Within each API, different languages are possible. For example, DataFrames can be written in Python, Scala, R, etc.

Example of Speed Comparison
(Speed-comparison chart not reproduced in the transcript.)

Summary
Writing Spark jobs in RDDs is hard to follow and carries a performance hit. The structured API is easier to understand and has better performance. There are three structured APIs supported by Spark, and Spark encourages their use. The structured API is supported across all aspects of Spark – Spark Streaming, deep learning, machine learning, etc.

DataFrames vs DataSets vs SQL

DataFrames
Code in DataFrames:

  import org.apache.spark.sql.functions._
  val dataDF = dataRDD.toDF("dept", "salary")
  val result = dataDF.groupBy($"dept").agg(avg($"salary")).filter($"dept" === "IT")
  result.collect

DataFrame vs DataSet
Same code in Datasets:

  import org.apache.spark.sql.expressions.scalalang.typed
  val resultDS = dataDS.filter(_.dept == "IT").groupByKey(_.dept).agg(typed.avg(_.salary)).alias("avg_salary")
  resultDS.collect

SQL implementation
Same code in SQL:

  dataDF.createOrReplaceTempView("department")
  val resultSql = spark.sql("SELECT dept, avg(salary) FROM department WHERE dept = 'IT' group by dept")
  resultSql.collect

Summary of Structured Data Types

Introduction
There are three flavours of structured API you can use in Spark – DataFrame, Dataset, and SQL – and all three go through the Catalyst optimizer. We will look at all of these and find the differences. By far the most commonly used API is DataFrames.

Spark RDD
Let's start by executing the code with an RDD. Here I am defining my RDD:

  val dataRDD = sc.parallelize(List(("Finance", 60000), ("IT", 80000), ("IT", 90000)))

Next we do the calculation to find the average salary by department:

  dataRDD.map(r => (r._1, (r._2, 1)))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .filter(r => r._1 == "IT")
    .map(r => (r._1, r._2._1 / r._2._2))
    .collect()

Use :paste and press Ctrl-D for Spark to interpret the multi-line input.

DataFrames
Same code in DataFrames:

  import org.apache.spark.sql.functions._
  val dataDF = dataRDD.toDF("dept", "salary")
  val result = dataDF.groupBy($"dept").agg(avg($"salary")).filter($"dept" === "IT")
  result.collect

Execute this code; you should get the same output as the RDD version.

SQL implementation
Same code in SQL – dump our DataFrame into a table and query the table:

  dataDF.createOrReplaceTempView("department")
  val resultSql = spark.sql("SELECT dept, avg(salary) FROM department WHERE dept = 'IT' group by dept")
  resultSql.collect

DataFrame vs DataSet
Datasets let you work with your data using domain-specific types – that is, types we have created. Let's walk through an example.

Each row in our data has a department and a salary. This can be represented by a Department class with two fields:

  case class Department(dept: String, salary: Long)

Case classes are very simple class types in Scala. A case class automatically creates a constructor and field accessors for the dept and salary parameters.
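As a small illustrative sketch of what the case class gives you for free (the values below are just the sample data from the slides): construction without new, field access, copying, and structural equality.

  case class Department(dept: String, salary: Long)

  val d1 = Department("IT", 80000L)       // constructor generated automatically
  d1.dept                                 // "IT" – accessor generated automatically
  d1.salary                               // 80000
  val d2 = d1.copy(salary = 90000L)       // copy with one field changed
  d1 == Department("IT", 80000L)          // true – structural equality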
DataFrame vs DataSet
To create a Dataset, we will use the as method on the DataFrame. We do not need a DataFrame to create a Dataset, but that's what we are doing for this example:

  val dataDS = dataDF.as[Department]

You will get an output with type "Dataset".

First, we apply the filter, using the dept property and comparing it to the department "IT". Then we apply the grouping and aggregation in similar fashion – when we aggregate, we call the average function from the typed package:

  import org.apache.spark.sql.expressions.scalalang.typed
  val resultDS = dataDS.filter(_.dept == "IT").groupByKey(_.dept).agg(typed.avg(_.salary)).alias("avg_salary")
  resultDS.collect

The main difference between a DataFrame and a Dataset is that in DataFrames you refer to individual columns, while in Datasets you refer to individual properties of a class type. Execute the code and you should get the same result.

Why do we have to deal with specific types? Why not just use the DataFrame, since it is more convenient? The answer is type safety:
– Type safety controls what is allowed and what is not allowed on our types
– For example, we know Department has two properties, dept and salary
– If we try to access anything outside of those two, we get a compile-time error

We have changed the following code from dept to dept1. What happens when you execute it?

  val resultDS = dataDS.filter(_.dept1 == "IT").groupByKey(_.dept).agg(typed.avg(_.salary)).alias("avg_salary")

Let's try the same with a DataFrame:

  val result = dataDF.groupBy($"dept1").agg(avg($"salary")).filter($"dept" === "IT")

Let's try the same with SQL:

  val resultSql = spark.sql("SELECT dept, avg(salary) FROM department WHERE dept1 = 'IT' group by dept")
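As a hedged summary of what typically happens with the dept1 change (exact messages depend on the Spark and Scala versions):

  // Dataset: dept1 is not a member of the Department case class, so the
  // Scala compiler rejects this line before anything runs (a compile-time error).
  dataDS.filter(_.dept1 == "IT")

  // DataFrame: the column name is just a string, so this compiles; Spark only
  // reports the unknown column (an AnalysisException) when the query is analysed at run time.
  dataDF.groupBy($"dept1").agg(avg($"salary"))

  // SQL: the same – the bad column name is only detected when the query string
  // is parsed and analysed at run time.
  spark.sql("SELECT dept, avg(salary) FROM department WHERE dept1 = 'IT' group by dept")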
alias("avg_salary") DataFrame vs DataSet Do the same for the DataFrame val result = dataDF.groupBy($"dept1").agg(avg($"salary")).filter1 ($"dept" === "IT") DataFrame vs DataSet For SQL, we changed the FROM keyword to FROM1 val resultSql = spark.sql("SELECT dept, avg(salary) FROM1 department WHERE dept = 'IT' group by dept") Summary of Structured Data Types Type Safety If you want type safety and to catch errors sooner at compile time, DataSet is your safe bet Another benefit of type safety is that at compile time, our code is checked to see whether the operation we are trying to perform against types are permissible in the first place Here is an example Type Safety dept is a string column, then if you treat dept as an integer in a DataSet, we get a compile time error but with DataFrames we get a result dataDF.filter($"dept" > 10).collect dataDS.filter(_.dept > 10).collect Type Safety DataFrames are flexible – So good idea if you do not have control over your data If you have strict control on the data, and you are sure that the data is supposed to confirm to a specific type, DataSet is the way to go You use DataSet when you need compile time safety Most production code is in DataFrames Further Reading https://databricks.com/blog/2016/07/14/a-tale-of-three- apache-spark-apis-rdds-dataframes-and-datasets.html StructType and StructField https://sparkbyexamples.com/spark/spark-sql- structtype-on-dataframe/