CHAPTER 2
Getting Started with Delta Lake

In the previous chapter we introduced Delta Lake and saw how it adds transactional guarantees, DML support, auditing, a unified streaming and batch model, schema enforcement, and a scalable metadata model to traditional data lakes.

In this chapter, we will go hands-on with Delta Lake. We will first set up Delta Lake on a local machine with Spark installed. We will run Delta Lake samples in two interactive shells:

1. First, we will run the PySpark interactive shell with the Delta Lake packages. This will allow us to type in and run a simple two-line Python program that creates a Delta table.
2. Next, we will run a similar program with the Spark Scala shell. Although we do not cover the Scala language extensively in this book, we want to demonstrate that both the Spark shell and Scala are options with Delta Lake.

Next, we will create a helloDeltaLake starter program in Python inside your favorite editor and run the program interactively in the PySpark shell. The environment we set up in this chapter, and the helloDeltaLake program, will be the basis for most other programs we create in this book.

Once the environment is up and running, we are ready to look deeper into the Delta table format. Since Delta Lake uses Parquet as the underlying storage medium, we first take a brief look at the Parquet format. Since partitions and partition files play an important role when we study the transaction log later, we will study the mechanism of both automatic and manual partitioning. Next, we move on to Delta tables and investigate how a Delta table adds a transaction log in the _delta_log directory.

The remainder of this chapter is dedicated to the transaction log. We will create and run several Python programs to investigate the details of transaction log entries, what kind of actions are recorded, and what Parquet data files are written when and how they relate to the transaction log entries. We will look at more complex update examples and their impact on the transaction log. Finally, we will introduce the concept of checkpoint files and how they help Delta Lake implement a scalable metadata system.

Getting a Standard Spark Image

Setting up Spark on a local machine can be daunting. You have to adjust many different settings, update packages, and so on. Therefore, we chose to use a Docker container. If you do not have Docker installed, you can download it free from the Docker website. The specific container that we used is the standard Apache Spark image. To download the image, you can use the following command:

    docker pull apache/spark

Once you have pulled down the image, you can start the container with the following command:

    docker run -it apache/spark /bin/sh

The Spark installation is in the /opt/spark directory. PySpark, spark-sql, and all the other tools are in the /opt/spark/bin directory. We have included several instructions on how to work with the container in the README of the book's GitHub repository.

Using Delta Lake with PySpark

As mentioned before, Delta Lake runs on top of your existing storage and is fully compatible with the existing Apache Spark APIs. This means it is easy to start with Delta Lake if you already have Spark installed or a container as specified in the previous section. With Spark in place, you can install the delta-spark 2.4.0 package, which you can find on PyPI.
Enter the following command in a command shell:

    pip install delta-spark

Once you have delta-spark installed, you can run the PySpark shell interactively like this:

    pyspark --packages io.delta:delta-core_2.12:2.4.0 \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

This will give you a PySpark shell from which you can interactively run commands:

    Welcome to Spark version 3.2.2
    Using Python version 3.9.13 (tags/v3.9.13:6de2ca5, May 17 2022 16:36:42)
    Spark context Web UI available at http://host.docker.internal:4040
    Spark context available as 'sc' (master = local[*], app id = local-1665944381326).
    SparkSession available as 'spark'.

Inside the shell, you can now run interactive PySpark commands. We always do a quick test by creating a range() with Spark, resulting in a DataFrame that we can then save in Delta Lake format (more details on this in "Creating and Running a Spark Program: helloDeltaLake"). The full code is provided here:

    data = spark.range(0, 10)
    data.write.format("delta").mode("overwrite").save("/book/testShell")

The following is a full run:

    >>> data = spark.range(0, 10)
    >>> data.write.format("delta").mode("overwrite").save("/book/testShell")
    >>>

Here we see the statement to create the range(), followed by the write statement. We see that the Spark executors do run. When you open up the output directory, you will find the generated Delta table (more details on the Delta table format in the next section).

Running Delta Lake in the Spark Scala Shell

You can also run Delta Lake in the interactive Spark Scala shell. As specified in the Delta Lake Quickstart, you can start the Scala shell with the Delta Lake packages as follows:

    spark-shell --packages io.delta:delta-core_2.12:2.4.0 \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

This will start up the interactive Scala shell:

    Spark context Web UI available at http://host.docker.internal:4040
    Spark context available as 'sc' (master = local[*], app id = local-1665950762666).
    Spark session available as 'spark'.
    Welcome to Spark version 3.2.2
    Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_311)
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala>

Inside the shell, you can now run interactive Scala commands. Let's do a similar test in Scala as we did for the PySpark shell:

    val data = spark.range(0, 10)
    data.write.format("delta").mode("overwrite").save("/book/testShell")

Here is a full run:

    scala> val data = spark.range(0, 10)
    data: org.apache.spark.sql.Dataset[Long] = [id: bigint]

    scala> data.write.format("delta").mode("overwrite").save("/book/testShell")

Again, when you check your output, you can find the generated Delta table.

Running Delta Lake on Databricks

For the examples later on in this book, the Databricks Community Edition was chosen to run Delta Lake. It was chosen to develop and run the code samples because it is free, it simplifies the setup of Spark and Delta Lake, and it does not require your own cloud account or for you to supply cloud compute or storage resources.
With the Databricks Community Edition, users can access a cluster with a complete notebook environment and an up-to-date runtime with Delta Lake installed on this platform. If you do not want to run Spark and Delta Lake on your local machine, you can also run Delta Lake on Databricks on a cloud platform, like Azure, AWS, or Google Cloud. These environments make it easy to get started with Delta Lake, since their installed runtimes already have a version of Delta Lake installed. The additional benefit of the cloud is that you can create real Spark clusters of arbitrary size, potentially up to thousands of cores spanning hundreds of nodes, to process terabytes or petabytes of data.

When using Databricks in the cloud, you have two options: you can use its popular notebooks, or you can connect your favorite development environment to a cloud-based Databricks cluster with dbx. dbx, by Databricks Labs, is an open source tool that allows you to connect to a Databricks cluster from an editing environment.

Creating and Running a Spark Program: helloDeltaLake

Once you have the delta-spark package installed, creating your first PySpark program is very straightforward. Follow these steps to create the PySpark program.

Create a new file (we named ours helloDeltaLake.py). Add the necessary imports. At a minimum you need to import PySpark and Delta Lake:

    import pyspark
    from delta import *

Next, create a SparkSession builder, which loads up the Delta Lake extensions, as follows:

    # Create a builder with the Delta extensions
    builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
        .config("spark.sql.extensions",
                "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")

Next, we can create the SparkSession object itself. We will create the SparkSession object and print out its version to ensure that the object is valid:

    # Create a Spark instance with the builder
    # As a result, you now can read and write Delta tables
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    print(f"Hello, Spark version: {spark.version}")

To verify that our Delta Lake extensions are working correctly, we create a range and write it out in Delta Lake format:

    # Create a range, and save it in Delta Lake format to ensure
    # that your Delta Lake extensions are indeed working
    df = spark.range(0, 10)
    df.write \
        .format("delta") \
        .mode("overwrite") \
        .save("/book/chapter02/helloDeltaLake")

That completes the code for your starter program. You can find the full code file at /chapter02/helloDeltaLake.py in the book's code repository. This code is a good place to start if you want to write your own code.

To run the program, we can simply start a command prompt on Windows, or a terminal on macOS, navigate to the folder with our code, and start PySpark with the program as input:

    pyspark < helloDeltaLake.py

When we run the program, we get our Spark version output (the displayed version will depend on the version of Spark you have installed):

    Hello, Spark version: 3.4.1

And when we look at the output, we can see that we have written a valid Delta table. The details of the Delta Lake format are covered in the next section. At this point, we have PySpark and Delta Lake installed successfully, and we were able to code and run a full-fledged PySpark program with Delta Lake extensions.
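To double-check the result from the same PySpark session, you can read the table back with the Delta reader. This is a minimal sketch; it assumes the same path used in helloDeltaLake and an active spark session:

    # Read the Delta table we just wrote and verify its contents
    df = spark.read.format("delta").load("/book/chapter02/helloDeltaLake")
    print(f"Row count: {df.count()}")  # expect 10 rows (ids 0 through 9)
    df.show()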
Now that you can run your own programs, we are ready to explore the Delta Lake format in detail in the next section.

The Delta Lake Format

In this section we will dive deeper into the Delta Lake open-table format. When we save a file using this format, we are just writing a standard Parquet file with additional metadata. This additional metadata is the foundation for enabling the core features of Delta Lake, including DML operations typically seen in traditional RDBMSs, such as INSERT, UPDATE, and DELETE, among a vast array of other operations.

Since Delta Lake writes out data as Parquet files, we will take a more in-depth look at the Parquet file format. We first write out a simple Parquet file and take a detailed look at the artifacts written by Spark. This will give us a good understanding of files we will leverage throughout this book.

Next, we will write out a file in Delta Lake format, noticing how it triggers the creation of the _delta_log directory, containing the transaction log. We will take a detailed look at this transaction log and how it is used to generate a single source of truth. We will see how the transaction log implements the ACID atomicity property mentioned in Chapter 1.

We will see how Delta Lake breaks down a transaction into individual, atomic commit actions, and how it records these actions in the transaction log as ordered, atomic units. Finally, we will look at several use cases and investigate what Parquet data files and transaction log entries are written, and what is stored in these entries.

Since a transaction log entry is written for every transaction, we might end up with multiple small files. To ensure that this approach remains scalable, Delta Lake will create a checkpoint file every 10 transactions (at the time of writing) with the full transactional state. This way, a Delta Lake reader can simply process the checkpoint file and the few transaction entries written afterward. This results in a fast, scalable metadata system.

Parquet Files

The Apache Parquet file format has been one of the most popular big data formats for the last 20 years. Parquet is open source, so it is free to use under the Apache license, and it is compatible with most Hadoop data processing frameworks. Unlike row-based formats such as CSV or Avro, Parquet is a column-oriented format, meaning that the values of each column/field are stored next to each other, rather than record by record. Figure 2-1 shows the differences between a row-based layout and a column-oriented layout and how each is represented in a logical table.

Figure 2-1. Difference between row-based and column-oriented layouts

Figure 2-1 demonstrates that instead of sequentially storing row values, as in the row layout, the column layout sequentially stores column values. This columnar format helps with compression on a column-by-column basis. This format is also built to support flexible compression options and extendable encoding schemas for each data type, meaning a different encoding can be used for compressing integer versus string data types.

Parquet files are also composed of row groups and metadata. A row group contains a horizontal slice of the table's rows, and within each row group the values of each column are stored together, so a given column can be read efficiently.
The metadata in a Parquet file not only contains information about these row groups, but also information about the columns (e.g., min/max values, number of values) and the data schema, which makes Parquet a self-describing file with additional metadata to enable better data skipping.

Figure 2-2 shows how Parquet files are composed of row groups and metadata. Each row group consists of a column chunk for each column in the dataset, and each column chunk consists of one or more pages with the column data. To explore more documentation and dive deeper into the Parquet file format, you can look at the Apache Parquet website and documentation.

Figure 2-2. Parquet file composition

Advantages of Parquet files

Due to the column-oriented format, storage layout, metadata, and long-standing popularity, Parquet files have several strong advantages for analytical workloads and when working with big data:

High performance
Because Parquet files are a column-oriented format, they enable better compression and encoding, since these algorithms can take advantage of the similar values and data types stored in each column. For I/O-intensive operations, this compressed data can improve performance significantly. Because column values are stored together, queries only need to read the columns required for that query, as opposed to requiring all columns to be read, as in row-based formats. This means that the columnar format can reduce the amount of data that needs to be read for operations, resulting in better performance. The metadata contained in Parquet files describes some of the features of the data. It contains information about row groups, data schemas, and, most importantly, columns. Column metadata includes information such as min/max values and the number of values. Together, this metadata reduces the amount of data that needs to be read for each operation (i.e., data skipping) and enables far better query performance.

Cost-effective
Since Parquet files are able to leverage better compression and encoding, this inherently makes the data more cost-effective to store. By nature, compressed data consumes less space on disk when storing files, which inevitably results in reduced storage space and reduced storage costs.

Interoperability
Since Parquet files have been very popular for the past 20 years, especially for legacy big data processing frameworks and tools (e.g., Hadoop), they are very widely supported across different tools and engines, and offer great interoperability.

Writing a Parquet file

In the book repository, the /chapter02/writeParquetFile Python program creates a Spark DataFrame in memory and writes it in Parquet format to the /parquetData folder using the standard PySpark API:

    data = spark.range(0, 100)
    data.write.format("parquet") \
        .mode("overwrite") \
        .save('/book/chapter02/parquetData')

In our case, when we look at what is written to disk, we see the following (you may see a different result, depending on your local machine):

    Directory of C:\book\chapter02\parquetData

    10/17/2022    511 part-00000-a3885270-...-c000.snappy.parquet
    10/17/2022    513 part-00001-a3885270-...-c000.snappy.parquet
    10/17/2022    517 part-00002-a3885270-...-c000.snappy.parquet
    ...
    10/17/2022    513 part-00010-a3885270-...-c000.snappy.parquet
    10/17/2022    517 part-00011-a3885270-...-c000.snappy.parquet
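If you want to see the row groups and column statistics described earlier for yourself, you can peek inside one of these files with PyArrow. This is a small sketch, not part of the book's code; it assumes the pyarrow package is installed and that the files above exist on a local path:

    import glob
    import pyarrow.parquet as pq

    # Pick one of the Parquet part files written in the previous step
    part_file = sorted(glob.glob("/book/chapter02/parquetData/*.parquet"))[0]

    meta = pq.ParquetFile(part_file).metadata
    print(f"rows: {meta.num_rows}, row groups: {meta.num_row_groups}")

    # Column chunk statistics (min/max) are what enables data skipping
    col = meta.row_group(0).column(0)
    print(f"column '{col.path_in_schema}': min={col.statistics.min}, max={col.statistics.max}")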
A developer new to the big data world might be a bit shocked at this point. We only did a single write of 100 numbers, so how did we end up with 12 Parquet files? A bit of elaboration might be in order.

First, the filename we specified in the write is really the name of a directory, not a file. As you can see, the directory /parquetData contains 12 Parquet files.

Spark is a highly parallel computational environment, where the system attempts to keep each CPU core in your Spark cluster busy. In our case, we are running on a local machine, which means there is one machine in our cluster. When we look at the information for our system, we see that we have 12 cores. The number of .parquet files that were written is 12, which is equal to the number of cores on our cluster, and that is Spark's default behavior in this scenario: the number of files will be equal to the number of available cores. Assume we add the following statement to our code:

    data = spark.range(0, 100)
    data.write.format("parquet") \
        .mode("overwrite") \
        .save('/book/chapter02/parquetData')
    print(f"The number of partitions is: {data.rdd.getNumPartitions()}")

We can see in the output that we indeed have 12 partitions, matching the 12 files:

    The number of partitions is: 12

While this might look like overkill for a scenario where you are only writing 100 numbers, one can imagine a scenario where you are reading or writing very large files. Having the ability to split the files and process them in parallel can dramatically increase performance.

The .crc files you see in the output are cyclic redundancy check files. Spark uses them to ensure that data hasn't been corrupted. These files are always very small, so their overhead is very minimal compared to the utility that they provide. While there is a way to turn off the generation of these files, we would not recommend doing so, since their benefit far outweighs their overhead.

The final files in your output are the _SUCCESS and _SUCCESS.crc files. Spark uses these files to provide a method to confirm that all partitions have been written correctly.

Writing a Delta Table

So far, we have been working with Parquet files. Now, let's take the first example from the previous section and save it in Delta Lake format instead of Parquet (code: /chapter02/writeDeltaFile.py).
All we need to do is replace the Parquet format with the Delta format, as shown in the code:

    data = spark.range(0, 100)
    data.write.format("delta") \
        .mode("overwrite") \
        .save('/book/chapter02/deltaData')
    print(f"The number of files is: {data.rdd.getNumPartitions()}")

We get the same number of partitions:

    The number of files is: 12

And when we look at the output, we see the addition of the _delta_log directory:

    Directory of C:\book\chapter02\deltaData

    10/17/2022     16 .part-00000-...-c000.snappy.parquet.crc
    10/17/2022     16 .part-00001-...-c000.snappy.parquet.crc
    ...
    10/17/2022     16 .part-00011-...-c000.snappy.parquet.crc
    10/17/2022    524 part-00000-...-c000.snappy.parquet
    10/17/2022    519 part-00001-...-c000.snappy.parquet
    ...
    10/17/2022    523 part-00011-...-c000.snappy.parquet
    10/17/2022        _delta_log
              24 File(s)  6,440 bytes

The _delta_log directory contains a transaction log with every single operation performed on your data.

Delta Lake 3.0 includes UniForm (short for "Universal Format"). With UniForm enabled, Delta tables can be read as if they were other open-table formats, such as Iceberg. This enables you to use a broader range of tools without worrying about table format compatibility:

    %sql
    CREATE TABLE T
    TBLPROPERTIES(
      'delta.columnMapping.mode' = 'name',
      'delta.universalFormat.enabledFormats' = 'iceberg')
    AS SELECT * FROM source_table;

UniForm automatically generates Apache Iceberg metadata alongside Delta metadata, atop one copy of the underlying Parquet data. The Iceberg metadata is automatically generated on table creation and is updated whenever the table is updated.

The Delta Lake Transaction Log

The Delta Lake transaction log (also known as DeltaLog) is a sequential record of every transaction performed on a Delta Lake table since its creation. It is central to Delta Lake functionality because it is at the core of its important features, including ACID transactions, scalable metadata handling, and time travel. The main goal of the transaction log is to enable multiple readers and writers to operate on a given version of a dataset simultaneously, and to provide additional information, like data skipping indexes, to the execution engine for more performant operations.

The Delta Lake transaction log always shows the user a consistent view of the data and serves as a single source of truth. It is the central repository that tracks all changes the user makes to a Delta table.
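You can see this audit trail for yourself with the DeltaTable Python API that ships with delta-spark. A quick sketch, run here against the helloDeltaLake table we wrote earlier (any Delta table path will do):

    from delta.tables import DeltaTable

    # Load the Delta table and display its commit history (most recent first)
    dt = DeltaTable.forPath(spark, "/book/chapter02/helloDeltaLake")
    dt.history().select("version", "timestamp", "operation", "operationParameters") \
        .show(truncate=False)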
When a Delta table reader reads a Delta table for the first time, or runs a new query on an open file that has been modified since the last time it was read, Delta Lake looks at the transaction log to get the latest version of the table. This ensures that a user's version of a file is always synchronized with the master record as of the most recent query, and that users cannot make divergent, conflicting changes to a file.

How the Transaction Log Implements Atomicity

In Chapter 1, we learned that atomicity guarantees that all operations (e.g., INSERT, UPDATE, DELETE, or MERGE) performed on your file will either succeed as a whole or not succeed at all. Without atomicity, any hardware failure or software bug can cause a data file to be written partially, resulting in corrupted or, at a minimum, invalid data.

The transaction log is the mechanism through which Delta Lake can offer the atomicity guarantee. The transaction log is also responsible for metadata, time travel, and significantly faster metadata operations for large tabular datasets. The transaction log is an ordered record of every transaction made against a Delta table since it was created. It acts as a single source of truth and tracks all changes made to the table. The transaction log allows users to reason about their data and trust its completeness and quality. The simple rule is: if an operation is not recorded in the transaction log, it never happened. In the following sections, we will illustrate these principles with several examples.

Breaking Down Transactions into Atomic Commits

Whenever you perform a set of operations to modify a table or storage file (such as INSERTs, UPDATEs, DELETEs, or MERGEs), Delta Lake will break down that operation into a series of atomic, discrete steps composed of one or more of the actions shown in Table 2-1.

Table 2-1. List of possible actions in a transaction log entry

Add file
  Adds a file.
Remove file
  Removes a file.
Update metadata
  Updates the table's metadata (e.g., changing the table or file's name, schema, or partitioning). A table or file's first transaction log entry will always contain an update metadata action with the schema, the partition columns, and other information.
Set transaction
  Records that a structured streaming job has committed a micro-batch with the given stream ID. For more information, see Chapter 8.
Change protocol
  Enables new features by switching the Delta Lake transaction log to the newest software protocol.
Commit info
  Contains information about the commit: which operation was made, from where, and at what time. Every transaction log entry will contain a commit info action.

These actions are recorded in the transaction log entries (*.json) as ordered, atomic units known as commits. This is similar to how the Git source control system tracks changes as atomic commits. This also implies that you can replay each of the commits in the transaction log to get to the current state of the file.

For example, if a user creates a transaction to add a new column to a table and then adds data to it, Delta Lake would break this transaction down into its component action parts, and once the transaction completes, add them to the transaction log as the following commits (an abbreviated sketch of such a log entry follows the list):

1. Update metadata: change the schema to include the new column
2. Add file: one for each new data file added
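The sketch below shows what the contents of a commit file can look like: one JSON object per line, one line per action. The field names follow the Delta Lake transaction log protocol, but the values shown here (path, size, timestamps, schema) are made up purely for illustration:

    {"metaData":{"id":"<table-id>","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[...]}","partitionColumns":[],"configuration":{}}}
    {"add":{"path":"part-00000-<uuid>-c000.snappy.parquet","partitionValues":{},"size":524,"modificationTime":1666000000000,"dataChange":true}}
    {"commitInfo":{"timestamp":1666000000000,"operation":"WRITE","operationParameters":{"mode":"Overwrite"}}}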
The Transaction Log at the File Level

When you write a Delta table, that table's transaction log is automatically created in the _delta_log subdirectory. As you continue to make changes to the Delta table, these changes will be automatically recorded as ordered, atomic commits in the transaction log. Each commit is written as a JSON file, starting with 00000000000000000000.json. If you make additional changes to the table, Delta Lake will generate additional JSON files in ascending numerical order, so the next commit is written as 00000000000000000001.json, the following one as 00000000000000000002.json, and so on.

In the remainder of this chapter, we will use an abbreviated form for the transaction log entries for readability purposes. Instead of showing up to 20 digits, we will use an abbreviated form with up to 5 digits (so we will use 00001.json instead of the longer notation).

Additionally, we will be shortening the names of the Parquet files. These names typically look as follows:

    part-00007-71c70d7f-c7a8-4a8c-8c29-57300cfd929b-c000.snappy.parquet

For demonstration and explanation, we will abbreviate a name like this to part-00007.parquet, leaving off the GUID and the compression suffix.

In our example visualizations, we will visualize each transaction entry with the action name and the data filename affected; for example, in Figure 2-3, we have a remove (file) action and another add (file) action in a single transaction file (00002.json).

Figure 2-3. Notation for a transaction log entry

Write multiple writes to the same file

Throughout this section, we will use a set of figures that describe each code step in detail. We show the following information for each step:

- The actual code snippet is shown in the second column.
- Next to the code snippet, we show the Parquet data files written as a result of the code snippet's execution.
- In the last column, we show the transaction log entry's JSON files. We show the action and the affected Parquet data filename for each transaction log entry.

For this first example, you will use chapter02/MultipleWriteOperations.py from the book's repository to show multiple writes to the same file.

Here is a step-by-step description of the different steps in Figure 2-4:

1. First, a new Delta table is written to the path. One Parquet file was written to the output path (part-00000.parquet). The first transaction log entry (00000.json) has been created in the _delta_log directory. Since this is the first transaction log entry for the table, a metadata action and an add file action are recorded, indicating a single partition file was added.
2. Next, we append data to the table. We can see a new Parquet file (part-00001.parquet) has been written, and we created an additional entry (00001.json) in the transaction log. Like the first step, the entry contains an add file action, because we added a new file.
3. We append more data. Again, a new data file is written (part-00002.parquet), and a new transaction log file (00002.json) is added to the transaction log with an add file action.

Figure 2-4. Multiple writes to the same file
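If you run MultipleWriteOperations.py yourself, you can inspect these commits directly, since every commit is just a newline-delimited JSON file. A small sketch (the table path is an assumption; substitute the path the script wrote to):

    from pyspark.sql import functions as F

    # Path where MultipleWriteOperations.py wrote its Delta table (assumed)
    DATALAKE_PATH = "/book/chapter02/multipleWriteOperations"

    # Read all commit files; each line holds one action (add, commitInfo, metaData, ...)
    log_df = (spark.read.json(f"{DATALAKE_PATH}/_delta_log/*.json")
              .withColumn("commit_file", F.input_file_name()))
    log_df.select("commit_file", "add.path", "commitInfo.operation").show(truncate=False)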
Note that each transaction log entry will also have a commit info action, which contains the audit information for the transaction. We omitted the commit info log entries from the figures for readability purposes.

The sequence of operations for writes is very important. For each write operation, the data file is always written first, and only when that operation succeeds is a transaction log file added to the _delta_log folder. The transaction is only considered complete when the transaction log entry is written successfully.

Reading the latest version of a Delta table

When the system reads a Delta table, it will iterate through the transaction log to "compile" the current state of the table. The sequence of events when reading a file is as follows:

1. The transaction log files are read first.
2. The data files are read based on the log files.

Next, we will describe that sequence for the Delta table written by the previous example (multipleWriteOperations.py). Delta will read all the log files (00000.json, 00001.json, and 00002.json). It will then read the three data files based upon the log information, as shown in Figure 2-5.

Figure 2-5. Read operations

Note that this sequence of operations also implies that there could be data files that are no longer referenced in the transaction log. Indeed, this is a common occurrence in UPDATE or DELETE scenarios. Delta Lake does not delete these data files, since they might be required again if the user uses the time travel feature of Delta Lake (covered in Chapter 6). You can remove old, obsolete data files with the VACUUM command (also covered in Chapter 6).

Failure scenario with a write operation

Next, let's see what happens if a write operation fails. In the previous write scenario, let's assume the write operation in step 3 of Figure 2-4 fails halfway through. Part of the Parquet file might have been written, but the transaction log entry 00002.json was not. We would have the scenario shown in Figure 2-6.

Figure 2-6. Failure during the last write operation

As you can see in Figure 2-6, the last transaction file is missing. According to the read sequence specified earlier, Delta Lake will read the first and second JSON transaction files, and the corresponding part-00000 and part-00001 Parquet files.
The Delta Lake reader will not read inconsistent data; it will read a consistent view through the first two transaction log files.

Update scenario

The last scenario is contained in the chapter02/UpdateOperation.py file in the book's code repository. To keep things simple, we have a small Delta table with patient information. We are only tracking the patientId and the name of each patient. In this use case, we first create a Delta table with four patients, two in each file. Next, we add data for two more patients. Finally, we update the name of the first patient. As you will see, this update has a bigger impact than expected. The full update scenario is shown in Figure 2-7.

Figure 2-7. Updates and the transaction log

In this example, we execute the following steps:

1. The first code snippet creates a Spark DataFrame with the patientId and name of four patients. We write the DataFrame to a Delta table, forcing the data into two files with .coalesce(2). As a result, we write two files. Once the part-00000.parquet and part-00001.parquet files are written, a transaction log entry is created (00000.json). Note the transaction log entry contains two add file actions, indicating that the part-00000.parquet and the part-00001.parquet files were added.
2. The next code snippet appends the data for two more patients (P5 and P6). This results in the creation of the part-00002.parquet file. Again, once the file is written, the transaction log entry is written (00001.json), and the transaction is complete. This time, the transaction log file has one add file action, indicating that a file (part-00002.parquet) was added.
3. The code performs an update (a sketch of this update with the DeltaTable API follows these steps). In this case, we want to update the name of the patient with patientId 1 from P1 to P11. Currently, the record for patientId 1 is present in part-0. To perform the update, part-0 is read, and a map operator is used to update any record matching the patientId of 1 from P1 to P11. A new file is written as part-3. Finally, Delta Lake writes the transaction log entry (00002.json). Notice it writes a remove file action, saying that the part-0 file is removed, and an add file action, saying that the part-3 file has been added. This is because the data from part-0 was rewritten into part-3, and all modified rows (along with the unmodified rows) have been added to part-3, rendering the part-0 file obsolete.

Notice that Delta Lake does not delete the part-0 file, since a user might want to go back in time with time travel, in which case the file is required. The VACUUM command can clean up unused files like this (Chapter 6 covers time travel and cleaning up unused files in detail).
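The update itself is performed with the DeltaTable Python API. The following is a minimal sketch of that step; the table path is an assumption, while the condition and the new value follow the scenario above:

    from delta.tables import DeltaTable
    from pyspark.sql.functions import col, lit

    # Path of the patient Delta table written in steps 1 and 2 (assumed)
    DATALAKE_PATH = "/book/chapter02/updateOperation"

    deltaTable = DeltaTable.forPath(spark, DATALAKE_PATH)

    # Rewrite the name of the patient with patientId 1 from P1 to P11
    deltaTable.update(
        condition=col("patientId") == 1,
        set={"name": lit("P11")}
    )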
Now that we have seen how the data is written during an update, let's look at how a read would determine what to read, as illustrated in Figure 2-8. The final data read is:

    patientId  Name
    3          P3
    4          P4
    5          P5
    6          P6
    1          P11
    2          P2

Figure 2-8. Reading after an update

The read would proceed as follows:

1. The first transaction log entry is read (00000.json). This entry tells Delta Lake to include the part-0 and part-1 files.
2. The next entry (00001.json) is read, telling Delta Lake to include the part-2 file.
3. The last entry (00002.json) is read, which informs the reader to remove the part-0 file and include part-3.

As a result, the reader ends up reading part-1, part-2, and part-3, resulting in the correct data shown in Figure 2-8.

Scaling Massive Metadata

Now that we have seen how the transaction log records each operation, it is clear that a single Delta table can accumulate thousands of transaction log entries, each of them a small file. How does Delta Lake scale its metadata handling without needing to read thousands of small files, which would negatively impact Spark's reading performance? Spark tends to be most effective when reading large files, so how do we resolve this?

Once the Delta Lake writer has made the commits to the transaction log, it will save a checkpoint file in Parquet format in the _delta_log folder. The Delta Lake writer will continue to generate a new checkpoint every 10 commits.

A checkpoint file saves the entire state of the table at a given point in time. Note that "state" refers to the different actions, not the files' actual content. So, it will contain the add file, remove file, update metadata, commit info, etc., actions, with all the context information. It will save this list in native Parquet format. This will allow Spark to read the checkpoint quickly. This gives the Spark reader a "shortcut" to fully reproduce a table's state and avoid reprocessing thousands of small JSON files, which could be inefficient.

Checkpoint file example

Following is an example (illustrated in Figure 2-9) where we execute multiple commits, and a checkpoint file is generated as a result. This example uses the code file chapter02/TransactionLogCheckPointExample.py from the book's repository.

Figure 2-9. Checkpoint file example
This example has the following steps:

1. The first code snippet creates a standard Spark DataFrame with several patients. Note that we apply a coalesce(1) transformation to the DataFrame, forcing the data into one partition. Next, we write the DataFrame in Delta Lake format to a storage file. We verify that a single part-00000.parquet file was written. We also see that a single transaction log entry (00000.json) has been created in the _delta_log directory. This entry contains an add file action for the part-00000.parquet file.
2. In the next step, we set up a loop over a range(0, 9), which will loop nine times, each iteration creating a new patient tuple, creating a DataFrame from that tuple, and writing the DataFrame to the storage file. Since the loop runs nine times, we create nine additional Parquet files, from part-00001.parquet through part-00009.parquet. We also see nine additional transaction log entries, from 00001.json through 00009.json.
3. In step 3, we create one more patient tuple, convert it to a DataFrame, and write it to the Delta table. This creates one additional data file (part-00010.parquet). The transaction log has a standard log entry (00010.json) with the add file action for the part-00010.parquet file. But the interesting fact is that it also creates a 00010.checkpoint.parquet file. This is the checkpoint mentioned earlier. A checkpoint is generated every 10 commits. This Parquet file contains the entire state of the table at the time of the commit, in native Parquet format.
4. In the last step, the code generates two more commits, creating part-00011.parquet and part-00012.parquet, and two new log entries with add file actions pointing to these files.

If Delta Lake needs to re-create the state of the table, it will simply read the checkpoint file (00010.checkpoint.parquet) and reapply the two additional log entries (00011.json and 00012.json).

Displaying the checkpoint file

Now that we have generated the checkpoint.parquet file, let's take a look at its content using the /chapter02/readCheckPointFile.py Python file:

    # Set your output path for your Delta table
    DATALAKE_PATH = "/book/chapter02/transactionLogCheckPointExample"
    CHECKPOINT_PATH = "/_delta_log/00000000000000000010.checkpoint.parquet"

    # Read the checkpoint.parquet file
    checkpoint_df = \
        spark \
            .read \
            .format("parquet") \
            .load(f"{DATALAKE_PATH}{CHECKPOINT_PATH}")

    # Display the checkpoint dataframe
    checkpoint_df.show()

Notice how we do a Parquet format read here, because the checkpoint file is indeed stored in Parquet format, not Delta format. The content of the checkpoint_df DataFrame is shown here:

    +----+--------------------+------+--------------------+--------+
    | txn|                 add|remove|            metaData|protocol|
    +----+--------------------+------+--------------------+--------+
    |null|{part-00000-f7d9f...|  null|                null|    null|
    |null|{part-00000-a65e0...|  null|                null|    null|
    |null|{part-00000-4c3ea...|  null|                null|    null|
    |null|{part-00000-8eb1f...|  null|                null|    null|
    |null|{part-00000-2e143...|  null|                null|    null|
    |null|{part-00000-d1d13...|  null|                null|    null|
    |null|{part-00000-650bf...|  null|                null|    null|
    |null|{part-00000-ea06e...|  null|                null|    null|
    |null|{part-00000-79258...|  null|                null|    null|
    |null|{part-00000-23558...|  null|                null|    null|
    |null|                null|  null|                null|  {1, 2}|
    |null|                null|  null|{376ce2d6-11b1-46...|    null|
    |null|{part-00000-eb29a...|  null|                null|    null|
    +----+--------------------+------+--------------------+--------+
As you can see, the checkpoint file contains columns for the different actions (add, remove, metaData, and protocol). We see add file actions for the different Parquet data files, an update metadata action from when we created the Delta table, and a change protocol action resulting from the initial Delta table write. Note that DataFrame.show() will not show the DataFrame records in order. The change protocol and update metadata records are always the first records in the checkpoint file, followed by the different add file actions.

Conclusion

As we begin the journey into Delta Lake, it all starts with the initial setup. This chapter walked through how to set up Delta Lake with PySpark and with the Spark Scala shell on your local machine, covering the libraries and packages necessary to run a PySpark program with Delta Lake extensions. You can also simplify this setup by using a cloud-based tool like Databricks to develop, run, and share Spark applications that use Delta Lake.

After reading about getting Delta Lake up and running, we began to learn about the foundational components of Delta Lake that enable most of the core features we will discuss throughout this book. By adding a transaction log to standard Parquet files to support ACID transactions, and checkpoint files to enable scalable metadata, Delta Lake has the key elements to support reliability and scalability. Now that we have established these foundational components, you will learn more about basic operations on a Delta table in the next chapter.