(Delta) Ch 3 Basic Commands.pdf
CHAPTER 3
Basic Operations on Delta Tables

The file provides information on how to create and manage Delta tables using the DataFrameWriter API in Apache Spark. It also demonstrates how to insert data into a Delta table from a Spark DataFrame. The file discusses managed tables, saving DataFrames as managed Hive tables, partitioning data, and creating unmanaged tables.

Key Points:
1. Delta tables created without a location are referred to as managed tables.
2. The DataFrameWriter API can be used to create a Delta table and insert data into it in a single step.
3. Delta tables can be partitioned using the PARTITIONED BY clause.

Delta tables can be created in a variety of ways. How you create your tables largely depends on your familiarity with the toolset. If you are primarily a SQL developer, you can use SQL's CREATE TABLE to create a Delta table, while Python users may prefer the DataFrameWriter API or the fine-grained and easy-to-use DeltaTableBuilder API.

When creating tables you can define GENERATED columns, whose values are automatically generated based on a user-specified function over other columns in the Delta table. While some restrictions apply, generated columns are a powerful way to enrich your Delta table schemas.

Delta tables can be read with standard ANSI SQL or with the popular PySpark DataFrameReader API. You can write to a Delta table by using the classic SQL INSERT statement, or you can append a DataFrame to the table. Finally, the SQL COPY INTO command is a great way to append large amounts of data quickly.

Partitioning a Delta table based on your most frequently used query patterns can dramatically improve your query and DML performance. The individual files that make up your Delta table will be organized in subdirectories that align to the values of your partitioning columns.

Delta Lake allows you to associate custom metadata with the commit entries in your transaction log.
This can be leveraged to tag sensitive commits for auditing purposes. You can also store custom tags in your table properties, so just as you can have tags for your cloud resources, you can now associate those tags with your Delta tables. You can also modify certain Delta capabilities. For example, you can set the delta.appendOnly property on a table to prevent deletes and updates.

Creating a Delta Table

Delta Lake enables us to create tables in three different ways:

SQL Data Definition Language (DDL) commands
    SQL developers are already very familiar with the classic CREATE TABLE command, and you will be able to use it to create a Delta table by adding just a few attributes.
PySpark DataFrameWriter API
    Big data Python (and Scala) developers will very likely already be familiar with this API, and you will be able to continue to use it with Delta tables.
DeltaTableBuilder API
    This is a new API specifically designed for Delta tables. It uses the popular Builder pattern, and it gives very fine-grained control over every Delta table and column attribute.

In the following sections we get hands-on with each of these table creation methods.

Creating a Delta Table with SQL DDL

The version of SQL used in Spark compute environments is called Spark SQL, which is a variant of ANSI SQL supported by Spark. Spark SQL is generally compatible with ANSI standard SQL. Refer to the Spark documentation for additional details on the Spark SQL variant. As mentioned earlier, you can use the standard SQL DDL commands in Spark SQL to create a Delta table:¹

    %sql
    -- Create a Delta table by specifying the delta format, followed
    -- by the path in backticks
    CREATE TABLE IF NOT EXISTS delta.`/mnt/datalake/book/chapter03/rateCard`
    (
        rateCodeId   INT,
        rateCodeDesc STRING
    )
    USING DELTA

The notation that you are using for the table name is the file_format.`path_to_table` notation, where file_format is delta, and path_to_table is the path to the Delta table.
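When you build this kind of DDL in Python and hand it to spark.sql(), the backticks around the path are easy to drop or mangle. The following is a minimal sketch with hypothetical helper names (quote_identifier, path_table; they are not part of Spark or Delta Lake). It assumes Spark SQL's rule that a backtick inside a backtick-quoted identifier is escaped by doubling it.

```python
def quote_identifier(name: str) -> str:
    """Backtick-quote an identifier, doubling embedded backticks as Spark SQL expects."""
    return "`" + name.replace("`", "``") + "`"


def path_table(path: str, file_format: str = "delta") -> str:
    """Render the file_format.`path` form that addresses a table by its location."""
    return f"{file_format}.{quote_identifier(path)}"


ddl = (
    "CREATE TABLE IF NOT EXISTS "
    + path_table("/mnt/datalake/book/chapter03/rateCard")
    + " (rateCodeId INT, rateCodeDesc STRING) USING DELTA"
)
print(ddl)
# In a Spark notebook you would then run: spark.sql(ddl)
```

Keeping the quoting in one helper means any path, including one with unusual characters, is rendered the same way every time.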
¹ GitHub repo location: /Chapter03/02 - CreateDeltaTableWithSql

Using this format can get tedious, since file paths can get rather long in the real world. This is where catalogs come in. A catalog allows you to register a table with a database.table_name notation, where database is a logical grouping of tables, and table_name is a shorthand for the table. For example, if you first created a database named taxidb as follows:

    %sql
    CREATE DATABASE IF NOT EXISTS taxidb;

Then you could create the above table as follows:

    %sql
    -- Create the table in the taxidb database
    CREATE TABLE IF NOT EXISTS taxidb.rateCard
    (
        rateCodeId   INT,
        rateCodeDesc STRING
    )
    USING DELTA
    LOCATION '/mnt/datalake/book/chapter03/rateCard'

From this point forward, you can refer to this Delta table as taxidb.rateCard, which is easier to remember and type than delta.`/mnt/datalake/book/chapter03/rateCard`, or possibly an even longer pathname. The most widely used catalog in the Spark ecosystem is the Hive catalog.

When running a directory listing on the data lake location where the table was created, you can see that our directory is empty (since you have not loaded any data), except for the _delta_log directory, which contains the table's transaction log:

    %sh
    ls -al /dbfs/mnt/datalake/book/chapter03/rateCard
    total 12
    drwxrwxrwx 2 root root 4096 Dec  2 19:02 .
    drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
    drwxrwxrwx 2 root root 4096 Dec  2 16:40 _delta_log

Please note that since you are running this as a shell command in the Databricks Community Edition environment, you have to prefix the path for the ls command with /dbfs.
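The same table therefore has two spellings: the DBFS path that Spark uses, and the /dbfs/... path that shell commands see through the local mount. The helper below is a hypothetical sketch (not a Databricks API) that translates between them. It assumes the Databricks /dbfs mount described in the note above.

```python
def to_fuse_path(path: str) -> str:
    """Translate a DBFS path into the /dbfs/... form that %sh commands can see.

    Accepts 'dbfs:/mnt/...', '/mnt/...', or an already-translated '/dbfs/...'.
    """
    if path.startswith("dbfs:"):
        path = path[len("dbfs:"):]           # 'dbfs:/mnt/x' -> '/mnt/x'
    if not path.startswith("/"):
        raise ValueError(f"expected an absolute DBFS path, got {path!r}")
    if path == "/dbfs" or path.startswith("/dbfs/"):
        return path                           # already a local mount path
    return "/dbfs" + path


print(to_fuse_path("/mnt/datalake/book/chapter03/rateCard"))
# /dbfs/mnt/datalake/book/chapter03/rateCard
print(to_fuse_path("dbfs:/mnt/datalake/book/chapter03/rateCard/_delta_log"))
# /dbfs/mnt/datalake/book/chapter03/rateCard/_delta_log
```

Because an already-translated path is returned unchanged, the helper is safe to apply more than once.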
When you open the _delta_log directory, you see our first transaction log entry:

    %sh
    ls -al /dbfs/mnt/datalake/book/chapter03/rateCard/_delta_log
    total 15
    drwxrwxrwx 2 root root 4096 Dec  2 19:02 .
    drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
    -rwxrwxrwx 1 root root 1886 Dec  2 19:02 00000000000000000000.crc
    -rwxrwxrwx 1 root root  939 Dec  2 19:02 00000000000000000000.json

In the transaction log discussion in Chapter 2, you read about the different actions that can be written to your transaction log entry. One of those actions is the metadata action, which describes the schema of the table, the partitioning columns (if applicable), and other information. This metadata action is always written to the first transaction log entry created for our new table.

To find this metadata action, you can search for the string metadata in the transaction log entry:

    %sh
    grep metadata /dbfs/mnt/datalake/book/chapter03/rateCard/_delta_log/00000000000000000000.json > /tmp/metadata.json
    python -m json.tool /tmp/metadata.json

This produces the following output:

    {
        "metaData": {
            "id": "f79c4c11-a807-49bc-93f4-2bbe778e2a04",
            "format": {
                "provider": "parquet",
                "options": {}
            },
            "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"rateCodeId\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rateCodeDesc\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
            "partitionColumns": [],
            "configuration": {},
            "createdTime": 1670007736533
        }
    }

Here, you see that Delta Lake has written the schema of the table to the transaction log entry, together with some auditing and partitioning information.

In the preceding command, we first perform a grep command, which searches for the string metadata in the transaction log entry. We then write the output of that to a temp file. The next line uses python -m json.tool with the temp file as input.
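If you prefer to stay in Python, the grep and json.tool steps can be folded into a few lines of standard-library code. This sketch assumes, as the output above suggests, that a commit file holds one JSON action per line, and that schemaString is itself a JSON document that needs a second parse. The function names are made up for illustration. The demo writes a sample commit modeled on the metaData action shown above instead of reading a real table.

```python
import json
import os
import tempfile


def read_actions(commit_path):
    """Return (action_name, payload) pairs, one per JSON line of a commit file."""
    actions = []
    with open(commit_path, encoding="utf-8") as fh:
        for line in fh:
            if line.strip():
                for name, payload in json.loads(line).items():
                    actions.append((name, payload))
    return actions


def schema_from_commit(commit_path):
    """Return [(column, type)] parsed from the metaData action's schemaString."""
    for name, payload in read_actions(commit_path):
        if name == "metaData":
            schema = json.loads(payload["schemaString"])  # a JSON string inside JSON
            return [(field["name"], field["type"]) for field in schema["fields"]]
    raise LookupError("no metaData action in this commit file")


# Sample action modeled on the output above (a real first commit usually
# also carries other actions, such as protocol and commitInfo).
sample_action = {
    "metaData": {
        "id": "f79c4c11-a807-49bc-93f4-2bbe778e2a04",
        "format": {"provider": "parquet", "options": {}},
        "schemaString": json.dumps({
            "type": "struct",
            "fields": [
                {"name": "rateCodeId", "type": "integer", "nullable": True, "metadata": {}},
                {"name": "rateCodeDesc", "type": "string", "nullable": True, "metadata": {}},
            ],
        }),
        "partitionColumns": [],
        "configuration": {},
        "createdTime": 1670007736533,
    }
}

with tempfile.TemporaryDirectory() as tmp:
    commit = os.path.join(tmp, "00000000000000000000.json")
    with open(commit, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(sample_action) + "\n")
    columns = schema_from_commit(commit)

print(columns)  # [('rateCodeId', 'integer'), ('rateCodeDesc', 'string')]
```

On Databricks you would point schema_from_commit at the /dbfs/... path of a real commit file instead of the temporary sample.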
The json.tool Python module will "pretty print" the content of a JSON file, which can be very handy for readability.

The DESCRIBE Statement

The SQL DESCRIBE command can be used to return the basic metadata for a Parquet file or Delta table. The metadata returned for a table includes one line for each column with the following information:

- The column name
- The column data type
- Any comments that were applied to the column

Following is an example of the DESCRIBE command at the table level:

    %sql
    DESCRIBE TABLE taxidb.rateCard;

    +--------------+-----------+---------+
    | col_name     | data_type | comment |
    +--------------+-----------+---------+
    | rateCodeId   | int       |         |
    | rateCodeDesc | string    |         |
    +--------------+-----------+---------+

When you want to find the Delta Lake-specific attributes, you can also use the DESCRIBE TABLE EXTENDED command, which provides more detailed metadata information, including the following generic attributes:

- The catalog name for the database in which the table was created (in this case the Hive metastore)
- The Hive database
- The table name
- The location of the underlying files
- The owner of the table
- The table properties

The following Delta Lake-specific attributes are also included:

delta.minReaderVersion
    The minimum required protocol reader version for a reader that can read from this Delta table.
delta.minWriterVersion
    The minimum required protocol writer version for a writer that can write to this Delta table.

Please refer to the Delta Lake documentation for a full listing of all available table properties.

Following is an example of the DESCRIBE TABLE EXTENDED command:

    %sql
    DESCRIBE TABLE EXTENDED taxidb.rateCard;

This generates the following output:

    +------------------------------+------------------------------+---------+
    | col_name                     | data_type                    | comment |
    +------------------------------+------------------------------+---------+
    | rateCodeId                   | int                          |         |
    | rateCodeDesc                 | string                       |         |
    |                              |                              |         |
    | # Detailed Table Information |                              |         |
    | Catalog                      | hive_metastore               |         |
    | Database                     | taxidb                       |         |
    | Table                        | ratecard                     |         |
    | Type                         | EXTERNAL                     |         |
    | Location                     | dbfs:/.../chapter03/rateCard |         |
    | Provider                     | delta                        |         |
    | Owner                        | root                         |         |
    | Table Properties             | [delta.minReaderVersion=1,   |         |
    |                              |  delta.minWriterVersion=2]   |         |
    +------------------------------+------------------------------+---------+

So far, we have covered the creation of Delta tables with the SQL DDL. In the next section, we will switch back to Python and look at how you can use the familiar PySpark DataFrames to create new Delta tables.

Creating Delta Tables with the DataFrameWriter API

Spark DataFrames resemble relational database tables or Excel spreadsheets with headers. The data resides in rows and columns of different data types. The DataFrame API gives us functions to read, write, and manipulate DataFrames; the part of it that writes a DataFrame out (df.write) is the Spark DataFrameWriter API.

Creating a managed table

When you read the Spark and/or Delta documentation, you will hear the terms managed and unmanaged table. A Delta table that is created with a location is known as an unmanaged table. For these tables, Spark only manages the metadata, and requires the user to specify the exact location where the underlying data for the table should be saved, or alternatively, the source directory from which data will be pulled to create the table (if you are using the DataFrameWriter API).

A Delta table that is created without a location is referred to as a managed table. Spark manages both the metadata and the actual data for managed tables. The data is stored under the /spark-warehouse subfolder (in the Apache Spark scenario) or the /user/hive/warehouse folder (when running on Databricks), which is the default for managed tables.

One of the benefits of the DataFrameWriter API is that you can simultaneously create a table and insert data into it from a Spark DataFrame, as shown in the following code snippet:²

    INPUT_PATH = '/databricks-datasets/nyctaxi/taxizone/taxi_rate_code.csv'
    DELTALAKE_PATH = \
        'dbfs:/mnt/datalake/book/chapter03/createDeltaTableWithDataFrameWriter'

    # Read the DataFrame from the input path
    df_rate_codes = spark \
        .read \
        .format("csv") \
        .option("inferSchema", True) \
        .option("header", True) \
        .load(INPUT_PATH)

    # Save our DataFrame as a managed Hive table
    df_rate_codes.write.format("delta").saveAsTable('taxidb.rateCard')

² GitHub repo location: /chapter03/04 - The DataFrameWriter API

Here, we first populate the DataFrame from the taxi_rate_code.csv file, and then save the DataFrame as a Delta table by specifying the .format("delta") option. The schema of the table will be the schema of our DataFrame. Notice that this will be a managed table, since we did not specify a location for our data files. You can verify this by running the SQL DESCRIBE TABLE EXTENDED command:

    %sql
    DESCRIBE TABLE EXTENDED taxidb.rateCard;

    +------------------------------+----------------------------------------------+
    | col_name                     | data_type                                    |
    +------------------------------+----------------------------------------------+
    | RateCodeID                   | int                                          |
    | RateCodeDesc                 | string                                       |
    |                              |                                              |
    | # Detailed Table Information |                                              |
    | Catalog                      | hive_metastore                               |
    | Database                     | taxidb                                       |
    | Table                        | ratecard                                     |
    | Type                         | MANAGED                                      |
    | Location                     | dbfs:/user/hive/warehouse/taxidb.db/ratecard |
    | Provider                     | delta                                        |
    | Owner                        | root                                         |
    | Is_managed_location          | true                                         |
    | Table Properties             | [delta.minReaderVersion=1,                   |
    |                              |  delta.minWriterVersion=2]                   |
    +------------------------------+----------------------------------------------+

We see that the data for the table lives in the /user/hive/warehouse location, and that the type of the table is set to MANAGED.

If you run a SELECT on the table, you can see the data was indeed loaded successfully from the CSV file:

    %sql
    SELECT * FROM taxidb.rateCard

    +------------+-----------------------+
    | RateCodeID | RateCodeDesc          |
    +------------+-----------------------+
    | 1          | Standard Rate         |
    | 2          | JFK                   |
    | 3          | Newark                |
    | 4          | Nassau or Westchester |
    | 5          | Negotiated fare       |
    | 6          | Group ride            |
    +------------+-----------------------+

Creating an unmanaged table

You can create an unmanaged table by specifying both the path and the name of the Delta table. We do this in two steps. First, drop the existing table:

    %sql
    -- Drop the existing table
    DROP TABLE IF EXISTS taxidb.rateCard;

Next, write out and create the table:

    # Next, create our Delta table, specifying both
    # the path and the Delta table name
    df_rate_codes \
        .write \
        .format("delta") \
        .mode("overwrite") \
        .option('path', DELTALAKE_PATH) \
        .saveAsTable('taxidb.rateCard')

Again, by performing a simple SELECT we can verify that the data of the DataFrame has been loaded:

    %sql
    SELECT * FROM taxidb.rateCard

    +------------+-----------------------+
    | RateCodeID | RateCodeDesc          |
    +------------+-----------------------+
    | 1          | Standard Rate         |
    | 2          | JFK                   |
    | 3          | Newark                |
    | 4          | Nassau or Westchester |
    | 5          | Negotiated fare       |
    | 6          | Group ride            |
    +------------+-----------------------+

Creating a Delta Table with the DeltaTableBuilder API

The last way to create a Delta table is by using the DeltaTableBuilder API. Since it is designed to work with Delta tables, it offers a higher degree of fine-grained control than the traditional DataFrameWriter API. It is easier for a user to specify additional information such as column comments, table properties, and generated columns.

The Builder design pattern is widely used in object-oriented programming. The Builder pattern aims to "separate the construction of a complex object from its representation so that the same construction process can create different representations." It is used to construct a complex object step by step, where the final step returns the object. The complex object we are building in this case is a Delta table.

Delta tables support so many options that it is challenging to design a standard API with many arguments for a single function. Instead, the DeltaTableBuilder has a number of small methods, such as addColumn(), which all return a reference to the Builder object. That way we can keep adding other calls to addColumn(), or to other methods of the Builder. The final method we call is execute(), which gathers up all the attributes received, creates the Delta table, and returns a reference to the table to the caller.

To use the DeltaTableBuilder, we will need the following import:

    from delta.tables import DeltaTable

This example creates a managed table:³

    # In this CREATE TABLE, you do NOT specify a location, so you are
    # creating a MANAGED table
    DeltaTable.createIfNotExists(spark) \
        .tableName("taxidb.greenTaxis") \
        .addColumn("RideId", "INT", comment="Primary Key") \
        .addColumn("VendorId", "INT", comment="Ride Vendor") \
        .addColumn("EventType", "STRING") \
        .addColumn("PickupTime", "TIMESTAMP") \
        .addColumn("PickupLocationId", "INT") \
        .addColumn("CabLicense", "STRING") \
        .addColumn("DriversLicense", "STRING") \
        .addColumn("PassengerCount", "INT") \
        .addColumn("DropTime", "TIMESTAMP") \
        .addColumn("DropLocationId", "INT") \
        .addColumn("RateCodeId", "INT", comment="Ref to RateCard") \
        .addColumn("PaymentType", "INT") \
        .addColumn("TripDistance", "DOUBLE") \
        .addColumn("TotalAmount", "DOUBLE") \
        .execute()

Since each method returns a reference to the Builder object, we can keep calling .addColumn() to add each column. Finally, we call .execute() to create the Delta table.

³ GitHub repo location: /chapter03/05 - The DeltaTableBuilder API

Generated Columns

Delta Lake supports generated columns, which are a special type of column, the values of which are automatically generated based on a user-specified function over other columns in the Delta table. When you write to a Delta table with generated columns and don't explicitly provide values for them, Delta Lake automatically computes the values.

Let's create an example next. To stay with our taxi theme, we will create a simple version of a yellow taxi table:

    %sql
    CREATE TABLE taxidb.YellowTaxis
    (
        RideId      INT       COMMENT 'This is our primary Key column',
        VendorId    INT,
        PickupTime  TIMESTAMP,
        PickupYear  INT       GENERATED ALWAYS AS (YEAR(PickupTime)),
        PickupMonth INT       GENERATED ALWAYS AS (MONTH(PickupTime)),
        PickupDay   INT       GENERATED ALWAYS AS (DAY(PickupTime)),
        DropTime    TIMESTAMP,
        CabNumber   STRING    COMMENT 'Official Yellow Cab Number'
    ) USING DELTA
    LOCATION "/mnt/datalake/book/chapter03/YellowTaxis.delta"
    COMMENT 'Table to store Yellow Taxi data'

We see the columns with GENERATED ALWAYS AS, which extract the YEAR, MONTH, and DAY from the PickupTime column.
The values for these columns will automatically be populated when we insert a record:

    %sql
    INSERT INTO taxidb.YellowTaxis
        (RideId, VendorId, PickupTime, DropTime, CabNumber)
    VALUES
        (5, 101, '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28UTC+3', '51-986')

When we select the record, we see that the generated columns are automatically populated:

    %sql
    SELECT PickupTime, PickupYear, PickupMonth, PickupDay FROM taxidb.YellowTaxis

    +---------------------------+------------+-------------+-----------+
    | pickupTime                | pickupYear | pickupMonth | pickupDay |
    +---------------------------+------------+-------------+-----------+
    | 2021-07-01 05:43:28+00:00 | 2021       | 7           | 1         |
    +---------------------------+------------+-------------+-----------+

If we do explicitly provide a value for a generated column, the value must satisfy the constraint (<value> <=> <generation expression>) IS TRUE, or the insert will fail with an error.

The expression you use in GENERATED ALWAYS AS can be any Spark SQL function that always returns the same result when given the same argument values, with a few exceptions we will touch on soon. You might think you could use a GENERATED column to generate a column with a unique ID like this:

    %sql
    CREATE OR REPLACE TABLE default.dummy (
        ID   STRING GENERATED ALWAYS AS (UUID()),
        Name STRING
    ) USING DELTA

However, when you try to run this, you get the following error message:

    Found uuid(). A generated column cannot use a non deterministic expression.

The UUID() function returns a different value for each invocation, which violates the preceding rule.
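The rules above can be made concrete with a pure-Python model: omitted generated values are computed from the row, supplied values must pass the null-safe comparison, and an expression must give the same answer every time. The names (GENERATED, apply_generated, looks_deterministic) are illustrative, not Delta Lake APIs. Spark decides determinism by inspecting the expression, not by running it repeatedly as the probe here does. The sketch also assumes a UTC session time zone, which is why 8:43 at UTC+3 shows up as 05:43 in the output above.

```python
import uuid
from datetime import datetime, timedelta, timezone


def null_safe_eq(a, b):
    """Python stand-in for Spark SQL's <=> operator: NULL <=> NULL is true."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def part_of_pickup(attr):
    """Build a YEAR/MONTH/DAY-style expression over PickupTime (NULL in, NULL out)."""
    def expr(row):
        ts = row.get("PickupTime")
        return None if ts is None else getattr(ts, attr)
    return expr


# Hypothetical model of the three generated columns on taxidb.YellowTaxis.
GENERATED = {
    "PickupYear": part_of_pickup("year"),    # YEAR(PickupTime)
    "PickupMonth": part_of_pickup("month"),  # MONTH(PickupTime)
    "PickupDay": part_of_pickup("day"),      # DAY(PickupTime)
}


def apply_generated(row):
    """Fill omitted generated columns; reject supplied values that disagree."""
    out = dict(row)
    for name, expr in GENERATED.items():
        expected = expr(out)
        if name not in out:
            out[name] = expected
        elif not null_safe_eq(out[name], expected):
            raise ValueError(
                f"{name}={out[name]!r}: ({name} <=> generation expression) is not TRUE"
            )
    return out


def looks_deterministic(expr, row, trials=3):
    """Crude probe: does evaluating expr on the same row always give one answer?"""
    first = expr(row)
    return all(expr(row) == first for _ in range(trials - 1))


# '2021-7-1T8:43:28UTC+3' is the instant 05:43:28 UTC; the parts below are
# extracted in UTC, matching the SELECT output above.
pickup = datetime(2021, 7, 1, 8, 43, 28, tzinfo=timezone(timedelta(hours=3)))
row = apply_generated({"RideId": 5, "PickupTime": pickup.astimezone(timezone.utc)})
print(row["PickupTime"], row["PickupYear"], row["PickupMonth"], row["PickupDay"])
# 2021-07-01 05:43:28+00:00 2021 7 1

print(looks_deterministic(GENERATED["PickupYear"], row))      # True
print(looks_deterministic(lambda r: str(uuid.uuid4()), row))  # False
```

Because the day is extracted in the session time zone, a pickup close to midnight could land on a different PickupDay under a different time zone setting.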
The deterministic-function rule also has exceptions: the following types of functions cannot be used in a GENERATED ALWAYS AS expression, even when they are deterministic:

- User-defined functions
- Aggregate functions
- Window functions
- Functions returning multiple rows

Because a generated column is computed from the values in a single row, aggregations across records, such as a standard deviation over a sample of records, have to be computed in a query instead.

Reading a Delta Table

We have a few options when reading a table: SQL, and PySpark using the DataFrameReader. When we use a notebook in the Databricks Community Edition, we tend to use both SQL and PySpark cells within the notebook. Some things, like a quick SELECT, are just easier and faster to do in SQL, while complex operations are sometimes more easily expressed in PySpark and the DataFrameReader. This of course also depends on the experience and preferences of the engineer. We recommend a pragmatic approach using a healthy mix of both, depending on the problem you are currently solving.

Reading a Delta Table with SQL

To read a Delta table, we can simply open a SQL cell and write our SQL query. If you set up your environment as specified in the GitHub README file, you will have a Delta table in the /mnt/datalake/book/chapter03/YellowTaxisDelta folder:

    %sh
    ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDelta
    total 236955
    drwxrwxrwx 2 root root      4096 Dec  4 18:04 .
    drwxrwxrwx 2 root root      4096 Dec  2 19:02 ..
    drwxrwxrwx 2 root root      4096 Dec  4 16:41 _delta_log
    -rwxrwxrwx 1 root root 134759123 Dec  4 18:04 part-00000-...-c000.snappy.parquet
    -rwxrwxrwx 1 root root 107869302 Dec  4 18:04 part-00001-...-c000.snappy.parquet

We can quickly register a Delta table location in the metastore, as follows:⁴

    %sql
    CREATE TABLE taxidb.YellowTaxis
    USING DELTA
    LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDelta/"

Once we have created the table, we can do a quick count on the number of records:

    %sql
    SELECT
        COUNT(*)
    FROM
        taxidb.yellowtaxis

⁴ GitHub repo location: /chapter03/07 - Read Table with SQL

This gives us the following count:

    +----------+
    | count(1) |
    +----------+
    | 9999995  |
    +----------+

We can see there are almost 10 million rows to work with. We can use another DESCRIBE command variant to get the details of the table:

    %sql
    DESCRIBE TABLE FORMATTED taxidb.YellowTaxis;

In Spark SQL, FORMATTED is accepted as a synonym for EXTENDED, so the output again lists the columns followed by the detailed table information:

    +------------------------------+--------------------------------------+
    | col_name                     | data_type                            |
    +------------------------------+--------------------------------------+
    | RideId                       | int                                  |
    | VendorId                     | int                                  |
    | PickupTime                   | timestamp                            |
    | DropTime                     | timestamp                            |
    | PickupLocationId             | int                                  |
    | DropLocationId               | int                                  |
    | CabNumber                    | string                               |
    | DriverLicenseNumber          | string                               |
    | PassengerCount               | int                                  |
    | TripDistance                 | double                               |
    | RateCodeId                   | int                                  |
    | PaymentType                  | int                                  |
    | TotalAmount                  | double                               |
    | FareAmount                   | double                               |
    | Extra                        | double                               |
    | MtaTax                       | double                               |
    | TipAmount                    | double                               |
    | TollsAmount                  | double                               |
    | ImprovementSurcharge         | double                               |
    |                              |                                      |
    | # Detailed Table Information |                                      |
    | Catalog                      | hive_metastore                       |
    | Database                     | taxidb                               |
    | Table                        | YellowTaxis                          |
    | Type                         | EXTERNAL                             |
    | Location                     | dbfs:/.../chapter03/YellowTaxisDelta |
    | Provider                     | delta                                |
    | Owner                        | root                                 |
    | Table Properties             | [delta.minReaderVersion=1,           |
    |                              |  delta.minWriterVersion=2]           |
    +------------------------------+--------------------------------------+

Because Spark SQL supports most of ANSI SQL, we can write fairly complex queries. Following is an example that returns the five cab numbers with the highest average FareAmount, considering only cabs whose average fare is over $50:

    %sql
    SELECT
        CabNumber,
        AVG(FareAmount) AS AverageFare
    FROM
        taxidb.yellowtaxis
    GROUP BY
        CabNumber
    HAVING
        AVG(FareAmount) > 50
    ORDER BY
        2 DESC
    LIMIT 5

This gives us:

    +-----------+-------------+
    | cabnumber | AverageFare |
    +-----------+-------------+
    | SIR104    | 111.5       |
    | T628190C  | 109.0       |
    | PEACE16   | 89.7        |
    | T439972C  | 89.5        |
    | T802013C  | 85.0        |
    +-----------+-------------+

We can also use SQL directly in Python with spark.sql, using standard SQL as the argument.
Following is a simple Python snippet that performs the same query as the previous SQL query:

    number_of_results = 5

    sql_statement = f"""
    SELECT
        CabNumber,
        AVG(FareAmount) AS AverageFare
    FROM
        taxidb.yellowtaxis
    GROUP BY
        CabNumber
    HAVING
        AVG(FareAmount) > 50
    ORDER BY
        2 DESC
    LIMIT {number_of_results}"""

    df = spark.sql(sql_statement)
    display(df)

This produces the same results as the SQL:

    +-----------+-------------+
    | cabnumber | AverageFare |
    +-----------+-------------+
    | SIR104    | 111.5       |
    | T628190C  | 109.0       |
    | PEACE16   | 89.7        |
    | T439972C  | 89.5        |
    | T802013C  | 85.0        |
    +-----------+-------------+

We recommend using the triple-quote syntax, which makes it easy to span strings over multiple lines without having to use continuation lines. Also, notice how we define the variable number_of_results, turn the triple-quoted string into an f-string, and use the {} syntax to insert the variable for the limit.

Reading a Table with PySpark

To read the same table in PySpark, you can use the DataFrameReader. For example, to implement the count of records, we use:⁵

    df = spark.read.format("delta").table("taxidb.YellowTaxis")
    print(f"Number of records: {df.count():,}")

Output:

    Number of records: 9,999,995

Note that we specify the Delta format, since our table is a Delta table, and we use the .table() method to specify that we want to read the entire table. Finally, we use an f-string, this time with the "," format specifier, which inserts a comma separator every three digits.

Next, let's re-create the code for the top five average fares by cab number, which we did in SQL earlier.
The Python code follows:

    # Make sure to import the functions you want to use
    from pyspark.sql.functions import col, avg

    # Read YellowTaxis into our DataFrame
    df = spark.read.format("delta").table("taxidb.YellowTaxis")

    # Perform the GROUP BY, average (AVG), HAVING and ORDER BY equivalents
    # in PySpark
    results = df.groupBy("CabNumber") \
        .agg(avg("FareAmount").alias("AverageFare")) \
        .filter(col("AverageFare") > 50) \
        .sort(col("AverageFare").desc()) \
        .take(5)

    # Print out the result; since this is a list and not a DataFrame,
    # you can use a list comprehension to output the results in a single
    # line
    [print(result) for result in results]

⁵ GitHub repo location: /chapter03/Read Table with PySpark

We'll get the following output:

    Row(CabNumber='SIR104', AverageFare=111.5)
    Row(CabNumber='T628190C', AverageFare=109.0)
    Row(CabNumber='PEACE16', AverageFare=89.7)
    Row(CabNumber='T439972C', AverageFare=89.5)
    Row(CabNumber='T802013C', AverageFare=85.0)

We can simply use the groupBy() function to group by a column.

Note that the result of this is no longer a DataFrame, but a pyspark.sql.GroupedData instance, as illustrated in this code snippet:

    # Perform a groupBy, and print out the type
    print(type(df.groupBy("CabNumber")))

This prints out:

    <class 'pyspark.sql.group.GroupedData'>

Often, a developer new to PySpark might assume that groupBy() returns a DataFrame, but it returns a GroupedData instance, which only offers aggregation methods such as agg(), avg(), count(), and max(). You have to aggregate first to get back a DataFrame before you can use DataFrame methods such as filter() or where().

To calculate the average, we use the .agg() method. Within the method we specify which aggregate we want to calculate, which in this case is avg() (average), and we name the result column with .alias(). In PySpark, the equivalent of the HAVING condition is the .filter() method applied to the aggregated DataFrame, within which we specify the filter using a filter expression. Finally, we use the .sort() method to sort the data, and then use .take() to extract the first five results.
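The group, aggregate, filter, sort, and take steps can be mimicked in plain Python, which makes the role of each one explicit. This is a sketch on a handful of made-up trips (not the book's data), and top_average_fares is a hypothetical name. AVG's habit of skipping NULLs is modeled by ignoring None fares.

```python
from collections import defaultdict

# Made-up trips for illustration only; these are not rows from the YellowTaxis table.
trips = [
    {"CabNumber": "CAB-A", "FareAmount": 60.0},
    {"CabNumber": "CAB-A", "FareAmount": 80.0},
    {"CabNumber": "CAB-B", "FareAmount": 20.0},
    {"CabNumber": "CAB-C", "FareAmount": 55.0},
    {"CabNumber": "CAB-D", "FareAmount": 120.0},
    {"CabNumber": "CAB-D", "FareAmount": None},  # NULL fares are skipped by AVG
]


def top_average_fares(rows, threshold=50.0, limit=5):
    # GROUP BY CabNumber: collect the non-null fares for each cab
    fares = defaultdict(list)
    for row in rows:
        if row["FareAmount"] is not None:
            fares[row["CabNumber"]].append(row["FareAmount"])
    # AVG(FareAmount) per group
    averages = {cab: sum(values) / len(values) for cab, values in fares.items()}
    # HAVING AVG(FareAmount) > threshold (the .filter() step)
    kept = [(cab, average) for cab, average in averages.items() if average > threshold]
    # ORDER BY AverageFare DESC (the .sort() step), then LIMIT (the .take() step)
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:limit]


print(top_average_fares(trips))
# [('CAB-D', 120.0), ('CAB-A', 70.0), ('CAB-C', 55.0)]
```

As with .take(), the result is an ordinary Python list rather than a DataFrame.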
Note that the .take() function returns a Python list. Since we have a list here, we can use a list comprehension to output each result in the list.

Writing to a Delta Table

There are various ways to write to a Delta table. You might want to rewrite an entire table, or you might want to append to a table. More advanced topics, such as updates and merges, will be discussed in Chapter 4.

We will first clean out our YellowTaxis table, so that we have a clean slate, and then we will use a traditional SQL INSERT statement to insert data. Next, we will append the data from a smaller CSV file. We will also take a quick look at the overwrite mode when writing to a Delta table, and finally we will use the SQL COPY INTO feature to append a large CSV file.

Cleaning Out the YellowTaxis Table

We can re-create our Delta table with a CREATE TABLE statement:⁶

    %sql
    CREATE TABLE taxidb.YellowTaxis
    (
        RideId               INT,
        VendorId             INT,
        PickupTime           TIMESTAMP,
        DropTime             TIMESTAMP,
        PickupLocationId     INT,
        DropLocationId       INT,
        CabNumber            STRING,
        DriverLicenseNumber  STRING,
        PassengerCount       INT,
        TripDistance         DOUBLE,
        RateCodeId           INT,
        PaymentType          INT,
        TotalAmount          DOUBLE,
        FareAmount           DOUBLE,
        Extra                DOUBLE,
        MtaTax               DOUBLE,
        TipAmount            DOUBLE,
        TollsAmount          DOUBLE,
        ImprovementSurcharge DOUBLE
    ) USING DELTA
    LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDelta"

With the table set up, we are ready to insert data.
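A 19-column CREATE TABLE is easy to get subtly wrong by hand: one missing comma between two column definitions is enough to break it. One way to avoid that is to keep the column list in one place and render both the DDL and the INSERT column list from it. This is a sketch with hypothetical helper names; the resulting string would be passed to spark.sql() in a notebook.

```python
# Hypothetical single source of truth for the YellowTaxis columns.
YELLOW_TAXIS_COLUMNS = [
    ("RideId", "INT"),
    ("VendorId", "INT"),
    ("PickupTime", "TIMESTAMP"),
    ("DropTime", "TIMESTAMP"),
    ("PickupLocationId", "INT"),
    ("DropLocationId", "INT"),
    ("CabNumber", "STRING"),
    ("DriverLicenseNumber", "STRING"),
    ("PassengerCount", "INT"),
    ("TripDistance", "DOUBLE"),
    ("RateCodeId", "INT"),
    ("PaymentType", "INT"),
    ("TotalAmount", "DOUBLE"),
    ("FareAmount", "DOUBLE"),
    ("Extra", "DOUBLE"),
    ("MtaTax", "DOUBLE"),
    ("TipAmount", "DOUBLE"),
    ("TollsAmount", "DOUBLE"),
    ("ImprovementSurcharge", "DOUBLE"),
]


def create_table_sql(table, columns, location):
    """Render CREATE TABLE ... USING DELTA LOCATION; join() supplies every comma."""
    body = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns)
    return f'CREATE TABLE {table} (\n{body}\n) USING DELTA\nLOCATION "{location}"'


def insert_column_list(columns):
    """Render the parenthesized column list for an INSERT INTO statement."""
    return "(" + ", ".join(name for name, _ in columns) + ")"


ddl = create_table_sql(
    "taxidb.YellowTaxis",
    YELLOW_TAXIS_COLUMNS,
    "/mnt/datalake/book/chapter03/YellowTaxisDelta",
)
print(ddl)
print(insert_column_list(YELLOW_TAXIS_COLUMNS))
```

Because the separators come from join(), adding or reordering a column cannot leave a dangling or missing comma.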
Inserting Data with SQL INSERT

To insert a record into the YellowTaxis Delta table, we can use the SQL INSERT command:

    %sql
    INSERT INTO taxidb.yellowtaxis
        (RideId, VendorId, PickupTime, DropTime,
         PickupLocationId, DropLocationId, CabNumber,
         DriverLicenseNumber, PassengerCount, TripDistance,
         RateCodeId, PaymentType, TotalAmount, FareAmount,
         Extra, MtaTax, TipAmount, TollsAmount,
         ImprovementSurcharge)
    VALUES
        (9999995, 1, '2019-11-01T00:00:00.000Z',
         '2019-11-01T00:02:23.573Z', 65, 71, 'TAC304',
         '453987', 2, 4.5, 1, 1, 20.34, 15.0, 0.5,
         0.4, 2.0, 2.0, 1.1)

⁶ GitHub repo location: /chapter03/10 - Writing to a Delta Table

This will insert one row:

    +-------------------+-------------------+
    | num_affected_rows | num_inserted_rows |
    +-------------------+-------------------+
    | 1                 | 1                 |
    +-------------------+-------------------+

Verify that the data has loaded correctly with a SQL SELECT statement and a WHERE clause on the inserted RideId:

    %sql
    SELECT
        count(RideId) AS count
    FROM
        taxidb.YellowTaxis
    WHERE
        RideId = 9999995

Output:

    +-------+
    | count |
    +-------+
    | 1     |
    +-------+

The output shows that the row has been loaded correctly.

Appending a DataFrame to a Table

Now let's append a DataFrame to our table. In this case we will load the DataFrame from a CSV file. In order to correctly load the data, we don't want to infer the schema. Instead we will use the schema of the YellowTaxis table, which we know is correct.
We can easily extract the schema by loading a DataFrame from the table:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
yellowTaxiSchema = df.schema
df.printSchema()

This shows the table schema is as follows:

root
 |-- RideId: integer (nullable = true)
 |-- VendorId: integer (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- CabNumber: string (nullable = true)
 |-- DriverLicenseNumber: string (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- RatecodeId: integer (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- FareAmount: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- MtaTax: double (nullable = true)
 |-- TipAmount: double (nullable = true)
 |-- TollsAmount: double (nullable = true)
 |-- ImprovementSurcharge: double (nullable = true)

Now that we have the schema, we can load a new DataFrame (df_for_append) from the CSV file we want to append:

df_for_append = spark.read \
    .option("header", "true") \
    .schema(yellowTaxiSchema) \
    .csv("/mnt/datalake/book/DataFiles/YellowTaxis_append.csv")

display(df_for_append)

We see the following output (partial output is displayed):

+---------+----------+---------------------+---------------------+
| RideId  | VendorId | PickupTime          | DropTime            |
+---------+----------+---------------------+---------------------+
| 9999996 | 1        | 2019-01-01T00:00:00 | 2022-03-01T00:13:13 |
| 9999997 | 1        | 2019-01-01T00:00:00 | 2022-03-01T00:09:21 |
| 9999998 | 1        | 2019-01-01T00:00:00 | 2022-03-01T00:09:15 |
| 9999999 | 1        | 2019-01-01T00:00:00 | 2022-03-01T00:10:01 |
+---------+----------+---------------------+---------------------+

We now have four additional rows, all with a VendorId of 1. We can now append this DataFrame to the Delta table:

df_for_append.write \
    .mode("append") \
    .format("delta") \
    .save("/mnt/datalake/book/chapter03/YellowTaxisDelta")

This appends the data directly to the Delta table. Since the table held one row from the earlier INSERT statement and we appended four more, it should now contain five rows:

%sql
SELECT COUNT(*) FROM taxidb.YellowTaxis

+----------+
| count(1) |
+----------+
| 5        |
+----------+

We now have five rows.

Using the OverWrite Mode When Writing to a Delta Table
In the previous example we used .mode("append") when using the DataFrameWriter API to write to a Delta table. Delta Lake also supports the overwrite mode when writing to a Delta table. When you use this mode, you atomically replace all of the data in the table. If we had used .mode("overwrite") in the previous code block, we would have overwritten the entire YellowTaxis Delta table with just the df_for_append DataFrame.

Even if you use .mode("overwrite") in your code, the old part files are not immediately physically deleted. To support features such as time travel, these files must be kept for now; we can use commands such as VACUUM to physically delete them later, once we are sure they are no longer needed. Time travel and the VACUUM command are covered in Chapter 6.

Inserting Data with the SQL COPY INTO Command
We can use the SQL COPY INTO command to append data to our table. This command is especially useful when we need to quickly append very large amounts of data. We can use the following command to append the data from a CSV file:

%sql
COPY INTO taxidb.yellowtaxis
FROM (
    SELECT RideId::Int
         , VendorId::Int
         , PickupTime::Timestamp
         , DropTime::Timestamp
         , PickupLocationId::Int
         , DropLocationId::Int
         , CabNumber::String
         , DriverLicenseNumber::String
         , PassengerCount::Int
         , TripDistance::Double
         , RatecodeId::Int
         , PaymentType::Int
         , TotalAmount::Double
         , FareAmount::Double
         , Extra::Double
         , MtaTax::Double
         , TipAmount::Double
         , TollsAmount::Double
         , ImprovementSurcharge::Double
    FROM '/mnt/datalake/book/DataFiles/YellowTaxisLargeAppend.csv'
)
FILEFORMAT = CSV
FORMAT_OPTIONS ("header" = "true")

All fields in a CSV file are read as strings, so we need to provide a schema through the SQL SELECT statement when we load the data. The casts give each column its type, ensuring that the data is loaded with the right schema. Note that the FILEFORMAT, in this case CSV, is specified. Finally, because our file has a header, we need to specify the header option with FORMAT_OPTIONS.

The output of this statement is:

+-------------------+-------------------+
| num_affected_rows | num_inserted_rows |
+-------------------+-------------------+
| 9999995           | 9999995           |
+-------------------+-------------------+

You can see that we inserted almost 10 million rows in just a few seconds.
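The difference between logically replacing data and physically deleting files can be made concrete with a toy model in plain Python: storage holds every data file ever written, and each commit records which files are live. ToyDeltaTable is hypothetical and greatly simplified (one file per write, and its vacuum ignores the retention period that the real VACUUM enforces), but the row counts mirror the example above: one inserted row, four appended rows, and then an overwrite with the same four rows.

```python
class ToyDeltaTable:
    """Toy Delta table: `storage` holds every data file ever written, while each
    entry in `versions` is the set of files that are live in that commit."""

    def __init__(self):
        self.storage = {}   # file name -> rows (files physically on disk)
        self.versions = []  # one set of live file names per commit
        self._next_file = 0

    def _live(self):
        return self.versions[-1] if self.versions else set()

    def write(self, rows, mode):
        name = f"part-{self._next_file:05d}.parquet"
        self._next_file += 1
        self.storage[name] = list(rows)
        if mode == "append":
            live = self._live() | {name}
        elif mode == "overwrite":
            live = {name}  # older files leave the commit, but stay on disk
        else:
            raise ValueError(f"unsupported mode: {mode}")
        self.versions.append(live)

    def read(self, version=-1):
        """Read the latest version, or an older one (toy 'time travel')."""
        return [row for name in sorted(self.versions[version]) for row in self.storage[name]]

    def vacuum(self):
        """Physically delete files the latest version no longer references.
        Unlike the real VACUUM, this toy ignores the retention period."""
        live = self._live()
        for name in list(self.storage):
            if name not in live:
                del self.storage[name]


table = ToyDeltaTable()
table.write([9999995], "append")                                # version 0: the INSERT
table.write([9999996, 9999997, 9999998, 9999999], "append")     # version 1: the append
table.write([9999996, 9999997, 9999998, 9999999], "overwrite")  # version 2: an overwrite

rows_latest = len(table.read())           # 4: only the overwrite's rows are live
rows_version_1 = len(table.read(1))       # 5: the older version is still readable
files_before_vacuum = len(table.storage)  # 3: no file was physically deleted
table.vacuum()
files_after_vacuum = len(table.storage)   # 1: unreferenced files are gone
```

After the toy vacuum, reading version 1 fails because its files no longer exist, which is the same trade-off that makes the real VACUUM a deliberate, later step.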
The COPY INTO command also keeps track of previously loaded files and will not reload them. We can test this by running the COPY INTO command again:

+-------------------+-------------------+
| num_affected_rows | num_inserted_rows |
+-------------------+-------------------+
| 0                 | 0                 |
+-------------------+-------------------+

As you can see, no additional rows were loaded. Finally, when we check the final row count, we will see that we now have ten million rows:

%sql
SELECT COUNT(*) FROM taxidb.YellowTaxis

Output:

+----------+
| count(1) |
+----------+
| 10000000 |
+----------+

Partitions
Delta tables are often accessed with a standard query pattern. For example, data from IoT systems tends to be accessed by day, hour, or even minute. The analysts querying the yellow taxi data might want to access the data by VendorId, and so on. These use cases lend themselves well to partitioning. Partitioning your data to align with your query patterns can dramatically speed up query performance, especially when combined with other performance optimizations, such as Z-ordering.7

A Delta table partition is composed of a folder with a subset of data rows that share the same value for one or more columns.

Note that this type of on-disk partitioning should not be confused with the partitioning that Spark applies when processing a DataFrame. Spark applies in-memory partitioning to enable tasks to run in parallel and independently on a large number of nodes in a Spark cluster.

For example, for the yellow taxi data, the partitioning column could be VendorId. After partitioning your table, individual folders will be created for each VendorId.
The last part of the folder name will have VendorId=XX:

drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=2
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=4

Once the table is partitioned, queries with predicates that include the partition columns can run much faster, since Spark can immediately select the folders for the matching partitions and skip the rest. You can partition data when you create a Delta table by specifying a PARTITIONED BY clause.

7 Z-ordering is covered in Chapter 5.

At the time of writing, partitions are the recommended approach to align data to your query patterns to increase query performance. A new feature in Delta Lake called liquid clustering is currently in preview, which you will learn about in Chapter 5. We felt that it was important for readers to understand how partitions work and how you can apply them manually before learning about features that automate and replace these commands. The new feature, liquid clustering, will be generally available in the near future. You can learn more and stay up-to-date on the status of liquid clustering at the Delta Lake documentation website and this feature request.

Partitioning by a single column
Let's take our YellowTaxis table and create a new version that is partitioned by VendorId. First, create the partitioned table:8

%sql
CREATE TABLE taxidb.YellowTaxisPartitioned
(
    RideId INT,
    VendorId INT,
    PickupTime TIMESTAMP,
    DropTime TIMESTAMP,
    PickupLocationId INT,
    DropLocationId INT,
    CabNumber STRING,
    DriverLicenseNumber STRING,
    PassengerCount INT,
    TripDistance DOUBLE,
    RatecodeId INT,
    PaymentType INT,
    TotalAmount DOUBLE,
    FareAmount DOUBLE,
    Extra DOUBLE,
    MtaTax DOUBLE,
    TipAmount DOUBLE,
    TollsAmount DOUBLE,
    ImprovementSurcharge DOUBLE
) USING DELTA
PARTITIONED BY(VendorId)
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned"

Notice the PARTITIONED BY(VendorId) clause.
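The folder naming makes partition pruning easy to picture. The toy below, plain Python rather than Spark, parses the key=value segments of hypothetical file paths and keeps only the files whose partition values match an equality predicate, which is roughly what it means for Spark to select the folder with the correct partition. Delta Lake itself records each file's partition values in its transaction log, so it does not need to list directories; the paths here only make the layout visible.

```python
def partition_values(path):
    """Extract Hive-style key=value directory segments from a file path."""
    values = {}
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep:
            values[key] = value
    return values


def prune(paths, **predicate):
    """Keep only files whose partition values equal every predicate value.
    Values are compared as strings, since that is how they appear in paths."""
    return [
        path for path in paths
        if all(partition_values(path).get(k) == str(v) for k, v in predicate.items())
    ]


# Hypothetical file paths laid out like the VendorId folders above.
files = [
    "VendorId=1/part-00000.parquet",
    "VendorId=1/part-00001.parquet",
    "VendorId=2/part-00000.parquet",
    "VendorId=4/part-00000.parquet",
]
print(prune(files, VendorId=4))  # ['VendorId=4/part-00000.parquet']
```

A query filtering on VendorId = 4 only has to open one of the four files in this toy; a filter on a non-partition column would get no such shortcut.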
Now that you have your table, you will load the data from the old YellowTaxis table and write that data to the new table. First, read the data with the DataFrameReader:

input_df = spark.read.format("delta").table("taxidb.YellowTaxis")

Next, use the DataFrameWriter to write the data to the partitioned Delta table:

input_df \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .save("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned")

8 GitHub repo location: /chapter03/11 - Partitions

Now when we look at the table's directory, we'll see a subdirectory for every vendor ID:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned

drwxrwxrwx 2 root root 4096 Dec 5 17:39 .
drwxrwxrwx 2 root root 4096 Dec 2 19:02 ..
drwxrwxrwx 2 root root 4096 Dec 5 16:44 VendorId=1
drwxrwxrwx 2 root root 4096 Dec 5 16:44 VendorId=2
drwxrwxrwx 2 root root 4096 Dec 5 16:44 VendorId=4
drwxrwxrwx 2 root root 4096 Dec 5 16:44 _delta_log

When we look at the distinct VendorId values, we see that there are indeed only those three IDs:

%sql
SELECT DISTINCT(VendorId) FROM taxidb.YellowTaxisPartitioned;

We will see the same IDs:

+----------+
| VendorId |
+----------+
| 2        |
| 1        |
| 4        |
+----------+

The VendorId subdirectories contain the individual Parquet files, as shown here for VendorId=4:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned/VendorId=4

total 3378
drwxrwxrwx 2 root root 4096 Dec 5 17:41 .
drwxrwxrwx 2 root root 4096 Dec 5 17:39 ..
-rwxrwxrwx 1 root root 627551 Dec 5 17:41 part-00000-...parquet
-rwxrwxrwx 1 root root 618844 Dec 5 17:41 part-00001-...parquet
-rwxrwxrwx 1 root root 616377 Dec 5 17:41 part-00002-...parquet
-rwxrwxrwx 1 root root 614035 Dec 5 17:41 part-00003-...parquet
-rwxrwxrwx 1 root root 612410 Dec 5 17:41 part-00004-...parquet
-rwxrwxrwx 1 root root 360432 Dec 5 17:41 part-00005-...parquet

Partitioning by multiple columns
You don't have to partition by just one column. We can use multiple hierarchical columns as partitioning columns. For example, for IoT data, we might want to partition by day, hour, and minute, because that is the most commonly used query pattern.

For example, let's assume that we want our YellowTaxis table partitioned not only by VendorId, but also by RatecodeId. First, we would have to drop the existing YellowTaxisPartitioned table and its underlying files. Next, we can re-create the table:

%sql
-- Create the table
CREATE TABLE taxidb.YellowTaxisPartitioned
(
    RideId INT,
    -- (remaining columns as in the single-column example)
) USING DELTA
PARTITIONED BY(VendorId, RatecodeId) -- Partition by VendorId AND RatecodeId
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned"

Notice the updated partition clause: PARTITIONED BY(VendorId, RatecodeId). After this, we can reload the table the same way we did before. Once the table is loaded, we can take another look at the directory structure.
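With more than one partition column, each row's path nests one key=value directory per column, and the number of leaf directories is bounded by the product of the columns' cardinalities. The plain-Python sketch below assumes the three vendor IDs from the listings and the rate codes 1 through 6 and 99 that the chapter shows under VendorId=1; the worst case in which every combination actually occurs is an assumption made for illustration, and partition_path is a hypothetical helper.

```python
from itertools import product


def partition_path(row, columns):
    """Nested Hive-style path: one key=value directory level per partition column."""
    return "/".join(f"{column}={row[column]}" for column in columns)


columns = ["VendorId", "RatecodeId"]
vendors = [1, 2, 4]
rate_codes = [1, 2, 3, 4, 5, 6, 99]

# Worst case (an assumption): every VendorId/RatecodeId combination occurs.
rows = [{"VendorId": v, "RatecodeId": r} for v, r in product(vendors, rate_codes)]
leaf_dirs = {partition_path(row, columns) for row in rows}

print(partition_path(rows[0], columns))  # VendorId=1/RatecodeId=1
print(len(leaf_dirs))                    # 21
```

Each of those 21 leaf directories holds at least one part file, so adding a third partition column multiplies the count again; this is the arithmetic behind the small file problem.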
The first level still looks the same:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned

drwxrwxrwx 2 root root 4096 Dec 13 15:33 .
drwxrwxrwx 2 root root 4096 Dec 2 19:02 ..
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=2
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=4
drwxrwxrwx 2 root root 4096 Dec 13 15:16 _delta_log

When we take a look at the VendorId=1 directory, we see the partitioning by RatecodeId:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned/VendorId=1

drwxrwxrwx 2 root root 4096 Dec 13 15:35 .
drwxrwxrwx 2 root root 4096 Dec 13 15:33 ..
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=2
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=3
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=4
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=5
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=6
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=99

Finally, when we look at the RatecodeId level:

%sh
ls -al /dbfs/.../chapter03/YellowTaxisDeltaPartitioned/VendorId=1/RatecodeId=1

We can see the Parquet files for that partition:

drwxrwxrwx 2 root root 4096 Dec 13 15:35 .
drwxrwxrwx 2 root root 4096 Dec 13 15:35 ..
-rwxrwxrwx 1 root root 10621353 Dec 13 15:35 part-00000-...parquet
-rwxrwxrwx 1 root root 10547673 Dec 13 15:35 part-00001-...parquet
-rwxrwxrwx 1 root root 10566377 Dec 13 15:35 part-00002-...parquet
-rwxrwxrwx 1 root root 10597523 Dec 13 15:35 part-00003-...parquet
-rwxrwxrwx 1 root root 10570937 Dec 13 15:35 part-00004-...parquet
-rwxrwxrwx 1 root root 6119491 Dec 13 15:35 part-00005-...parquet
-rwxrwxrwx 1 root root 13820133 Dec 13 15:35 part-00007-...parquet
-rwxrwxrwx 1 root root 24076060 Dec 13 15:35 part-00008-...parquet
-rwxrwxrwx 1 root root 6772609 Dec 13 15:35 part-00009-...parquet

While this type of partitioning by multiple columns is supported, we want to point out some pitfalls. The number of partition directories, and therefore the minimum number of files, can grow to the product of the cardinalities of both columns: in this case, the number of vendors times the number of rate codes. This can lead to the "small file problem," where a large number of small Parquet part files are created. Sometimes other solutions, such as Z-ordering, can be more effective than partitioning. Chapter 5 covers performance tuning and this topic in greater detail.

Checking if a partition exists
To determine whether a table contains a specific partition, you can use the statement:

SELECT COUNT(*) > 0 FROM <table_name> WHERE <partition_column> = <value>

If the partition exists, true is returned. The following SQL statement checks whether the partition for VendorId = 1 and RatecodeId = 99 exists:

%sql
SELECT COUNT(*) > 0 AS `Partition exists`
FROM taxidb.YellowTaxisPartitioned
WHERE VendorId = 1 AND RatecodeId = 99

This will return true, since this partition exists, as shown in the earlier directory listing.

Selectively updating Delta partitions with replaceWhere
In the previous section, we saw how we can significantly speed up query operations by partitioning data. We can also selectively update one or more partitions with the replaceWhere option. Selectively applying updates to certain partitions is not always possible; some updates need to apply to the entire data lake. But, when applicable, these selective updates can result in significant speed gains. Delta Lake can update partitions with excellent performance, while at the same time guaranteeing data integrity.

To see replaceWhere in action, let's take a look at a particular partition:

%sql
SELECT RideId, VendorId, PaymentType
FROM taxidb.yellowtaxispartitioned
WHERE VendorId = 1 AND RatecodeId = 99
LIMIT 5

We see a mixture of payment types in the results:
+---------+----------+-------------+
| RideId  | VendorId | PaymentType |
+---------+----------+-------------+
| 1137733 | 1        | 1           |
| 1144423 | 1        | 2           |
| 1214030 | 1        | 1           |
| 1223028 | 1        | 1           |
| 1300054 | 1        | 2           |
+---------+----------+-------------+

Let's assume that we have a business rule that states that all PaymentTypes for VendorId = 1 and RatecodeId = 99 should be 3. We can use the following PySpark expression with replaceWhere to achieve that result:

from pyspark.sql.functions import col, lit

spark.read \
    .format("delta") \
    .load("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned") \
    .where((col("VendorId") == 1) & (col("RatecodeId") == 99)) \
    .withColumn("PaymentType", lit(3)) \
    .write \
    .format("delta") \
    .option("replaceWhere", "VendorId = 1 AND RatecodeId = 99") \
    .mode("overwrite") \
    .save("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned")

When we now look for the distinct PaymentTypes for this partition:

%sql
SELECT DISTINCT(PaymentType)
FROM taxidb.yellowtaxispartitioned
WHERE VendorId = 1 AND RatecodeId = 99

We see that we only have PaymentType = 3:

+-------------+
| PaymentType |
+-------------+
| 3           |
+-------------+

We can verify that the other partitions are not affected:

%sql
SELECT DISTINCT(PaymentType)
FROM taxidb.yellowtaxispartitioned
ORDER BY PaymentType

This shows all PaymentTypes:

+-------------+
| PaymentType |
+-------------+
| 1           |
| 2           |
| 3           |
| 4           |
+-------------+

replaceWhere can be particularly useful when you have to run an operation that can be computationally expensive, but you only need to run it on certain partitions. In the yellow taxi scenario, let's assume that the data science team has requested that you run one of their algorithms on the YellowTaxis table. Initially, you can run it on your smallest partition and quickly retrieve the results and, when approved, run the algorithm on all remaining partitions overnight.
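The semantics of replaceWhere can be sketched as "drop every row that matches the predicate, then add the new rows," with a guard that the new rows match the predicate too; recent Delta Lake versions perform a comparable check by default and fail writes that fall outside the replaceWhere condition. The helper below is a hypothetical plain-Python toy, not Delta's implementation, using two RideId values from the output above plus one made-up row from another partition.

```python
def replace_where(table, new_rows, predicate):
    """Toy replaceWhere: drop the rows matching `predicate`, then add `new_rows`.
    New rows outside the predicate are rejected, similar to Delta's default check."""
    outside = [row for row in new_rows if not predicate(row)]
    if outside:
        raise ValueError(f"{len(outside)} row(s) do not match the replaceWhere condition")
    kept = [row for row in table if not predicate(row)]
    return kept + list(new_rows)


def target_partition(row):
    # Equivalent of "VendorId = 1 AND RatecodeId = 99"
    return row["VendorId"] == 1 and row["RatecodeId"] == 99


table = [
    {"RideId": 1137733, "VendorId": 1, "RatecodeId": 99, "PaymentType": 1},
    {"RideId": 1144423, "VendorId": 1, "RatecodeId": 99, "PaymentType": 2},
    {"RideId": 42, "VendorId": 2, "RatecodeId": 1, "PaymentType": 4},  # hypothetical row
]

# Same transformation as the PySpark example: set PaymentType to 3 in the partition.
updated = [dict(row, PaymentType=3) for row in table if target_partition(row)]
table = replace_where(table, updated, target_partition)

print(sorted({r["PaymentType"] for r in table if target_partition(r)}))  # [3]
print(sorted({r["PaymentType"] for r in table}))                         # [3, 4]
```

Rows outside the predicate pass through untouched, which is why the other partitions keep their original payment types.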
User-Defined Metadata
For auditing or regulatory purposes, we might want to add a tag to certain SQL operations. For example, our project might require that you tag INSERTs to certain tables with a General Data Protection Regulation (GDPR) tag. Once we tag the INSERT with this tag, the auditing tool will be able to generate a complete list of statements that contain this particular tag.

We can specify these tags as user-defined strings in the metadata of commits made by a SQL operation. We can do this either by using the DataFrameWriter's option userMetadata, or the SparkSession configuration spark.databricks.delta.commitInfo.userMetadata. If both options are specified, the DataFrameWriter's option takes precedence.

Using SparkSession to Set Custom Metadata
Let's look at the SparkSession configuration first. Assume that we have an INSERT operation, to which we want to assign a GDPR tag for auditing purposes. Following is a SQL example:

%sql
SET spark.databricks.delta.commitInfo.userMetadata=my-custom-metadata= { "GDPR": "INSERT Request 1x965383" };

This tag will apply to the next operation, which is a standard INSERT:

INSERT INTO taxidb.yellowtaxisPartitioned
(RideId, VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId,
 CabNumber, DriverLicenseNumber, PassengerCount, TripDistance, RatecodeId,
 PaymentType, TotalAmount, FareAmount, Extra, MtaTax, TipAmount, TollsAmount,
 ImprovementSurcharge)
VALUES(10000000, 3, '2019-11-01T00:00:00.000Z', '2019-11-01T00:02:23.573Z',
 65, 71, 'TAC304', '453987', 2, 4.5, 1, 1, 20.34, 15.0, 0.5, 0.4, 2.0, 2.0, 1.1)

Note that there is nothing special about the INSERT; it is a standard operation. The GDPR tag will automatically be applied to the commit info in the transaction log.
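The precedence rule can be pictured with a small, hypothetical Python helper that builds the commitInfo record a commit would carry. It is a toy model, not Delta Lake's code: commit_info_line and its parameters are invented for illustration, real commitInfo entries contain many more fields, and treating an empty string as "no metadata" is an assumption of this sketch.

```python
import json


def commit_info_line(writer_option=None, session_conf=None):
    """Toy commitInfo action serialized as one JSON line.

    writer_option stands in for the DataFrameWriter userMetadata option and
    session_conf for spark.databricks.delta.commitInfo.userMetadata. The writer
    option wins when both are set; an empty string counts as unset (assumption).
    """
    metadata = writer_option or session_conf
    entry = {"userMetadata": metadata} if metadata else {}
    return json.dumps({"commitInfo": entry})


gdpr = 'my-custom-metadata= { "GDPR": "INSERT Request 1x965383" }'
pii = '{"PII": "Confidential XYZ"}'

session_only = json.loads(commit_info_line(session_conf=gdpr))
both_set = json.loads(commit_info_line(writer_option=pii, session_conf=gdpr))
print(session_only["commitInfo"]["userMetadata"])  # my-custom-metadata= { "GDPR": "INSERT Request 1x965383" }
print(both_set["commitInfo"]["userMetadata"])      # {"PII": "Confidential XYZ"}
```

Because the stored value is just a string, the tag does not have to be valid JSON; the GDPR example here is a free-form label with JSON embedded after a prefix.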
If we search the transaction log for the latest .json file, we'll see that 00004.json is the last log entry:

%sh
ls -al /dbfs/.../YellowTaxisDeltaPartitioned/_delta_log/*.json

Output:

-rwxrwxrwx 1 .../_delta_log/00000000000000000000.json
-rwxrwxrwx 1 .../_delta_log/00000000000000000001.json
-rwxrwxrwx 1 .../_delta_log/00000000000000000002.json
-rwxrwxrwx 1 .../_delta_log/00000000000000000003.json
-rwxrwxrwx 1 .../_delta_log/00000000000000000004.json

When we look at the 00004.json commit file, we can see the GDPR entry:

%sh
grep commit /dbfs/.../YellowTaxisDeltaPartitioned/_delta_log/...00004.json > /tmp/commit.json
python -m json.tool /tmp/commit.json

This is the GDPR entry:

{
    "commitInfo": {
        ...,
        "notebook": {
            "notebookId": "1106853862465676"
        },
        "clusterId": "0605-014705-r8puunyx",
        "readVersion": 3,
        "isolationLevel": "WriteSerializable",
        "isBlindAppend": true,
        "operationMetrics": { },
        "userMetadata": "my-custom-metadata= { \"GDPR\": \"INSERT Request 1x965383\" }",
        "engineInfo": "Databricks-Runtime/10.4.x-scala2.12",
        "txnId": "99f2f31c-8c01-4ea0-9e23-c0cbae9eb82a"
    }
}

The SET statement will stay in effect for subsequent operations within your current Spark session, so if you want to continue inserting data without adding the GDPR metadata, you need to update the SET to an empty string or use the RESET operation. Be aware that RESET will reset all Spark properties, not just the metadata one!
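The grep and json.tool steps can also be done in plain Python. The sketch below is a pair of hypothetical helpers, not an official Delta Lake API: one picks the highest-numbered commit file (commit files are named with the version zero-padded to 20 digits, as in the listing), and the other returns the commitInfo userMetadata string. It writes a throwaway directory with heavily abbreviated commit files so that it runs anywhere; against a real table you would point it at the _delta_log folder and should expect many more actions and fields per commit.

```python
import json
import os
import tempfile


def latest_commit_file(log_dir):
    """Return the highest-numbered commit file (<20-digit version>.json) in log_dir."""
    commits = [f for f in os.listdir(log_dir) if f.endswith(".json") and f[:-5].isdigit()]
    return os.path.join(log_dir, max(commits, key=lambda f: int(f[:-5])))


def user_metadata(commit_path):
    """Scan a commit file (one JSON action per line) for commitInfo.userMetadata."""
    with open(commit_path) as fh:
        for line in fh:
            action = json.loads(line)
            if "commitInfo" in action:
                return action["commitInfo"].get("userMetadata")
    return None


gdpr = 'my-custom-metadata= { "GDPR": "INSERT Request 1x965383" }'

# Throwaway directory shaped like _delta_log, with heavily abbreviated commits.
with tempfile.TemporaryDirectory() as log_dir:
    for version in range(5):
        commit_info = {"userMetadata": gdpr} if version == 4 else {}
        with open(os.path.join(log_dir, f"{version:020d}.json"), "w") as fh:
            fh.write(json.dumps({"commitInfo": commit_info}) + "\n")
    latest_path = latest_commit_file(log_dir)
    latest = os.path.basename(latest_path)
    found = user_metadata(latest_path)

print(latest)  # 00000000000000000004.json
print(found)   # my-custom-metadata= { "GDPR": "INSERT Request 1x965383" }
```

Sorting by the parsed integer rather than the file name keeps the helper correct even if a listing is not returned in order.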
Using the DataFrameWriter to Set Custom Metadata
We can also use the DataFrameWriter with the userMetadata option to insert custom tags, as shown here:

df_for_append.write \
    .mode("append") \
    .format("delta") \
    .option("userMetadata", '{"PII": "Confidential XYZ"}') \
    .save("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned")

When we look at the corresponding JSON entry, we will see the tags in commitInfo:

%sh
grep commit /dbfs/.../YellowTaxisDeltaPartitioned/_delta_log/...00005.json > /tmp/commit.json
python -m json.tool /tmp/commit.json

{
    "commitInfo": {
        "userMetadata": "{\"PII\": \"Confidential XYZ\"}",
        ...
    }
}

Conclusion
This chapter reviewed the fundamentals of using Delta Lake by discussing the basic operations on Delta tables. Delta Lake provides a number of different ways to perform different types of operations using different types of APIs. For example, you can create Delta tables using SQL DDL, the DataFrameWriter API, or the DeltaTableBuilder API, each of which has its own set of features and syntax. And when you create tables, you can specify a specific location to write the underlying data to create unmanaged tables, or you can let Spark manage both the metadata and underlying data by creating managed tables. Once a table has been created, you can then read and write to the table using the various APIs mentioned here.

This chapter primarily covered different ways to insert, append, or overwrite data using SQL or the DataFrame API; more sophisticated write operations (e.g., MERGE) will be covered in subsequent chapters.

We also explored the capabilities of Delta Lake partitioning on disk. Whether partitioning on a single column or on multiple columns, Delta tables provide simple methods for partitioning tables that allow us to achieve significant data processing improvements and efficiency.
And not only can we partition tables, but powerful, built-in Delta Lake features such as replaceWhere allow us to selectively apply updates to certain partitions in order to apply updates faster and more efficiently.

Lastly, we learned that you can add user-defined metadata to the commits on Delta tables to aid in search and discovery, which can be particularly useful for auditing or regulatory purposes. Custom metadata allows us to compile a list of statements or operations on Delta tables that contain particular tags.

Having dipped our toes into Delta Lake and basic Delta table operations, in the following chapter we will dive deeper into more sophisticated Delta table DML operations.