CHAPTER 5
Performance Tuning

Any time you are storing and retrieving data, whether with a traditional RDBMS or with Delta tables, how you organize the data in the underlying storage format can significantly affect the time it takes to perform table operations and queries. In general, performance tuning refers to the process of optimizing the performance of a system, and in the context of Delta tables this involves optimizing how the data is stored and retrieved. Historically, faster data retrieval is accomplished either by increasing RAM or CPU for faster processing, or by reducing the amount of data that needs to be read by skipping nonrelevant data. Delta Lake provides a number of different techniques that can be combined to accelerate data retrieval by efficiently reducing the number of files and the amount of data that needs to be read during operations.

An additional problem that can contribute to slower reads and inefficient processing in Apache Spark and Delta Lake is the small file problem, briefly mentioned in Chapter 1. The small file problem arises when the underlying data is divided into numerous small files, as opposed to larger, more efficient files. It can occur for several different reasons, primarily frequent writes, but it can be addressed through a variety of techniques in Delta Lake, including compacting small files into larger files.

By leveraging good performance tuning strategies to reduce the effects of the small file problem and better enable data skipping on Delta tables, you can significantly improve execution times, especially when dealing with large tables or resource-intensive data lake operations and queries.

Data Skipping

Skipping nonrelevant data is ultimately the foundation for most performance tuning features, as it aims to reduce the amount of data that needs to be read. This feature, called data skipping, can be enhanced through a variety of different techniques in Delta Lake.

Delta Lake automatically maintains the minimum and maximum values for up to 32 fields of each data file and stores those values as part of the metadata. Delta Lake uses these minimum and maximum ranges to skip files that fall outside the range of the queried field values. This is the key mechanism that enables data skipping, through what are called data skipping statistics.

You do not need to configure or specify data skipping and data statistics, as this feature is activated whenever applicable in Delta Lake, but its effectiveness greatly depends on the layout of your data. To maximize the effectiveness of data skipping, data can be consolidated, clustered, and colocated using commands such as OPTIMIZE and ZORDER BY, which will be discussed in further detail in subsequent sections, so that minimum and maximum ranges are narrow and, ideally, nonoverlapping.

Delta Lake collects the following data skipping statistics for each data file:

- Number of records
- Minimum values for each of the first 32 columns
- Maximum values for each of the first 32 columns
- Number of null values for each of the first 32 columns

Delta Lake collects these statistics on the first 32 columns defined in your table schema. Please note, each field within nested columns (e.g., StructType, an Apache Spark data type) counts as a column.
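To illustrate how these per-file statistics are used, the following is a minimal sketch of a filtered read; the path and the tpep_pickup_datetime column are illustrative assumptions borrowed from the examples later in this chapter, not code from the book's notebooks. No special syntax is required: Delta Lake compares the filter against each file's minimum and maximum values recorded in the transaction log and only scans files whose range overlaps the predicate.

# minimal sketch: data skipping happens automatically for filtered reads
# the path and column name below are illustrative assumptions
path = "/mnt/datalake/book/chapter05/YellowTaxisDelta"

df = (
    spark.read.format("delta")
    .load(path)
    .where("tpep_pickup_datetime >= '2022-06-01' AND tpep_pickup_datetime < '2022-07-01'")
)

# only files whose min/max statistics overlap June 2022 are actually read
print(df.count())

Files whose minimum and maximum tpep_pickup_datetime values fall entirely outside June 2022 are skipped without being opened.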
You can configure statistics collection on certain columns by reordering columns in the schema, or you can increase the number of columns to collect statistics on by using delta.dataSkippingNumIndexedCols, but adding additional columns also adds additional overhead that can adversely affect write performance. Typically, you want to collect data skipping statistics on columns that are commonly used in filters, WHERE clauses, and joins, and on columns that you tend to perform aggregations on. Conversely, avoid collecting data skipping statistics on long strings, as they are far less efficient for data skipping purposes. Figure 5-1 shows that by default only statistics for the first 32 columns are collected on a table, and that for the purpose of collecting statistics, each field within a nested column is considered an individual column.

We can inspect the statistics recorded for the last file added to the table by reading the add file action from the most recent transaction log entry:

%sh
# find the last transaction entry and search for "add" to find an added file
# the output will show you the data skipping statistics
grep "\"add\"" "$(ls -lrt $destination_path/_delta_log/*.json | tail -n 1)" | sed -n 1p > /tmp/commit.json
python -m json.tool < /tmp/commit.json

This produces the following output (only relevant portions shown):

"stats": "{\"numRecords\": 12177114, \"minValues\": {\"VendorID\": 1, \"tpep_pickup_datetime\": \"2022-01-01\"...
\"maxValues\": {\"VendorID\": 6, \"tpep_pickup_datetime\": \"2022-11-01\"...
\"nullCount\": {\"VendorID\": 0, \"tpep_pickup_datetime\": 0...

Note: We are displaying the add file action from the transaction log using a shell command to show the appropriate item in a JSON file in a readable, programmatic way. Since this file is written to the storage location where your Delta table is stored, you can also navigate to that location and open the appropriate JSON file in the transaction log to view the information that way as well.

From this output, we can see that the minimum and maximum values were captured in the last file, along with the number of nulls, or null count. Statistics were collected on all columns in the table because it contains fewer than 32. This metadata is collected for every file added during the operation.

If the table contains more than 32 columns, we can change the number of columns that statistics are collected for using the table property delta.dataSkippingNumIndexedCols:

%sql
ALTER TABLE table_name
SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '<number_of_columns>');

Note: Delta Lake properties such as delta.dataSkippingNumIndexedCols can also be set using the Spark configuration settings.

It may not be effective to collect minimum and maximum values on some columns, because collecting statistics on long values like strings or binary can be an expensive operation. We can either configure the table property delta.dataSkippingNumIndexedCols to avoid columns containing long values, or move columns containing long values to a position greater than delta.dataSkippingNumIndexedCols using ALTER TABLE ALTER COLUMN (see the ALTER TABLE Delta Lake documentation). Chapter 7 discusses updating a table's schema and changing column ordering in more detail.

Partitioning

In an effort to further reduce the amount of data that needs to be read during operations (i.e., data skipping) and to increase performance on large tables, Delta Lake partitioning allows you to organize a Delta table by dividing the data into smaller chunks called partitions.

Note: The partitioning described in this section is not the partitioning that Spark applies when processing a DataFrame.
Rather, the partitioning in this chapter refers to on-disk, or Hive-style, partitioning, where data is organized with paths that contain key-value pairs such as Year=2023.

You can create partitions based on the values in one or more columns of the table (the most common being a date), which can speed up queries against the table, as well as data manipulation commands such as INSERT, UPDATE, MERGE, and DELETE.

Note: At the time of writing, partitions are the recommended approach to enable data skipping with regard to data layout. A new feature in Delta Lake called liquid clustering, which you will learn about in the last section of this chapter, is currently in preview and is not compatible with partitions. It will replace partitions as the recommended approach to optimizing query performance with regard to data layout. We felt it was important to understand how partitions work and how you can apply them manually before learning about features that automate and replace these commands. Liquid clustering will be generally available in the near future. You can learn more and stay up-to-date on the status of liquid clustering by reviewing the Delta Lake documentation website and this feature request.

When you partition a table, the underlying dataset is organized into different directories and subdirectories for each partition (Figure 5-2).

Figure 5-2. The underlying data files of a Delta table organized into different directories and subdirectories

The numbered steps in Figure 5-2 show:

1. A Delta table with no partitions is organized into a single directory.
2. A Delta table partitioned on a single column has a directory created for each of the partition values.
3. A Delta table partitioned on multiple columns has a directory created for each partition value, and then subdirectories are created for each of the additional columns defined for the partition.

In Delta Lake, partitions run the risk of decreasing performance in many cases, as opposed to not partitioning a Delta table. This is because partitions can create the small file problem, discussed earlier in this book and later in this chapter, especially when partitioning on multiple columns. Partitioning is seldom advisable; please refer to "Partitioning Warnings and Considerations" later in this chapter before applying partitions to Delta tables.

When you can selectively query a partition, rather than scanning all of the files in the dataset, Delta Lake quickly scans only the appropriate directory (or directories), or partitions, to perform your operation, which results in faster operations. Delta Lake automatically tracks the set of partitions present in a table and updates the list as data is added or removed, so there is no need to run ALTER TABLE to account for new partitions.
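To make the directory layouts in Figure 5-2 concrete, here is a minimal sketch of the three cases. The paths and the PickupYear/PickupMonth columns are illustrative assumptions (the sketch assumes a PickupDate column, as in the partitioning example later in this section), not code from the book's notebooks:

# illustrative sketch of the three layouts described in Figure 5-2
from pyspark.sql.functions import month, year

df = (spark.read.format("delta")
      .load("/mnt/datalake/book/chapter05/YellowTaxisDelta")
      .withColumn("PickupYear", year("PickupDate"))
      .withColumn("PickupMonth", month("PickupDate")))

# 1. no partition columns: all data files land in a single directory
df.write.format("delta").mode("overwrite").save("/tmp/taxis_nopart")

# 2. one partition column: one directory per value, e.g. PickupMonth=1/, PickupMonth=2/, ...
df.write.format("delta").mode("overwrite") \
    .partitionBy("PickupMonth") \
    .save("/tmp/taxis_by_month")

# 3. two partition columns: nested directories, e.g. PickupYear=2022/PickupMonth=1/, ...
df.write.format("delta").mode("overwrite") \
    .partitionBy("PickupYear", "PickupMonth") \
    .save("/tmp/taxis_by_year_month")

The nested PickupYear=.../PickupMonth=... directories produced by the last write are exactly what step 3 of Figure 5-2 describes.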
To create a partitioned table, we can use the PARTITIONED BY clause in the table definition using SQL:

%sql
-- create a partitioned table using SQL
CREATE TABLE tripData(PickupMonth INTEGER, VendorId INTEGER, TotalAmount DOUBLE)
PARTITIONED BY(PickupMonth);

-- use the PARTITION specification to INSERT into a table
INSERT INTO tripData PARTITION(PickupMonth = '12') (VendorId, TotalAmount)
SELECT VendorId, TotalAmount FROM decemberTripData;

-- drop partitions
ALTER TABLE tripData DROP PARTITION(PickupMonth = '12');

The following script, which can be found in the notebook "02 - Partitioning" (GitHub repo location: /chapter05/01 - Compaction), demonstrates how to write a partitioned Delta table while also adding a column to partition on:

# import modules
from pyspark.sql.functions import month, to_date

## UPDATE destination_path WITH YOUR PATH ##
# define Delta table destination path
destination_path = '/mnt/datalake/book/chapter05/YellowTaxisPartionedDelta/'

# read the Delta table, add columns to partition on,
# and write it using a partition
# make sure to overwrite the existing schema if the table already exists
# since we are adding partitions
spark.table('taxidb.tripData') \
    .withColumn('PickupMonth', month('PickupDate')) \
    .withColumn('PickupDate', to_date('PickupDate')) \
    .write \
    .partitionBy('PickupMonth') \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .save(destination_path)

# register table in Hive
spark.sql(f"CREATE TABLE IF NOT EXISTS taxidb.tripDataPartitioned USING DELTA LOCATION '{destination_path}'")

Note: If you are following along using the GitHub repository, please note that your file locations may differ from the file locations in the notebooks in the repository. Please update them accordingly.

To view the partitions of a Delta table, we can use the SHOW PARTITIONS command:

%sql
-- list all partitions for the table
SHOW PARTITIONS taxidb.tripDataPartitioned

This produces the following output:

+-------------+
| PickupMonth |
+-------------+
| 1           |
| 2           |
| ...         |
| 12          |
+-------------+

In this output we can see that the Delta table is partitioned by PickupMonth, and there is a partition for each month.

Note: To overwrite the schema or change partitioning on an existing Delta table, set .option("overwriteSchema", "true").

To view how the partitions are organized in the underlying filesystem, we can also view the directories created where the Delta table is located. Keep in mind that since we are looking at the actual filesystem, this could also show you old or nonexistent partitions. Since this table was created using the scripts in this chapter, it should only have the relevant partitions:

# import OS module
import os

# create an environment variable so we can use this variable in the
# following bash script
os.environ['destination_path'] = '/dbfs' + destination_path

# list files and directories in the directory
print(os.listdir(os.getenv('destination_path')))

This produces the following output:

['PickupMonth=1', 'PickupMonth=10', 'PickupMonth=11', 'PickupMonth=12',
 'PickupMonth=2', 'PickupMonth=3', 'PickupMonth=4', 'PickupMonth=5',
 'PickupMonth=6', 'PickupMonth=7', 'PickupMonth=8', 'PickupMonth=9',
 '_delta_log']

In this output, the Delta table not only contains the transaction log directory, _delta_log, but also a directory for each of the values in the partition, in this case months 1 to 12.
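Another way to confirm how a table is partitioned, without listing directories, is to look at the table-level metadata. The following is a minimal sketch; it assumes the taxidb.tripDataPartitioned table registered above, and while DESCRIBE DETAIL is a standard Delta Lake command, the exact columns returned can vary by Delta version:

# inspect table-level metadata, including the partition columns
detail = spark.sql("DESCRIBE DETAIL taxidb.tripDataPartitioned")

# partitionColumns should show ['PickupMonth'] for the table created above
detail.select("partitionColumns", "numFiles", "sizeInBytes").show(truncate=False)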
The values of the directories for each partition are also contained in the transaction log as metadata entries that are part of each add file action. This metadata entry can be seen in your current table by looking at the add file action in the transaction log and inspecting partitionValues:

%sh
# find the last transaction entry and search for "add" to find an added file
# the output will show you partitionValues
grep "\"add\"" "$(ls -lrt $destination_path/_delta_log/*.json | tail -n 1)" | sed -n 1p > /tmp/commit.json
python -m json.tool < /tmp/commit.json

This produces the following output (only showing relevant portions):

{
    "add": {
        "path": "PickupMonth=12/part-00000-....c000.snappy.parquet",
        "partitionValues": {
            "PickupMonth": "12"
        }
    }
}

Due to this metadata, partitioning is essentially a form of data skipping. But rather than basing data skipping on data statistics, discussed earlier in this chapter, the data skipping is based on exact matches of a string, the partition value, which helps filter files.

Delta Lake also makes it easy to update only specified partitions using replaceWhere, which you learned about in Chapter 3. Let's assume we have a business requirement that says that when the payment type is 4 in the month of December, it needs to be updated to 5. We can use the following PySpark expression with replaceWhere to achieve that result:

# import lit and LongType
from pyspark.sql.functions import lit
from pyspark.sql.types import LongType

# use replaceWhere to update a specified partition
spark.read \
    .format("delta") \
    .load(destination_path) \
    .where("PickupMonth == '12' and PaymentType == '4'") \
    .withColumn("PaymentType", lit(5).cast(LongType())) \
    .write \
    .format("delta") \
    .option("replaceWhere", "PickupMonth = '12'") \
    .mode("overwrite") \
    .save(destination_path)

Note: The Delta table schema cannot be overwritten when using replaceWhere.

In the preceding command, notice that we loaded just a single partition using a WHERE clause. Reading partitions directly is not necessary, but using a WHERE clause (Spark SQL) or a .where() function (DataFrame API) enables data skipping, for example:

# read a partition from the Delta table into a DataFrame
df = spark.read.table("taxidb.tripDataPartitioned").where("PickupMonth = '12'")

While using .where() for reading data can be very effective, you can also use .where() in combination with performance tuning commands, such as compaction, OPTIMIZE, and ZORDER BY, to perform those operations only on a specified partition (or partitions). This can be especially helpful when you are writing new data to specific partitions (e.g., inserting data for the current month). If the WHERE clause or .where() function is not used, then the entire table is scanned by default. For example, we can perform compaction on a single partition:

# read a partition from the Delta table and repartition it
spark.read.format("delta") \
    .load(destination_path) \
    .where("PickupMonth = '12'") \
    .repartition(5) \
    .write \
    .option("dataChange", "false") \
    .format("delta") \
    .mode("overwrite") \
    .save(destination_path)

We can also perform OPTIMIZE and ZORDER BY on specified partitions easily using SQL:

%sql
OPTIMIZE taxidb.tripData WHERE PickupMonth = 12 ZORDER BY tpep_pickup_datetime

Partitioning Warnings and Considerations

Partitions can be very beneficial, especially for very large tables, but there are a few things to consider when partitioning tables:

Select your partition column(s) carefully.
If the cardinality of a column is very high, do not use that column for partitioning. For example, partitioning by a timestamp column that may have one million distinct values is a bad partitioning strategy. High-cardinality columns are great for Z-ordering, but not for partitioning, because partitioning on them can lead to the same small file problem discussed at the beginning of the chapter. This is why we added date columns in the earlier examples; they serve as appropriate partitioning columns. The most commonly used partition column is typically a date.

Partition by a column only if you expect data in each partition to be at least 1 GB. Tables with fewer, larger partitions tend to outperform tables with many smaller partitions; otherwise you run into the small file problem.

Columns used for partitioning are always moved to the end of the table unless the partition columns are explicitly defined in the column specification (the name and data type for each column) when creating the table.

Once you create a table with partitions, you cannot change those partitions, even as query patterns or partition requirements change. Partitions are considered a fixed data layout and do not support partition evolution.

There is no magic recipe for partitioning strategies, only guidelines to consider. The right choice depends on the data, its granularity, the ingestion and update patterns, and so on.

Compact Files

When performing DML operations on a Delta table, new data is often written in many small files across partitions. Due to the additional volume of file metadata and the total number of data files that need to be read, query and operation speed can be reduced. As an important reminder, this is the small file problem mentioned previously. To avoid this issue, you should rewrite a large number of small files into a small number of larger files greater than 16 MB. Delta Lake supports the ability to optimize this layout of data in storage with various ways to coalesce small files into larger ones.

Compaction

The consolidation of files is called compaction, or bin-packing. To perform compaction using your own specifications, for example, specifying the number of files to compact the Delta table into, you can use a DataFrame writer with dataChange = false. This indicates that the operation does not change the data; it simply rearranges the data layout.

When data is compacted, Delta Lake sets dataChange = true by default. This can break concurrent operations on the table, such as downstream streaming consumers, when the table is used as a streaming source. Conversely, when using dataChange = false, an operation that actually changes data can corrupt the underlying data in the table. It is best to only use dataChange = false when there are no data changes, as this option lets any downstream consumers know that the operation only rearranges the data, and thus those consumers can ignore the event in the transaction log.

The following example can be found in step 1 of the "03 - Compaction, Optimize and ZOrder" notebook.
The script in the notebook demonstrates how we can use the DataFrame writer with repartition, a method used to increase or decrease the number of partitions in a Spark DataFrame, and the option dataChange = false to compact the data into five files using our own algorithm:

# define the path and number of files to repartition
path = "/mnt/datalake/book/chapter05/YellowTaxisDelta"
numberOfFiles = 5

# read the Delta table and repartition it
spark.read \
    .format("delta") \
    .load(path) \
    .repartition(numberOfFiles) \
    .write \
    .option("dataChange", "false") \
    .format("delta") \
    .mode("overwrite") \
    .save(path)

OPTIMIZE

Compaction allows you to specify how to consolidate small files into larger ones. In Delta Lake, a more optimal way to trigger this compaction, letting Delta Lake determine the optimal number of large files, is the OPTIMIZE command. The OPTIMIZE command aims to remove unnecessary files from the transaction log while also producing evenly balanced data files in terms of file size. The smaller files are compacted into new, larger files up to 1 GB.

Figure 5-3 shows how OPTIMIZE consolidates smaller files into larger files. Keep in mind, OPTIMIZE does not take into account how the data is organized within the files; it only rearranges and consolidates files. You will learn more about how to organize data within the files in the next section.

As you can see in Figure 5-3:

1. The Delta table is comprised of small files containing data in no particular order. In this case there are four files with two rows each.
2. You run OPTIMIZE to reduce the number of files that need to be read during operations.
3. The small files in your Delta table are compacted into new, larger files up to 1 GB. In this case, we end up with two files with four rows each.

Figure 5-3. Data files before and after OPTIMIZE

Note: Unlike compaction achieved through the repartition method, there is no need to specify the dataChange option. OPTIMIZE uses snapshot isolation when performing the command, so concurrent operations and downstream streaming consumers remain uninterrupted.

Let's walk through an example of OPTIMIZE. Using the notebook "03 - Compaction, Optimize and ZOrder" (GitHub repo location: /chapter05/03 - Compaction, Optimize and ZOrder), we will execute step 2 (step 1 was executed in the compaction section) to repartition the existing table into 1,000 files to simulate a scenario where we consistently insert data into a table. In step 3 in the notebook, run the OPTIMIZE command. The output will provide metrics of the operation:

%sql
OPTIMIZE taxidb.YellowTaxis

Output (only relevant portions shown):

+--------------------------------------------------------------------+
| metrics                                                            |
+--------------------------------------------------------------------+
| {"numFilesAdded": 9, "numFilesRemoved": 1000,                      |
|  "filesAdded": {..."totalFiles": 9, "totalSize": 2096274374...},   |
|  "filesRemoved": {..."totalFiles": 1000, "totalSize": 2317072851   |
+--------------------------------------------------------------------+

After running the OPTIMIZE command on the table, we can see that 1,000 files were removed and 9 files were added.
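If you want to review these compaction metrics later, they are also recorded in the table's history. The following is a minimal sketch, assuming the same taxidb.YellowTaxis table; operationMetrics is part of the standard DESCRIBE HISTORY output, though the exact metric keys can vary by Delta version:

# the most recent entry should be the OPTIMIZE operation, with operationMetrics
# such as numRemovedFiles and numAddedFiles
history = spark.sql("DESCRIBE HISTORY taxidb.YellowTaxis")
history.select("version", "operation", "operationMetrics").show(5, truncate=False)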
Note: When comparing the total size of the files removed to that of the files added, you will notice that the total size stayed relatively the same, even increasing slightly. The NYC Taxi dataset we are using is mainly integers, so you see little compression from reorganizing the data. If your data contains many string values, you will see much better compression after running OPTIMIZE.

It is important to note that the 1,000 files that were removed were not physically removed from the underlying storage; rather, they were only logically removed from the transaction log. These files will be physically removed from the underlying storage the next time you run VACUUM, which is covered in detail in Chapter 6.

Optimization using OPTIMIZE is also idempotent, meaning that if it is run twice on the same table or subset of data, the second run has no effect. If you run the same command again on the taxidb.YellowTaxis table, the metrics in the output indicate that 0 files were added and 0 files were removed:

%sql
OPTIMIZE taxidb.YellowTaxis

Output (only relevant portions shown):

+--------------------------------------------------------------------+
| metrics                                                            |
+--------------------------------------------------------------------+
| {"numFilesAdded": 0, "numFilesRemoved": 0, "filesAdded":           |
|  {..."totalFiles": 0, "totalSize": 0...                            |
+--------------------------------------------------------------------+

We can also optimize specific subsets of data rather than optimizing the entire table. This can be useful when we are only performing DML operations on a specific partition (covered earlier in this chapter) and need to optimize just that partition (or partitions). We can specify an optional partition predicate using a WHERE clause. Suppose we are only adding and updating data on a regular basis in the partition for the current month; in this case, that is month 12. After adding 12 to the partition predicate, you will notice that after running the following command, only 17 files were removed and 4 files were added in the specified partition:

%sql
OPTIMIZE taxidb.YellowTaxis WHERE PickupMonth = 12

Output (only relevant portions shown):

+--------------------------------------------------------------------+
| metrics                                                            |
+--------------------------------------------------------------------+
| {"numFilesAdded": 4, "numFilesRemoved": 17, "filesAdded":          |
|  {..."totalFiles": 4, "totalSize": 1020557526...                   |
+--------------------------------------------------------------------+

OPTIMIZE considerations

While OPTIMIZE can help improve the speed of queries, there are a few things to consider before running this command on all tables to help ensure its effectiveness:

- The OPTIMIZE command is effective for tables, or table partitions, that you write data to continuously and that therefore contain large amounts of small files.
- The OPTIMIZE command is not effective for tables with static data or tables where data is rarely updated, because there are few small files to coalesce into larger ones.
- The OPTIMIZE command can be a resource-intensive operation that takes time to execute. You can incur costs from your cloud provider while running your compute engine to perform the operation. Balancing these resource-intensive operations with the ideal query performance for your tables is important.

ZORDER BY

While OPTIMIZE aims to consolidate files, Z-ordering allows us to read the data in those files more efficiently by optimizing the data layout. ZORDER BY is a parameter of the OPTIMIZE command and refers to the way that data is arranged in files based on their values.
Specifically, this technique clusters and colocates related information in the same set of files to allow for faster data retrieval. This colocality is automatically used by Delta Lake in its data skipping algorithms, which were discussed at the beginning of this chapter.

Z-order indexes can improve the performance of queries that filter on the specified Z-order columns. Performance is improved because queries can more efficiently locate the relevant rows, and joins can more efficiently locate rows with matching values. This efficiency can ultimately be attributed to the reduction in the amount of data that needs to be read during queries.

Note: Similar to partitions, Z-order indexes will soon be replaced by the new Delta Lake feature, liquid clustering, as the preferred technique to simplify data layout and optimize query performance. Delta Lake liquid clustering is not compatible with user-specified table partitions or Z-ordering. This feature is currently in preview and will be generally available in the near future. You can learn more and stay up-to-date on the status of liquid clustering by reviewing the Delta Lake documentation website and this feature request.

To demonstrate OPTIMIZE combined with Z-ordering, we will reset and clear the optimization we did on taxidb.YellowTaxis earlier in the chapter by repartitioning the existing table into 1,000 smaller files once again by running step 6:

# define the path and number of files to repartition
path = "/mnt/datalake/book/chapter05/YellowTaxisDelta"
numberOfFiles = 1000

# read the Delta table and repartition it
spark.read \
    .format("delta") \
    .load(path) \
    .repartition(numberOfFiles) \
    .write \
    .option("dataChange", "false") \
    .format("delta") \
    .mode("overwrite") \
    .save(path)

To get a baseline for the initial query, execute the baseline query in the script:

%sql
-- baseline query
-- take note of how long it takes to return results
SELECT
    COUNT(*) AS count,
    SUM(total_amount) AS totalAmount,
    PickupDate
FROM taxidb.tripData
WHERE PickupDate BETWEEN '2022-01-01' AND '2022-03-31'
GROUP BY PickupDate

This query will give us a general baseline for how long the execution takes when the underlying Delta table has many small files and the data is not organized in any particular order.

We can apply ZORDER BY with the OPTIMIZE command to consolidate files and effectively order the data in those files. This will, in turn, significantly decrease the time it takes to fetch the query results, since the data is easier to locate. This is generally most effective when used on a high-cardinality column that is used frequently in query predicates, which means that the column we apply Z-ordering to affects how efficiently the data is retrieved:

%sql
OPTIMIZE taxidb.tripData ZORDER BY PickupDate

Now that we have added Z-ordering, we can see the detailed zOrderStats in the output, which include the strategy name, input cube files, and other statistics about the ZORDER BY operation.

When we run the same baseline query that was executed before the OPTIMIZE and ZORDER BY command, we should notice a significant improvement in the time it takes to retrieve the query results. The time will vary depending on the cluster configuration, but we consistently noticed around a 70% decrease in the time it took to return the query results due to the optimizations.
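If you want to measure the improvement yourself rather than relying on the notebook's displayed execution time, a minimal sketch like the following can be used; the timing approach and the .collect() call are assumptions for illustration, not part of the book's notebooks. Run it once before and once after the OPTIMIZE ... ZORDER BY command and compare the elapsed times (note that result caching on some platforms can skew repeated runs):

import time

baseline_sql = """
    SELECT COUNT(*) AS count, SUM(total_amount) AS totalAmount, PickupDate
    FROM taxidb.tripData
    WHERE PickupDate BETWEEN '2022-01-01' AND '2022-03-31'
    GROUP BY PickupDate
"""

start = time.time()
spark.sql(baseline_sql).collect()   # force full execution of the query
print(f"elapsed: {time.time() - start:.1f} seconds")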
In this case, adding Z-ordering increased the query engine's efficiency in reading the data, and OPTIMIZE coalesced the small files into larger ones. This can be difficult to show using large datasets, but Figure 5-4 illustrates how consolidation and ordering occur using the taxidb.tripData table.

Figure 5-4. taxidb.tripData Delta table files before and after OPTIMIZE and ZORDER BY, along with how data is retrieved using data skipping

The numbered steps in Figure 5-4 show:

1. Query the Delta table called taxidb.YellowTaxis and count the number of records WHERE PickupDate = '2022-06-30'.
2. The Delta table is comprised of small files containing data in no particular order.
3. We run OPTIMIZE with ZORDER BY for better performance during query executions.
4. Small files are coalesced into larger ones, and the data is sorted by the Z-order column, which is PickupDate.
5. Data skipping is leveraged since we are looking for the query predicate PickupDate = '2022-06-30'. The first file is skipped because Delta Lake knows that the query predicate is not contained in it, since it falls outside the range of the min and max values in the data skipping statistics.
6. Data is quickly read from the second file because Delta Lake knows to scan this file, since the search predicate falls inside the range of its min and max values.

You can see that before we ran any optimization on the table, the data was organized into smaller files with no particular order. When running the baseline query, the query engine had to scan all the Delta Lake files to find our query predicate WHERE PickupDate BETWEEN '2022-01-01' AND '2022-03-31'. Once we applied OPTIMIZE with ZORDER BY, the data was coalesced into larger files, and the data was sorted by the column PickupDate in ascending order. This allowed the query engine to read the first file based on the query predicate, and ignore, or skip, the second file to gather the results.

ZORDER BY Considerations

You can specify multiple columns for ZORDER BY as a comma-separated list in the command. However, the effectiveness of the locality drops with each additional column:

%sql
OPTIMIZE taxidb.tripData ZORDER BY PickupDate, VendorId

Similar to OPTIMIZE, you can apply Z-ordering to specific subsets of data, such as partitions, rather than applying it to the entire table:

%sql
OPTIMIZE taxidb.tripData ZORDER BY PickupDate, VendorId WHERE PickupMonth = 2022

Note: You cannot use ZORDER BY on fields used for partitioning. You have learned that ZORDER BY works in tandem with the OPTIMIZE command. However, you cannot combine files across partition boundaries, so Z-order clustering can only occur within a partition. For unpartitioned tables, files can be combined across the entire table.

If you expect a column to be commonly used in query predicates, and if that column has high cardinality (that is, a large number of distinct values), then use ZORDER BY.

Unlike OPTIMIZE, Z-ordering is not idempotent, but it aims to be an incremental operation. The time it takes for Z-ordering is not guaranteed to decrease over multiple runs.
However, if no new data was added to a partition that was just Z-ordered, another Z-ordering of that partition will not have any effect (see "Optimizations" in the Delta Lake documentation).

Liquid Clustering

Liquid clustering is a new feature in Delta Lake that is currently in preview at the time of writing. This feature will be generally available in the near future. You can learn more and stay up-to-date on the status of liquid clustering by reviewing the Delta Lake documentation website and this feature request.

While some of the performance tuning techniques mentioned throughout this chapter aim to optimize data layouts and thus improve read and write performance, they have some shortcomings:

Partitioning
Partitions run the risk of introducing the small file problem, where data is stored across many different small files, which inevitably results in poor performance. And once a table is partitioned, the partitioning cannot be changed, which can cause challenges for new use cases or new query patterns. While Delta Lake supports partitioning, there are challenges with partition evolution, as partitioning is considered a fixed data layout.

ZORDER BY
Any time data is inserted, updated, or deleted on a table, OPTIMIZE ZORDER BY must be run again for optimization. And when ZORDER BY is applied again, the user must remember the columns used in the expression, because the columns used in ZORDER BY are not persisted, which can cause errors or challenges when attempting to apply it again. Since OPTIMIZE ZORDER BY is not idempotent, running it again results in reclustering the data.

Many of the shortcomings of partitioning and Z-ordering can be addressed through Delta Lake's liquid clustering feature. The following scenarios for Delta tables benefit greatly from liquid clustering:

- Tables often filtered by high-cardinality columns
- Tables with substantial skew in data distribution
- Tables that require large amounts of tuning and maintenance
- Tables with concurrent write requirements
- Tables with partition patterns that change over time

Delta Lake's liquid clustering feature aims to address the limitations found with partitioning and Z-ordering, and to revamp both read and write performance through a more dynamic data layout. Ultimately, liquid clustering helps reduce performance tuning overhead while also supporting efficient query access.

Enabling Liquid Clustering

To enable liquid clustering on a table, specify the CLUSTER BY command when creating the table. You must specify liquid clustering using CLUSTER BY when you create the table; you cannot add clustering to an existing table (e.g., using ALTER TABLE) that does not have liquid clustering enabled.

If you are using Databricks to follow along in this book and run the notebooks from the GitHub repo, Databricks Runtime 13.2 and above is required to run the code related to liquid clustering.

To demonstrate how to create a table with liquid clustering enabled, we can use the notebook "04 - Liquid Clustering" and the following command:

%sql
CREATE EXTERNAL TABLE taxidb.tripDataClustered
CLUSTER BY (VendorId)
LOCATION '/mnt/datalake/book/chapter05/YellowTaxisLiquidClusteringDelta'
AS SELECT * FROM taxidb.tripData LIMIT 1000;

The preceding command creates an external table with liquid clustering enabled, clustered by VendorId, and populated with data from the taxidb.tripData table that was created earlier.
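If you prefer to define the schema explicitly instead of using a CTAS statement, liquid clustering can also be enabled on an empty table at creation time. The following is a minimal sketch, not code from the book's notebooks: the table name and column list are illustrative assumptions, and the exact clause support may vary with your runtime version, but the CLUSTER BY clause must still be part of the CREATE TABLE statement:

# hypothetical example: create an empty clustered table with an explicit schema
spark.sql("""
    CREATE TABLE IF NOT EXISTS taxidb.tripDataClusteredEmpty (
        VendorId   BIGINT,
        RateCodeId DOUBLE,
        PickupDate DATE
    )
    USING DELTA
    CLUSTER BY (VendorId)
""")

Rows added to this table later can then be clustered on VendorId, as shown next with OPTIMIZE.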
To trigger clustering, run the OPTIMIZE command on the newly created table:

%sql
OPTIMIZE taxidb.tripDataClustered;

Output (only relevant portions shown):

+-----------------------------------------------------------------------------+
| metrics                                                                     |
+-----------------------------------------------------------------------------+
| {"sizeOfTableInBytesBeforeLazyClustering": 43427, "isNewMetadataCreated":   |
|  true..."numFilesClassifiedToLeafNodes": 1,                                 |
|  "sizeOfFilesClassifiedToLeafNodesInBytes": 43427,                          |
|  "logicalSizeOfFilesClassifiedToLeafNodesInBytes": 43427,                   |
|  "numClusteringTasksPlanned": 0, "numCompactionTasksPlanned": 0,            |
|  "numOptimizeBatchesPlanned": 0, "numLeafNodesExpanded": 0,                 |
|  "numLeafNodesClustered": 0, "numLeafNodesCompacted": 0,                    |
|  "numIntermediateNodesCompacted": 0, "totalSizeOfDataToCompactInBytes": 0,  |
|  "totalLogicalSizeOfDataToCompactInBytes": 0,                               |
|  "numIntermediateNodesClustered": 0, "numFilesSkippedAfterExpansion": 0,    |
|  "totalSizeOfFilesSkippedAfterExpansionInBytes": 0,                         |
|  "totalLogicalSizeOfFilesSkippedAfterExpansionInBytes": 0,                  |
|  "totalSizeOfDataToRewriteInBytes": 0,                                      |
|  "totalLogicalSizeOfDataToRewriteInBytes": 0...                             |
+-----------------------------------------------------------------------------+

Earlier in the chapter you saw the metrics displayed after running the OPTIMIZE command on a table. In the output of the OPTIMIZE command for a table with liquid clustering enabled, you will see that clusterMetrics are now included in the metrics of the output. These clusterMetrics display detailed information about the underlying data files (e.g., size and number), compaction details, and cluster node information so that you can view the results of clustering.

It is important to note that only a few operations automatically cluster data on write when writing to a table with liquid clustering. The following operations support automatically clustering data on write, provided the size of the data being inserted does not exceed 512 GB:

- INSERT INTO
- CREATE TABLE AS SELECT (CTAS) statements
- COPY INTO
- Write appends, such as spark.write.format("delta").mode("append")

Since only these specific operations support clustering data on write, you should trigger clustering on a regular basis by running OPTIMIZE. Running this command frequently will ensure that data is properly clustered. It is also worth noting that liquid clustering is incremental when triggered by OPTIMIZE, meaning that only the necessary data is rewritten to accommodate data that needs to be clustered. Since not all write operations automatically cluster data, and since OPTIMIZE is an incremental operation, it is recommended to schedule OPTIMIZE jobs regularly to cluster data, especially since this incremental process helps these jobs run quickly.

Operations on Clustered Columns

By enabling liquid clustering, you learned how to specify what columns a table is clustered on by using the CLUSTER BY command. Once a table is clustered by particular columns, you can read data more efficiently by leveraging the clustered columns, and you can also view, change, and remove those columns.

Changing clustered columns

While you must specify how a table is clustered when it is initially created, you can still change the columns used for clustering on the table using ALTER TABLE and CLUSTER BY.
To change the cluster columns of the table that we created earlier so that it is clustered on both VendorId and RateCodeId, run the following command:

%sql
ALTER TABLE taxidb.tripDataClustered CLUSTER BY (VendorId, RateCodeId);

Note: You can specify up to four columns as clustering keys.

When changing clustered columns, liquid clustering does not require the entire table to be rewritten. This clustering evolution is due to the dynamic data layout feature of liquid clustering and offers a significant advantage over the partitioning features mentioned earlier in the chapter. Traditional partitioning is a fixed data layout and does not support changing how a table is partitioned without rewriting the entire table. Clustering evolution can be essential, as query patterns for a table often change over time, and it allows you to adapt to new query patterns without any significant overhead or challenges.

Viewing clustered columns

Now that we have changed how the table is clustered, we can view the table metadata using DESCRIBE TABLE to confirm these changes and see the clustered columns:

%sql
DESCRIBE TABLE taxidb.tripDataClustered;

Output (only relevant portions shown):

+--------------------------+-----------+---------+
| col_name                 | data_type | comment |
+--------------------------+-----------+---------+
| # Clustering Information |           |         |
| # col_name               | data_type | comment |
| VendorId                 | bigint    | null    |
| RateCodeId               | double    | null    |
+--------------------------+-----------+---------+

The DESCRIBE TABLE command returns basic metadata information about the table, which shows the clustering information and that the table is now clustered on both VendorId and RateCodeId.

Reading data from a clustered table

Now that we have confirmed the clustered columns, we can specify the clustered columns in query filters (e.g., a WHERE clause) to achieve the best (i.e., fastest) query results. For example, add VendorId and RateCodeId to a WHERE clause on the taxidb.tripDataClustered table to achieve the best query results:

%sql
SELECT * FROM taxidb.tripDataClustered WHERE VendorId = 1 AND RateCodeId = 1;

Removing clustered columns

If we choose to remove the columns that a table is clustered by, we can simply specify CLUSTER BY NONE:

%sql
ALTER TABLE taxidb.tripDataClustered CLUSTER BY NONE;

Liquid Clustering Warnings and Considerations

Given that liquid clustering is currently in preview at the time of writing, there are several factors to take into account prior to enabling and utilizing it:

- Check your environment runtime to ensure it supports OPTIMIZE on Delta tables with liquid clustering enabled. If you are using Databricks to follow along in this book and run the notebooks from the GitHub repo, Databricks Runtime 13.2 and above is required.
- Tables created with liquid clustering enabled have numerous Delta table features enabled at creation and use Delta writer version 7 and reader version 3. Table protocol versions cannot be downgraded, and tables with clustering enabled are not readable by Delta Lake clients that do not support all enabled Delta reader protocol table features.
- You must enable Delta Lake liquid clustering when first creating a table. You cannot alter an existing table to add clustering without clustering being enabled when the table is first created.
- You can only specify columns that have statistics collected as clustered columns. Remember, only the first 32 columns in a Delta table have statistics collected by default.
- Structured Streaming workloads do not support clustering-on-write.
- Run OPTIMIZE frequently to ensure new data is clustered.
- Liquid clustering is not compatible with partitioning or ZORDER BY.

Conclusion

In this chapter, you learned about different techniques to store and organize data, both physically and dynamically, and the significant effects they can have on how data is read and retrieved during operations. As the types of data points captured continue to grow, along with the sheer volume of data, tables will continue to get larger and larger. Performance tuning on large datasets has been, and always will be, considered a good strategy and best practice. Understanding the Delta Lake features that enable this will help significantly reduce overhead.

We discussed the small file problem, the impact it can have on performance, and how it can be solved using compaction strategies, including optimal file consolidation using OPTIMIZE. After you OPTIMIZE a table's files, you can arrange the values within those files using ZORDER BY, which allows you to leverage data skipping more effectively through data skipping statistics. You can take data skipping one step further by partitioning Delta tables and breaking the data into distinct parts to further reduce the amount of data that needs to be read.

Then we looked at new Delta Lake features that address some of the challenges that partitioning and Z-ordering can still have. Liquid clustering offers clustering evolution through dynamic data layouts that don't require you to rewrite an entire table as query patterns evolve over time. This largely automated feature is not compatible with partitioning and Z-ordering, but it requires less tuning effort compared to other performance optimization features and greatly enhances the read and write performance of tables.

Using the Delta Lake features mentioned in this chapter can reduce the amount of nonrelevant data that needs to be read and improve performance, especially as the number of data files in your Delta table grows. In the next chapter, you will learn how Delta Lake leverages old data files to allow you to version your data and travel back to a certain point in time.