Chapter 6 Working with Different Types of Data (PDF)
Summary
This chapter from a Spark programming guide gives a detailed look at working with various data types, including Booleans, numbers, strings, and complex types within Spark DataFrames. It covers methods for transforming data, handling null values, and creating boolean expressions. Examples are shown in Scala and Python.
Full Transcript
Chapter 6. Working with Different Types of Data

Chapter 5 presented basic DataFrame concepts and abstractions. This chapter covers building expressions, which are the bread and butter of Spark’s structured operations. We also review working with a variety of different kinds of data, including the following:

- Booleans
- Numbers
- Strings
- Dates and timestamps
- Handling null
- Complex types
- User-defined functions

Where to Look for APIs

Before we begin, it’s worth explaining where you as a user should look for transformations. Spark is a growing project, and any book (including this one) is a snapshot in time. One of our priorities in this book is to teach where, as of this writing, you should look to find functions to transform your data. Following are the key places to look:

DataFrame (Dataset) Methods
This is actually a bit of a trick because a DataFrame is just a Dataset of Row types, so you’ll actually end up looking at the Dataset methods, which are listed in the Dataset API reference. Dataset submodules like DataFrameStatFunctions and DataFrameNaFunctions have more methods that solve specific sets of problems. DataFrameStatFunctions, for example, holds a variety of statistically related functions, whereas DataFrameNaFunctions refers to functions that are relevant when working with null data.

Column Methods
These were introduced for the most part in Chapter 5. They hold a variety of general column-related methods like alias or contains. See the API reference for Column methods.

org.apache.spark.sql.functions contains a variety of functions for a range of different data types. Often, you’ll see the entire package imported because these functions are used so frequently. See the API reference for SQL and DataFrame functions.

Now this may feel a bit overwhelming, but have no fear: the majority of these functions are ones that you will find in SQL and analytics systems. All of these tools exist to achieve one purpose, to transform rows of data in one format or structure to another.
This might create more rows or reduce the number of rows available. To begin, let’s read in the DataFrame that we’ll be using for this analysis:

// in Scala
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

# in Python
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

Here’s the result of the schema and a small sample of the data:

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|Unit
+---------+---------+--------------------+--------+-------------------+---
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|
|   536367|    21755|LOVE BUILDING BLO...|       3|2010-12-01 08:34:00|
|   536367|    21777|RECIPE BOX WITH M...|       4|2010-12-01 08:34:00|
+---------+---------+--------------------+--------+-------------------+---

Converting to Spark Types

One thing you’ll see us do throughout this chapter is convert native types to Spark types. We do this by using the first function that we introduce here, the lit function. This function converts a type in another language to its corresponding Spark representation.
Here’s how we can convert a couple of different kinds of Scala and Python values to their respective Spark types:

// in Scala
import org.apache.spark.sql.functions.lit
df.select(lit(5), lit("five"), lit(5.0))

# in Python
from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))

There’s no equivalent function necessary in SQL, so we can use the values directly:

-- in SQL
SELECT 5, "five", 5.0

Working with Booleans

Booleans are essential when it comes to data analysis because they are the foundation for all filtering. Boolean statements consist of four elements: and, or, true, and false. We use these simple structures to build logical statements that evaluate to either true or false. These statements are often used as conditional requirements: a row of data must either pass the test (evaluate to true) or else it will be filtered out.

Let’s use our retail dataset to explore working with Booleans. We can specify equality as well as less-than or greater-than:

// in Scala
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo").equalTo(536365))
  .select("InvoiceNo", "Description")
  .show(5, false)

WARNING
Scala has some particular semantics regarding the use of == and ===. In Spark, if you want to filter by equality you should use === (equal) or =!= (not equal). You can also use the not function and the equalTo method.

// in Scala
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo") === 536365)
  .select("InvoiceNo", "Description")
  .show(5, false)

Python keeps a more conventional notation:

# in Python
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
  .select("InvoiceNo", "Description")\
  .show(5, False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+

Another option, and probably the cleanest, is to specify the predicate as an expression in a string. This is valid for Python or Scala.
Note that this also gives you access to another way of expressing “does not equal”:

df.where("InvoiceNo = 536365").show(5, false)
df.where("InvoiceNo <> 536365").show(5, false)

We mentioned that you can specify Boolean expressions with multiple parts when you use and or or. In Spark, you should always chain together and filters as a sequential filter. The reason for this is that even if Boolean statements are expressed serially (one after the other), Spark will flatten all of these filters into one statement and perform the filter at the same time, creating the and statement for us. Although you can specify your statements explicitly by using and if you like, they’re often easier to understand and to read if you specify them serially. In contrast, or statements need to be specified in the same statement:

// in Scala
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter))
  .show()

# in Python
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

-- in SQL
SELECT * FROM dfTable WHERE StockCode in ("DOT") AND
  (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

+---------+---------+--------------+--------+-------------------+---------+...
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|...
+---------+---------+--------------+--------+-------------------+---------+...
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|...
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|...
+---------+---------+--------------+--------+-------------------+---------+...

Boolean expressions are not just reserved to filters.
To filter a DataFrame, you can also just specify a Boolean column:

// in Scala
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
  .where("isExpensive")
  .select("unitPrice", "isExpensive").show(5)

# in Python
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .where("isExpensive")\
  .select("unitPrice", "isExpensive").show(5)

-- in SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
  (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
  (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

Notice how we did not need to specify our filter as an expression and how we could use a column name without any extra work.

If you’re coming from a SQL background, all of these statements should seem quite familiar. Indeed, all of them can be expressed as a where clause. In fact, it’s often easier to just express filters as SQL statements than to use the programmatic DataFrame interface, and Spark SQL allows us to do this without paying any performance penalty.
For example, the following two statements are equivalent:

// in Scala
import org.apache.spark.sql.functions.{expr, not, col}
df.withColumn("isExpensive", not(col("UnitPrice").leq(250)))
  .filter("isExpensive")
  .select("Description", "UnitPrice").show(5)
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))
  .filter("isExpensive")
  .select("Description", "UnitPrice").show(5)

[...]

// in Scala
val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
df.na.fill(fillColValues)

# in Python
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}
df.na.fill(fill_cols_vals)

replace

In addition to replacing null values like we did with drop and fill, there are more flexible options that you can use with more than just null values. Probably the most common use case is to replace all values in a certain column according to their current value. The only requirement is that this value be the same type as the original value:

// in Scala
df.na.replace("Description", Map("" -> "UNKNOWN"))

# in Python
df.na.replace([""], ["UNKNOWN"], "Description")

Ordering

As we discussed in Chapter 5, you can use asc_nulls_first, desc_nulls_first, asc_nulls_last, or desc_nulls_last to specify where you would like your null values to appear in an ordered DataFrame.

Working with Complex Types

Complex types can help you organize and structure your data in ways that make more sense for the problem that you are hoping to solve. There are three kinds of complex types: structs, arrays, and maps.

Structs

You can think of structs as DataFrames within DataFrames. A worked example will illustrate this more clearly.
We can create a struct by wrapping a set of columns in parentheses in a query:

df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

// in Scala
import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

# in Python
from pyspark.sql.functions import struct
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

We now have a DataFrame with a column complex. We can query it just as we might another DataFrame; the only difference is that we use a dot syntax to do so, or the column method getField:

complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))

We can also query all values in the struct by using *. This brings up all the columns to the top-level DataFrame:

complexDF.select("complex.*")

-- in SQL
SELECT complex.* FROM complexDF

Arrays

To define arrays, let’s work through a use case. With our current data, our objective is to take every single word in our Description column and convert that into a row in our DataFrame. The first task is to turn our Description column into a complex type, an array.

split

We do this by using the split function and specifying the delimiter:

// in Scala
import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ")).show(2)

# in Python
from pyspark.sql.functions import split
df.select(split(col("Description"), " ")).show(2)

-- in SQL
SELECT split(Description, ' ') FROM dfTable

+---------------------+
|split(Description,  )|
+---------------------+
| [WHITE, HANGING,... |
| [WHITE, METAL, LA...|
+---------------------+

This is quite powerful because Spark allows us to manipulate this complex type as another column.
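To make the per-row behavior of split concrete, here is a plain-Python model of it. This is only a sketch and not Spark code: the helper name split_description and the sample values are hypothetical, chosen for illustration. It assumes two things about Spark's split: the delimiter is treated as a regular expression (mirrored here with re.split), and a null input yields a null output.

```python
import re

# Hypothetical sample values standing in for the Description column.
descriptions = ["WHITE METAL LANTERN", "DOTCOM POSTAGE", None]

def split_description(value, pattern=" "):
    """Model of split(col, pattern) for a single row; None stays None."""
    if value is None:
        return None
    return re.split(pattern, value)

# One array per input row, in the same order as the input.
array_col = [split_description(d) for d in descriptions]

for original, words in zip(descriptions, array_col):
    print(original, "->", words)
```

Each input value produces exactly one array, so the number of rows does not change at this step.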
We can also query the values of the array using Python-like syntax:

// in Scala
df.select(split(col("Description"), " ").alias("array_col"))
  .selectExpr("array_col[0]").show(2)

# in Python
df.select(split(col("Description"), " ").alias("array_col"))\
  .selectExpr("array_col[0]").show(2)

-- in SQL
SELECT split(Description, ' ')[0] FROM dfTable

This gives us the following result:

+------------+
|array_col[0]|
+------------+
|       WHITE|
|       WHITE|
+------------+

Array Length

We can determine the array’s length by querying for its size:

// in Scala
import org.apache.spark.sql.functions.size
df.select(size(split(col("Description"), " "))).show(2) // shows 5 and 3

# in Python
from pyspark.sql.functions import size
df.select(size(split(col("Description"), " "))).show(2) # shows 5 and 3

array_contains

We can also see whether this array contains a value:

// in Scala
import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

# in Python
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

-- in SQL
SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable

This gives us the following result:

+--------------------------------------------+
|array_contains(split(Description,  ), WHITE)|
+--------------------------------------------+
|                                        true|
|                                        true|
+--------------------------------------------+

However, this does not solve our current problem. To convert a complex type into a set of rows (one per value in our array), we need to use the explode function.

explode

The explode function takes a column that consists of arrays and creates one row (with the rest of the values duplicated) per value in the array. Figure 6-1 illustrates the process.

"Hello World", "other col"  ->  ["Hello", "World"], "other col"  ->  "Hello", "other col"
                                                                     "World", "other col"

Figure 6-1. Exploding a column of text

// in Scala
import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " "))
  .withColumn("exploded", explode(col("splitted")))
  .select("Description", "InvoiceNo", "exploded").show(2)

# in Python
from pyspark.sql.functions import split, explode
df.withColumn("splitted", split(col("Description"), " "))\
  .withColumn("exploded", explode(col("splitted")))\
  .select("Description", "InvoiceNo", "exploded").show(2)

-- in SQL
SELECT Description, InvoiceNo, exploded
FROM (SELECT *, split(Description, " ") as splitted FROM dfTable)
LATERAL VIEW explode(splitted) as exploded

This gives us the following result:

+--------------------+---------+--------+
|         Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...|   536365|   WHITE|
|WHITE HANGING HEA...|   536365| HANGING|
+--------------------+---------+--------+

Maps

Maps are created by using the map function and key-value pairs of columns. You then can select them just like you might select from an array:

// in Scala
import org.apache.spark.sql.functions.map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2)

# in Python
from pyspark.sql.functions import create_map
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .show(2)

-- in SQL
SELECT map(Description, InvoiceNo) as complex_map FROM dfTable
WHERE Description IS NOT NULL

This produces the following result:

+--------------------+
|         complex_map|
+--------------------+
|Map(WHITE HANGING...|
|Map(WHITE METAL L...|
+--------------------+

You can query them by using the proper key.
A missing key returns null:

// in Scala
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
  .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)

# in Python
# PySpark exposes this function as create_map; Python's built-in map would not work here.
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)

This gives us the following result:

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            null|
|                          536365|
+--------------------------------+

You can also explode map types, which will turn them into columns:

// in Scala
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
  .selectExpr("explode(complex_map)").show(2)

# in Python
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("explode(complex_map)").show(2)

This gives us the following result:

+--------------------+------+
|                 key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
+--------------------+------+

Working with JSON

Spark has some unique support for working with JSON data. You can operate directly on strings of JSON in Spark and parse from JSON or extract JSON objects. Let’s begin by creating a JSON column:

// in Scala
val jsonDF = spark.range(1).selectExpr("""
  '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

# in Python
jsonDF = spark.range(1).selectExpr("""
  '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

You can use get_json_object to inline query a JSON object, be it a dictionary or array.
You can use json_tuple if this object has only one level of nesting:

// in Scala
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
    json_tuple(col("jsonString"), "myJSONKey")).show(2)

# in Python
from pyspark.sql.functions import get_json_object, json_tuple
jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),
    json_tuple(col("jsonString"), "myJSONKey")).show(2)

Here’s the equivalent of the path query in SQL (json_tuple takes field names rather than JSON paths, so get_json_object is used here):

jsonDF.selectExpr(
  "get_json_object(jsonString, '$.myJSONKey.myJSONValue[1]') as column").show(2)

The Scala and Python queries result in the following table:

+------+--------------------+
|column|                  c0|
+------+--------------------+
|     2|{"myJSONValue":[1...|
+------+--------------------+

You can also turn a StructType into a JSON string by using the to_json function:

// in Scala
import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")
  .select(to_json(col("myStruct")))

# in Python
from pyspark.sql.functions import to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")))

This function also accepts a dictionary (map) of parameters that are the same as those of the JSON data source. You can use the from_json function to parse this (or other JSON data) back in.
This naturally requires you to specify a schema, and optionally you can specify a map of options, as well:

// in Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
  new StructField("InvoiceNo",StringType,true),
  new StructField("Description",StringType,true)))
df.selectExpr("(InvoiceNo, Description) as myStruct")
  .select(to_json(col("myStruct")).alias("newJSON"))
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

# in Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType([
  StructField("InvoiceNo",StringType(),True),
  StructField("Description",StringType(),True)])
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")).alias("newJSON"))\
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

This gives us the following result:

+----------------------+--------------------+
|jsontostructs(newJSON)|             newJSON|
+----------------------+--------------------+
|  [536365,WHITE HAN...|{"InvoiceNo":"536...|
|  [536365,WHITE MET...|{"InvoiceNo":"536...|
+----------------------+--------------------+

User-Defined Functions

One of the most powerful things that you can do in Spark is define your own functions. These user-defined functions (UDFs) make it possible for you to write your own custom transformations using Python or Scala and even use external libraries. UDFs can take one or more columns as input and can return one or more columns as output. Spark UDFs are incredibly powerful because you can write them in several different programming languages; you do not need to create them in an esoteric format or domain-specific language. They’re just functions that operate on the data, record by record. By default, these functions are registered as temporary functions to be used in that specific SparkSession or Context.
Although you can write UDFs in Scala, Python, or Java, there are performance considerations that you should be aware of. To illustrate this, we’re going to walk through exactly what happens when you create a UDF, pass that into Spark, and then execute code using that UDF.

The first step is the actual function. We’ll create a simple one for this example. Let’s write a power3 function that takes a number and raises it to a power of three:

// in Scala
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)

# in Python
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
  return double_value ** 3
power3(2.0)

In this trivial example, we can see that our functions work as expected. We are able to provide an individual input and produce the expected result (with this simple test case). Thus far, our expectations for the input are high: it must be a specific type and cannot be a null value (see “Working with Nulls in Data”).

Now that we’ve created these functions and tested them, we need to register them with Spark so that we can use them on all of our worker machines. Spark will serialize the function on the driver and transfer it over the network to all executor processes. This happens regardless of language.

When you use the function, one of two different things happens. If the function is written in Scala or Java, you can use it within the Java Virtual Machine (JVM). This means that there will be little performance penalty aside from the fact that you can’t take advantage of code generation capabilities that Spark has for built-in functions. There can be performance issues if you create or use a lot of objects; we cover that in the section on optimization in Chapter 19.

If the function is written in Python, something quite different happens.
Spark starts a Python process on the worker, serializes all of the data to a format that Python can understand (remember, it was in the JVM earlier), executes the function row by row on that data in the Python process, and then finally returns the results of the row operations to the JVM and Spark. Figure 6-2 provides an overview of the process.

[Figure 6-2: executor processes and the worker Python process]
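The row-by-row execution and the earlier caveat about null inputs can be modeled in plain Python. The following is a minimal sketch, not Spark code: power3 is the function from the chapter, while power3_null_safe and the records list are hypothetical names introduced here for illustration. It does not model the serialization between the JVM and the Python process, only the per-record application of the function.

```python
def power3(double_value):
    # Function from the chapter: raises its input to the third power.
    return double_value ** 3

def power3_null_safe(double_value):
    # Hypothetical wrapper: a null arrives in Python as None, so pass it
    # through instead of letting ** raise a TypeError.
    if double_value is None:
        return None
    return power3(double_value)

# Stand-in for the "num" column of spark.range(5), plus one null record.
records = [0, 1, 2, 3, 4, None]

# Conceptually, the Python worker applies the function one record at a time.
results = [power3_null_safe(value) for value in records]
print(results)  # [0, 1, 8, 27, 64, None]
```

Calling power3 directly on None raises a TypeError, which is why the wrapper checks for None before delegating.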