CHAPTER 6
Using Time Travel

Having worked with databases and tables before, odds are you have had that immediate sense of panic when you forgot a WHERE clause and accidentally ran a DELETE or UPDATE statement against an entire table. We have all been there. Or you may have wondered what your data or schema looked like at a specific point in time for auditing, tracking, or analysis purposes. Given how data is constantly changing, the following scenarios are common occurrences that, historically, have been difficult to solve or answer:

Regulatory
    Auditing and regulatory compliance can require that data be stored and retrieved for many years, or can require that you track certain changes to your data (e.g., GDPR).

Reproduce experiments and reports
    There are often requirements for data scientists or analysts to re-create reports or machine learning experiments and model outputs given a specific set of data at a specific point in time.

Rollbacks
    Accidental or bad DML operations on your data, such as INSERT, UPDATE, DELETE, and MERGE, can require fixes and rollbacks to a previous state.

Time-series analysis
    Reporting needs can require you to look back or analyze data over time, for example, how many new customers were added over the course of a month.

Debugging
    Troubleshooting ETL pipelines, data quality issues, or broken processes where the specific cause may only be observable in a historical state.

The ability to easily traverse through different versions of data at specific points in time is a key feature in Delta Lake called Delta Lake time travel. You learned in Chapter 2 that the transaction log automatically versions data in a Delta table, and this versioning helps you access any historical version of that data. Through versioning and data retention, you will learn how to use these powerful Delta Lake features while also leveraging data management and storage optimization.

Delta Lake Time Travel

Delta Lake time travel allows you to access and revert to previous versions of data stored in Delta Lake, easily providing a powerful mechanism for version control, auditing, and data management. You can then track changes over time and roll back to previous versions if needed.

Let's walk through an example. First, execute the "Chapter Initialization" notebook for Chapter 6 (GitHub repo location: /chapter06/00 - Chapter Initialization) to create the taxidb.tripData Delta table. Next, open the "01 - Time Travel" notebook for Chapter 6.

Let's assume that we need to update VendorId from 1 to 10. Then, we need to delete all occurrences WHERE VendorId = 2. Using the scripts in the notebook for "01 - Time Travel," execute the following commands to apply those changes:

%sql
--update records in table
UPDATE taxidb.tripData
SET VendorId = 10
WHERE VendorId = 1;

--delete records in table
DELETE FROM taxidb.tripData
WHERE VendorId = 2;

--describe the table history
DESCRIBE HISTORY taxidb.tripData;

You will see this output (only relevant portions shown):

version | operation | operationParameters
--------|-----------|---------------------------------------------------------------------
2       | DELETE    | {"predicate": "[\"(spark_catalog.taxidb.tripData.VendorId = 2L)\"]"}
1       | UPDATE    | {"predicate": "(VendorId#5081L = 1)"}
0       | WRITE     | {"mode": "Overwrite", "partitionBy": "[]"}
Output continued and modified to show operationMetrics (only relevant portions shown):

version | operation | operationMetrics
--------|-----------|---------------------------------------------------------------------
2       | DELETE    | {"numRemovedFiles": "10", "numCopiedRows": "9643805", "numAddedChangeFiles": "0", ..."numDeletedRows": "23360027"..., "numAddedFiles": "10"...}
1       | UPDATE    | {"numRemovedFiles": "10", "numCopiedRows": "23414817", "numAddedChangeFiles": "0", "numAddedFiles": "10"...}
0       | WRITE     | {"numFiles": "10", "numOutputRows": "33003832"...}

Looking at the output, we can see that there are a total of three versions of the table, one for each commit, with version 2 being the most recent change:

- Version 0: wrote the initial Delta table using overwrite and no partitions
- Version 1: updated the Delta table using the predicate VendorId = 1
- Version 2: deleted data from the Delta table using the predicate VendorId = 2

Notice that the DESCRIBE HISTORY command shows details about the version, transaction timestamp, operation, operation parameters, and operation metrics. The operationMetrics in the output also show the number of rows and files changed by each operation.

Figure 6-1 illustrates the different versions of the table and examples of the underlying data.

Figure 6-1. taxidb.tripData version history

Restoring a Table

Now, let's say that we want to roll back the previous UPDATE and DELETE operations we performed on taxidb.tripData and restore it back to its original state (i.e., version 0). We can use the RESTORE command to roll back the table to the desired version:

%sql
--restore table to previous version
RESTORE TABLE taxidb.tripData TO VERSION AS OF 0;

--describe the table history
DESCRIBE HISTORY taxidb.tripData;

Output (only relevant portions shown):

version | operation | operationParameters
--------|-----------|---------------------------------------------------------------------
3       | RESTORE   | {"version": "0", "timestamp": null}
2       | DELETE    | {"predicate": "[\"(spark_catalog.taxidb.tripData.VendorId = 2L)\"]"}
1       | UPDATE    | {"predicate": "(VendorId#5081L = 1)"}
0       | WRITE     | {"mode": "Overwrite", "partitionBy": "[]"}

After restoring the table to version 0 and running the DESCRIBE HISTORY command, you will see that there is now an additional version of the table, version 3, which captures the RESTORE operation. Figure 6-2 illustrates the different versions of the table and examples of the underlying data.

Figure 6-2. taxidb.tripData versions after restoring back to version 0

You can now see in the data and in Figure 6-2 that the latest version of the table reflects the data from version 0.

You can restore a table that has already been restored. You can also restore a table to a version that was previously restored.

When data files are deleted either manually or through VACUUM (you will learn more about VACUUM later in this chapter), restoring a table to a version that references those data files will fail. Use the Spark configuration spark.sql.files.ignoreMissingFiles = true to partially restore the table. As the name indicates, this Spark configuration will simply ignore missing files when reading data.

Restoring via Timestamp

In the previous examples we restored the table to a specific version, but we can also restore a table to a specific timestamp.
The timestamp format for restoring to an earlier state is yyyy-MM-dd HH:mm:ss. Providing only a date (yyyy-MM-dd) string is also supported:

%sql
--restore table to a specific timestamp
RESTORE TABLE taxidb.tripData TO TIMESTAMP AS OF '2023-01-01 00:00:00';

--restore table to the state it was in an hour ago
RESTORE TABLE taxidb.tripData TO TIMESTAMP AS OF current_timestamp() - INTERVAL '1' HOUR;

We can also import the delta module to use PySpark and the DataFrame API for restoring a table. We can use the method restoreToVersion(version: int) to restore to a specific version like we did earlier, or we can use the restoreToTimestamp(timestamp: str) method to restore to a specified timestamp:

%python
# import delta module
from delta.tables import *

# restore table to a specific timestamp using PySpark
deltaTable = DeltaTable.forName(spark, "taxidb.tripData")
deltaTable.restoreToTimestamp("2023-01-01")

Time Travel Under the Hood

Version history can be kept on a Delta table because the transaction log keeps track of which files should or should not be read when performing operations on a table. When the DESCRIBE HISTORY command is executed, it also returns the operationMetrics, which tells you the number of files added and removed during an operation. When performing an UPDATE, DELETE, or MERGE on a table, that data is not physically removed from the underlying storage. Rather, these operations update the transaction log to indicate which files should or should not be read. Similarly, when you restore a table to a previous version, it does not physically add or remove data; it only updates the metadata in the transaction log to tell it which files to read.

In Chapter 2 you learned about JSON files within the _delta_log directory and checkpoint files. Checkpoint files save the state of the entire table at a point in time, and are automatically generated to maintain read performance by combining JSON commits into Parquet files. The checkpoint file and subsequent commits can then be read to get the current state of the table, and previous states in the case of time travel, avoiding the need to list and reprocess all of the commits.

The transaction log commits, the checkpoint files, and the fact that data files are only logically removed as opposed to being physically removed are the foundation for how Delta Lake easily enables time travel on your Delta table. Figure 6-3 shows the transaction log entries for each of the operations on the taxidb.tripData table throughout the different transactions and versions.

Figure 6-3. taxidb.tripData transaction log files

The numbered steps in Figure 6-3 show:

1. Version 0: the initial table is created and files are added in the transaction log.
2. Versions 1 and 2: add file and remove file act as metadata entries that Delta Lake uses to determine which files should be read for each version. Remove file does not physically delete data; it just logically removes files from the table.
3. Version 3: this version was restored back to version 0, so the transaction log restores file A, the original file added in version 0, and logically removes file C.
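If you want to see these add and remove actions for yourself, you can read the JSON commit files in the _delta_log directory directly. The following is a minimal sketch, assuming the taxidb.tripData table is stored at the DBFS path used elsewhere in this chapter (the exact path is an assumption; substitute your own):

%python
import json
import os

# Hypothetical location of the taxidb.tripData table; adjust to your environment
log_dir = "/dbfs/mnt/datalake/book/chapter06/YellowTaxisDelta/_delta_log"

for file_name in sorted(os.listdir(log_dir)):
    if not file_name.endswith(".json"):
        continue  # skip checkpoint (.parquet) and .crc files
    with open(os.path.join(log_dir, file_name)) as commit_file:
        for line in commit_file:
            action = json.loads(line)  # each line of a commit file is one action
            if "add" in action:
                print(file_name, "add   ", action["add"]["path"])
            elif "remove" in action:
                print(file_name, "remove", action["remove"]["path"])

Each commit file contains one JSON action per line; the add and remove entries printed here are exactly the metadata entries described in the numbered steps above.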
If you look at the table history of the taxidb.tripData table that we previously restored to version 0, you will notice the number of restored, added, and removed files captured in the operationMetrics:

%sql
--describe table history
DESCRIBE HISTORY taxidb.tripData

Output (only relevant portions shown):

version | operation | operationMetrics
--------|-----------|---------------------------------------------------------------------
3       | RESTORE   | {"numRestoredFiles": "10"..."numRemovedFiles": "10"..."numOfFilesAfterRestore": "10"...}
2       | DELETE    | {"numRemovedFiles": "10"..."numAddedChangeFiles": "0"..."numAddedFiles": "10"...}
1       | UPDATE    | {"numRemovedFiles": "10"..."numAddedChangeFiles": "0"..."numAddedFiles": "10"...}
0       | WRITE     | {"numFiles": "10", "numOutputRows": "33003832", "numOutputBytes": "715810450"}

You will learn more about how to retain and remove previous versions of data in later sections of this chapter.

RESTORE Considerations and Warnings

It is important to note that RESTORE is a data-changing operation, meaning dataChange = true. This means it can potentially affect downstream jobs, such as Structured Streaming jobs, which you will learn more about in Chapter 8. Consider a situation where the streaming query only processes updates to a Delta table. If we RESTORE the table to a previous version, then previous updates to the table could be processed again by your streaming job, since the transaction log restores previous versions of the data using the add file action with dataChange = true. The streaming job recognizes the records as new data.

Table 6-1. Operations resulting from RESTORE

Table version | Operation | Delta log updates | Records in data change log updates
--------------|-----------|--------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------
0             | INSERT    | AddFile(/path/to/file-1, dataChange = true)                                                              | (VendorId = 1, passenger_count = 2), (VendorId = 2, passenger_count = 3)
1             | INSERT    | AddFile(/path/to/file-2, dataChange = true)                                                              | (VendorId = 2, passenger_count = 4)
2             | OPTIMIZE  | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2)   | (No records change during OPTIMIZE)
3             | RESTORE   | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (VendorId = 1, passenger_count = 2), (VendorId = 2, passenger_count = 3), (VendorId = 2, passenger_count = 4)

In Table 6-1, notice that the OPTIMIZE operation in version 2 removed the files added in versions 0 and 1, and added a new consolidated file. After running the RESTORE command in version 3, the operation added back file-1 and file-2 for their respective versions, which is considered a dataChange operation.

Querying an Older Version of a Table

By default, whenever you query a Delta table you always query the table's latest version. But Delta Lake time travel allows you to also perform read operations on a table's previous versions without needing to restore them. Remember, the data itself is not physically deleted from the underlying storage, it is just logically removed. Logical removal rather than physical deletion means that time travel not only allows you to restore a table to a specific point in time, but also lets you query previous versions of a table directly, without restoring. You can access previous versions of the data in two different ways:

1. Using a timestamp
2. Using a version number
Similar to how we restored the table to a previous version using the version number, we can also use the version number to query the table at a specific point in time. In the earlier example, we had updated VendorId from 1 to 10 in version 1 and deleted records WHERE VendorId = 2 in version 2, before restoring the table. We can search for the count of VendorId = 1 records in version 2 of the table using time travel:

%sql
--count records where VendorId = 1 using version number
SELECT COUNT(*) AS count FROM taxidb.tripData VERSION AS OF 2
WHERE VendorId = 1;

--count records where VendorId = 1 using operation timestamp
SELECT COUNT(*) AS count FROM taxidb.tripData TIMESTAMP AS OF '2023-01-01 00:00:00'
WHERE VendorId = 1;

--count records where VendorId = 1 using operation timestamp and the @ syntax
--timestamp must be in yyyyMMddHHmmssSSS format
SELECT COUNT(*) AS count FROM taxidb.tripData@20230101000000000
WHERE VendorId = 1;

--count records where VendorId = 1 using version number and the @ syntax
SELECT COUNT(*) AS count FROM taxidb.tripData@v2
WHERE VendorId = 1;

Output:

count
-----
0

As seen in the preceding examples, we can access the different versions of the data using different types of syntax; we can either use a timestamp or the version number, specifying VERSION AS OF / TIMESTAMP AS OF or appending @ after the table name. Time travel is not only accessible via SQL; we can also time travel via the DataFrame API using the .option() method:

%python
# count records where VendorId = 1 using version number
spark.read.option("versionAsOf", "0").table("taxidb.tripData").filter(
    "VendorId = 1"
).count()

# count records where VendorId = 1 using timestamp
spark.read.option("timestampAsOf", "2023-01-01 00:00:00").table("taxidb.tripData").filter(
    "VendorId = 1"
).count()

Querying by timestamp makes it easy to perform time-series analysis because we can compare the data of the same table to itself at two different points in time. And while there are other ETL patterns we can follow to capture historical data and enable time-series analysis (e.g., slowly changing dimensions and change data feeds), time travel provides a quick and easy way to perform ad hoc analysis for tables that may not have these ETL patterns in place. For example, if we wanted to quickly see how many passengers were picked up this week compared to last week using the version history of taxidb.tripData, we could run the following query:

%sql
--count number of new passengers from 7 days ago
SELECT sum(passenger_count) - (
    SELECT sum(passenger_count)
    FROM taxidb.tripData TIMESTAMP AS OF date_sub(current_date(), 7)
)
FROM taxidb.tripData

While time travel enables time-series analysis as demonstrated, there are more efficient ways to perform similar operations. These time-series examples are shown to demonstrate the capabilities of time travel. Later on in this chapter you will learn about Delta Lake's Change Data Feed, which supports a recommended approach for performing time-series analysis due to its efficiency.

Data Retention

The data files backing a Delta table are never deleted automatically, but log files are automatically cleaned up after checkpoints are written. Ultimately, what enables time travel to a specific version of a table is the retention of both the data and log files for that version of a table. By default, Delta tables retain the commit history, or log files, for 30 days.
So you can access time travel on Delta tables for up to 30 days, unless the data or log files have been removed.

In this and the following sections, you will see the term retention threshold. The retention threshold refers to the interval (e.g., days) a file must be kept before it becomes a candidate to be physically removed from storage. For example, if the retention threshold for a table is seven days, then a file must be at least seven days older than the current table version before becoming eligible to be removed. The following sections cover the two types of retention discussed in this book: data file retention and log file retention.

Data File Retention

Data file retention refers to how long data files are retained in a Delta table. The default retention is seven days for files that are candidates to be removed by VACUUM, a command used for physically deleting data files. In brief, VACUUM removes data files no longer referenced by the Delta table and older than the retention period. Unless removed manually, data files will only be removed when you run VACUUM. This command does not delete Delta log files, only data files. You will learn more about the VACUUM command and how it works later on in this chapter.

Data files commonly need to be retained for longer than the default retention period. To retain data files for a certain period of time even if you run VACUUM, use the table property delta.deletedFileRetentionDuration = "interval <interval>". This controls how long ago a file must have been deleted before it becomes a candidate for VACUUM. For example, if you need to retain and access historical data for one year, set delta.deletedFileRetentionDuration = "interval 365 days".

Once you remove a data file, you will be unable to time travel to versions of the table that used that data file. However, retaining excess data files can cause cloud storage costs to grow over time, along with potential impacts on performance for processing metadata.

To demonstrate how data files can grow over time, we can run DESCRIBE HISTORY on the taxidb.tripData table and use the numFiles and numAddedFiles metrics in operationMetrics to show how many files were added during each operation:

%sql
--describe table history
DESCRIBE HISTORY taxidb.tripData

Output (only relevant portions shown):

version | operation | operationMetrics
--------|-----------|---------------------------------------------------------------------
3       | RESTORE   | {"numRestoredFiles": "10"..."numRemovedFiles": "10"..."numOfFilesAfterRestore": "10"...}
2       | DELETE    | {"numRemovedFiles": "10"..."numAddedChangeFiles": "0"..."numAddedFiles": "10"...}
1       | UPDATE    | {"numRemovedFiles": "10"..."numAddedChangeFiles": "0"..."numAddedFiles": "10"...}
0       | WRITE     | {"numFiles": "10", "numOutputRows": "33003832", "numOutputBytes": "715810450"}

Based on the numFiles and numAddedFiles metrics, you can see that 30 files have been added to this table. If you have an ETL process that runs each day and performs INSERTs, UPDATEs, or DELETEs on a single table, then after one year you could have 10,950 (30 x 365) files. And this is just the number of files for a single table. Imagine the number of files you can have across your entire data platform.
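To keep an eye on this growth, you can check how many data files currently back a table with the DESCRIBE DETAIL command; a small sketch follows. Note that numFiles counts only the files referenced by the current version, so logically removed files that are still waiting for VACUUM are not included:

%python
# Check the current file count and total size of a Delta table (sketch)
detail = spark.sql("DESCRIBE DETAIL taxidb.tripData")
detail.select("numFiles", "sizeInBytes").show()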
The number of files added during each operation obviously depends on the operations performed, the number of rows affected by each operation, and other variables, but this helps demonstrate how your data files can grow over time. Fortunately, cloud data lakes are very cost-effective when it comes to storing data, but these costs can still grow as your data files do. This is why it is important to be economical when retaining data files for extended periods of time, and to take costs and business requirements into consideration when setting retention periods.

Log File Retention

Log file retention refers to how long log files are retained in the Delta table. The default retention is 30 days. You can change how long log files are retained using the table property delta.logRetentionDuration. For example, if you need to retain commit history on a table for one year, set delta.logRetentionDuration = "interval 365 days".

In Chapter 2 you learned that a checkpoint is written every 10 commits (at the time of writing; this is subject to change in future versions of Delta Lake). Delta Lake automatically cleans up log files based on the retention interval each time a new checkpoint is generated.

In order to time travel to a specific version of the table, all of the consecutive log entries up until a new checkpoint is written are required. Checkpoints are written every 10 commits, which means that if you want to time travel to versions 0-9 of a table, then there must be log entries for all versions 0, 1, 2, ..., 9. You will be unable to time travel to versions 0-9 of the table if the log for version 0 is removed because Delta automatically cleaned up log entries older than the retention interval.

There is minimal downside to retaining log files, as log files do not affect the performance of reads and writes on the table; they only affect the performance of operations that leverage table history. You should always consider storage costs when retaining files, but log files are generally small.

Delta Lake properties such as delta.deletedFileRetentionDuration and delta.logRetentionDuration can also be set using Spark configuration properties.

Setting File Retention Duration Example

Using the taxidb.tripData table, let's say, for example, there is a requirement to maintain the entire history of the table for one year for either time-series analysis or regulatory purposes.
To ensure that we can time travel to this table at any point in the last year, we can set the following table properties:

%sql
--set log retention to 365 days
ALTER TABLE taxidb.tripData
SET TBLPROPERTIES(delta.logRetentionDuration = "interval 365 days");

--set deleted data file retention to 365 days
ALTER TABLE taxidb.tripData
SET TBLPROPERTIES(delta.deletedFileRetentionDuration = "interval 365 days");

--show the table properties to confirm data and log file retention
SHOW TBLPROPERTIES taxidb.tripData;

Output (only relevant portions shown):

key                                | value
-----------------------------------|-------------------
delta.deletedFileRetentionDuration | interval 365 days
delta.logRetentionDuration         | interval 365 days

Since delta.deletedFileRetentionDuration and delta.logRetentionDuration are table properties, we can set them when we initially create the table, or we can alter the table's properties after it has been created. You can see in the preceding example that after altering the table's properties and then executing the command SHOW TBLPROPERTIES, it returns the retention intervals for deleted files and log files for taxidb.tripData. By setting both intervals to 365 days, we can now ensure that we can time travel to this table at any point in time during the last year to satisfy both business requirements and regulatory requirements.

Data Archiving

In the case of regulatory or archival purposes where you may need to retain data for a certain number of years, storing this data using time travel and file retention can become expensive due to storage costs. To help minimize costs, an alternative solution can be to archive your data in a daily, weekly, or monthly manner by creating a new table using a CREATE TABLE AS SELECT pattern:

%sql
--archive table by creating or replacing it
CREATE OR REPLACE TABLE archive.tripData
USING DELTA
AS
SELECT * FROM taxidb.tripData

Tables created in this way have history that is independent of the source table; therefore, time travel queries on the source table and the new table may return different results based on your archiving frequency.

VACUUM

In the previous section you learned that you can set retention thresholds and remove data files that have been logically deleted and are no longer referenced by a Delta table. As a reminder: these data files are never automatically physically deleted from storage unless the VACUUM command is run. VACUUM is designed to allow users to physically delete old versions of data files and directories that are no longer needed, while also taking into account the retention threshold of the table.

Physically deleting old versions of data files using VACUUM is important for two primary reasons:

Cost
    Storing old and unused data files can cause cloud storage costs to grow exponentially, especially for data that changes often. Minimize these costs by removing unused data files.

Regulatory
    Auditing and regulatory compliance (e.g., GDPR) can require that some records are permanently removed and no longer available. Physically deleting files containing these records can help satisfy those regulatory requirements.
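As an illustration of the regulatory case, a right-to-be-forgotten request typically combines a logical DELETE with a later VACUUM. The following is a minimal sketch; the predicate is hypothetical, and the VACUUM only physically removes the affected files once they fall outside the table's deleted file retention threshold:

%python
# Logical delete of the records that must be forgotten (hypothetical predicate)
spark.sql("DELETE FROM taxidb.tripData WHERE VendorId = 99")

# ...later, once the deleted files are older than the retention threshold
# (seven days by default), physically remove them from storage
spark.sql("VACUUM taxidb.tripData")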
Figure 6-4 illustrates a condensed version of both the log and data files in a Delta table across different versions to show the effects of VACUUM.

Figure 6-4. Results of running the VACUUM command on a Delta table

The numbered steps in the figure show:

1. Version 0 of the table is past the retention threshold (greater than seven days old). This version of the table contains log files, data files that are used in the table's current version, and deleted data files that are no longer used by the current version of the table.
2. The VACUUM command is run on the table. The default retention for deleted data files is seven days.
3. After running the VACUUM command, logically deleted data files from version 0 are physically removed from storage because they were older than the default deleted file retention period of seven days. Log files were not removed, only deleted data files. Data files still used in the current version of the table were not removed.

VACUUM Syntax and Examples

When vacuuming a table, you can specify VACUUM without any parameters to vacuum files that are not required by versions older than the default retention period. You can also use the RETAIN num HOURS parameter to override the default threshold and vacuum unneeded files older than the specified number of hours. To vacuum a table, specify the table name or filepath and then add any additional parameters:

%sql
--vacuum files not required by versions older than the default retention period
VACUUM taxidb.tripData;

--vacuum files in a path-based table
VACUUM './chapter06/YellowTaxisDelta/';
VACUUM delta.`./chapter06/YellowTaxisDelta/`;

--vacuum files not required by versions more recent than 100 hours ago
VACUUM delta.`./chapter06/YellowTaxisDelta/` RETAIN 100 HOURS;

Before actually vacuuming the table, we can also run VACUUM with the DRY RUN parameter to view the list of files that would be deleted:

%sql
--dry run to get the list of files to be deleted
VACUUM taxidb.tripData DRY RUN

Output (only relevant portions shown):

path
-------------------------------------------------------------------
dbfs:/xxx/chapter06/YellowTaxisDelta/part-xxxx.c000.snappy.parquet
dbfs:/xxx/chapter06/YellowTaxisDelta/part-xxxx.c000.snappy.parquet
dbfs:/xxx/chapter06/YellowTaxisDelta/part-xxxx.c000.snappy.parquet

You can see from the output the list of files that would be deleted from the Delta table taxidb.tripData if you were to run VACUUM.
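If you prefer the DataFrame API, the same operations are available on the DeltaTable object in PySpark; a minimal sketch:

%python
from delta.tables import DeltaTable

# vacuum using the Python API (sketch)
deltaTable = DeltaTable.forName(spark, "taxidb.tripData")
deltaTable.vacuum()     # use the table's default retention threshold (7 days)
deltaTable.vacuum(100)  # equivalent to RETAIN 100 HOURS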
VACUUM also commits to the Delta transaction log, which means that you can also view previous VACUUM commits and their operationMetrics using DESCRIBE HISTORY:

%sql
--view the previous vacuum commit(s)
DESCRIBE HISTORY taxidb.tripData

Output (only relevant portions shown):

version | operation    | operationParameters                  | operationMetrics
--------|--------------|--------------------------------------|----------------------------------------------------------
x       | VACUUM END   | {"status": "COMPLETED"}              | {"numDeletedFiles": "100", "numVacuumedDirectories": "1"}
x       | VACUUM START | {"retentionCheckEnabled": "true"...} | {"numFilesToDelete": "100"}

In the output, notice that the operationParameters show whether retentionCheckEnabled is true or false. You will also notice that the operationMetrics show the number of files that were deleted and the number of directories that were vacuumed.

How Often Should You Run VACUUM and Other Maintenance Tasks?

It is recommended to regularly run VACUUM on all tables, outside of your main ETL workflow, to reduce excess cloud data storage costs. There is no exact science that dictates how often you should run VACUUM. Rather, your decision on frequency should primarily be based on your budgeted storage costs, and business and regulatory needs. Scheduling a regularly occurring maintenance job to VACUUM your tables is strongly recommended to appropriately satisfy these factors. This maintenance job, which can also include other file cleanup operations such as OPTIMIZE, should be run as a separate workflow outside of your main ETL workflow for several reasons:

Resource utilization
    File cleanup operations can be resource intensive and can compete for resources with your main workflow, leading to a decline in overall performance. Therefore, you should schedule maintenance windows during off-peak hours. These types of operations also have different cluster-sizing recommendations, such as autoscaling, as opposed to regular workflows that typically use a fixed cluster size. You will read more about cluster-sizing recommendations at the end of this chapter.

Isolation
    It is best to isolate processes that perform file cleanup and consolidation so that they have exclusive access to the Delta table, to avoid any potential conflicts.

Monitoring
    By isolating these processes, it is much easier to monitor performance so that you can track progress and resource consumption for tuning. Having an isolated process also reduces debugging complexity when processes run in parallel, and makes it easier to identify any bottlenecks.

By scheduling separate workflows for maintenance tasks such as VACUUM, you gain greater resource management, isolation, monitoring, and overall control of your jobs and workflows.

Related to the frequency of your maintenance jobs, an important setting to remember is the default retention period. The default retention threshold for VACUUM is seven days. You can always increase the retention threshold for your Delta tables based on your needs. Decreasing the retention threshold is not recommended.
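Putting these recommendations together, a standalone maintenance workflow can be as simple as the following sketch, run on its own schedule and cluster. The table list is hypothetical, and this assumes a Delta Lake version that supports the OPTIMIZE SQL command:

%python
# Sketch of a scheduled maintenance job, kept separate from the main ETL workflow
maintenance_tables = ["taxidb.tripData", "taxidb.tripAggregates"]  # hypothetical list

for table in maintenance_tables:
    spark.sql(f"OPTIMIZE {table}")  # consolidate small files first
    spark.sql(f"VACUUM {table}")    # then physically remove eligible stale data files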
Even if you regularly run VACUUM on all tables, it will only remove data files that are eligible to be removed based on your table's retention settings. Setting a higher threshold gives you access to a greater history for your table, but increases the number of data files stored, incurring greater storage costs from your cloud provider. Therefore, it is always important to balance retention thresholds with your needs and your budget.

VACUUM Warnings and Considerations

Although VACUUM is designed to be a low-impact operation that can be performed without interrupting normal data operations, since it physically removes old and unused data from storage there are a few things to consider to avoid conflicts or even corrupt tables:

- It is not recommended that you set a retention interval shorter than seven days. If you run VACUUM on a table with a short retention interval, it is possible that files that are still active, such as uncommitted files that are needed by readers or writers, could be deleted. If VACUUM deletes files that haven't been committed, it can cause read operations to fail or even corrupt your table.
- Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain that there are no operations being performed on the table that take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled = false.
  - If you do set spark.databricks.delta.retentionDurationCheck.enabled to false, you must choose an interval that is longer than the longest running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.
  - Do not disable spark.databricks.delta.retentionDurationCheck.enabled and run VACUUM configured to RETAIN 0 HOURS.
  - If you run VACUUM RETAIN num HOURS, then you must set RETAIN num HOURS to an interval greater than or equal to the retention period; otherwise you will receive an error if spark.databricks.delta.retentionDurationCheck.enabled = true. If you are certain that there are no operations being performed on the table, such as INSERT/UPDATE/DELETE/OPTIMIZE, then you may turn off this check by setting spark.databricks.delta.retentionDurationCheck.enabled = false to avoid the error.
- When you run VACUUM on a Delta table, it removes the following files from the underlying filesystem:
  - Any data files that are not maintained by Delta Lake, ignoring directories beginning with an underscore, like _delta_log. If you are storing additional metadata like Structured Streaming checkpoints within a Delta table directory, which you will learn more about in Chapter 8, use a directory name such as _checkpoints.
  - Stale data files (files that are no longer referenced by the Delta table) that are older than the retention period.
- Since vacuuming removes files, it is important to note that the process can take some time, depending on the size of the table and the number of files to be removed. Run OPTIMIZE regularly to eliminate small files and reduce the number of files that need to be removed. When you combine OPTIMIZE with regular VACUUM runs, you ensure that the number of stale data files is minimized.
- The ability to time travel back to a version older than the retention period is lost after running VACUUM.
  - Set the deleted file retention duration equal to your log retention duration to maintain full compatibility with the entire history that you can time travel to.
  - This means that if you run VACUUM with the default settings, you will only be able to time travel seven days into the past from the time you run VACUUM.
- Depending on how many unused files need to be identified and removed, the VACUUM command can take a while to execute. To optimize the cost and performance of your Spark cluster, it is recommended to use a cluster that auto-scales and is configured based on the following steps that VACUUM performs:
  - Step 1 of the VACUUM operation identifies unused files using the worker nodes on the Spark cluster while the driver node sits idle. Therefore, you should use 1 to 4 worker nodes with at least 8 cores each.
  - Step 2 of the VACUUM operation deletes the identified files using the driver node on the Spark cluster. Therefore, you should use a driver node that has between 8 and 32 cores to avoid out-of-memory errors.

Change Data Feed

So far in this chapter you have learned that through data and file retention, time travel enables you to traverse through different versions of data at specific points in time. But time travel does not track row-level changes, or rather, how row-level data is inserted, updated, or deleted across different versions. And Delta Lake offers a more efficient way to view these changes across different versions than comparing entire versions of tables. This efficient tracking of row-level changes across versions is called the Change Data Feed (CDF). When the CDF is enabled on a Delta table, Delta Lake records "change events" for all the data written into the table. This includes the row data and metadata indicating whether the specified row was inserted, deleted, or updated.

Downstream consumers can read the change events in batch queries using SQL and DataFrame APIs, and in streaming queries with .readStream. You will read more about how streaming queries can consume the CDF in Chapter 9.

With the CDF, you can capture changes to the data without having to process every single record in your Delta table file or query an entire version of a table. So, if just one record changed, you no longer have to read all records in the file or the table. The CDF is stored in a separate directory called _change_data that sits alongside _delta_log and maintains the changes to the Delta table file.

The CDF supports several use cases that supplement Delta Lake time travel and versioning:

ETL operations
    Identifying and processing only records that require row-level changes following operations can greatly accelerate and simplify ETL operations. Incrementally loading records during ETL operations is essential for any efficient ETL process. For example, if you have a large, denormalized table that contains all sales order information used for reporting and is created by joining several upstream tables, you want to avoid processing all records each time the table is processed. With the CDF, you can track the row-level changes from the upstream tables to determine which sales order records are new, updated, or deleted, and subsequently use that to incrementally process your table containing sales order information.

Transmit changes for downstream consumers
    Other downstream systems and consumers, such as Spark Structured Streaming or Kafka, can consume the CDF to process data.
    For example, streaming queries, which you will learn more about in Chapter 8, can read the change feed to stream data for near-real-time analytics and reporting. If you have an event-driven application, an event-streaming platform such as Kafka could read the change feed and trigger actions for a downstream application or platform. For instance, if you have an ecommerce platform, Kafka could read the change feed and trigger near-real-time actions in the platform based on product inventory changes that were captured in the Delta table.

Audit trail table
    The CDF provides enhanced efficiency, especially compared to time travel, when querying changes to row-level data over time, so that you can easily see what data was updated or deleted and when. This provides a full audit trail of your data. Many regulatory requirements may require certain industries to track these row-level changes and keep an entire audit trail. In healthcare, for example, HIPAA and its audit controls require systems to track activity, or changes, around electronic protected health information (ePHI) (see "45 CFR § 164.312 - Technical Safeguards," Cornell Law School, January 25, 2013, https://oreil.ly/qWFDe). The CDF in Delta Lake helps support regulatory requirements for tracking changes.

Enabling the CDF

We can enable the CDF for all new tables by setting the following Spark configuration property:

%sql
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true

If you don't wish to enable the CDF for all tables in your environment, you can specify it using table properties when you create a table or alter an existing one. At the start of this chapter when we executed the "Chapter Initialization" notebook, an external Delta table containing aggregate information about passenger counts and fare amounts for vendors, or taxis, was created. In the following example, which can be found in the notebook for "02 - Change Data Feed" (GitHub repo location: /chapter06/02 - Change Data Feed), we create a new table and enable the CDF:

%sql
--create new table with change data feed
CREATE TABLE IF NOT EXISTS taxidb.tripAggregates
(VendorId INT, PassengerCount INT, FareAmount INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

--alter existing table to enable change data feed
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

Only changes made after you enable the CDF will be recorded, so any changes made to a table prior to enabling the CDF will not be captured.

Modifying Records in the CDF

To demonstrate the CDF, first let's INSERT, UPDATE, and DELETE some data in the taxidb.tripAggregates table we just created so that we can view the CDF later on:

%sql
--insert record in the table
INSERT INTO taxidb.tripAggregates VALUES (4, 500, 1000);

--update records in the table
UPDATE taxidb.tripAggregates SET FareAmount = 2500 WHERE VendorId = 1;

--delete record in the table
DELETE FROM taxidb.tripAggregates WHERE VendorId = 3;

Now that there have been changes to the table, the CDF has captured the row-level changes. If we look at the location where the Delta table is stored, we will notice the new _change_data directory:

%sh ls -al /dbfs/mnt/datalake/book/chapter06/TripAggregatesDelta/

Output (only relevant portions shown):

drwxrwxrwx 2 root _change_data
drwxrwxrwx 2 root _delta_log
-rwxrwxrwx 1 root part-00000-...-c000.snappy.parquet
-rwxrwxrwx 1 root part-00000-...-c000.snappy.parquet
-rwxrwxrwx 1 root part-00000-...-c000.snappy.parquet
-rwxrwxrwx 1 root part-00001-...-c000.snappy.parquet
Now that we can see the new _change_data directory, we can look in the directory to see the data files that contain the data changes:

%sh ls -al /dbfs/mnt/datalake/book/chapter06/TripAggregatesDelta/_change_data

Output:

-rwxrwxrwx 1 root cdc-00000-...-c000.snappy.parquet
-rwxrwxrwx 1 root cdc-00001-...-c000.snappy.parquet

The _change_data directory is another metadata directory, containing the data files that record the change capture data. Every time you make a change to the data going forward, you will not only update the current version's data files, but also the files in the _change_data directory. It is important to note that the CDF only stores updates and deletes in the _change_data directory; for inserts it is more efficient to compute the CDF directly from the transaction log. This does not mean that inserts are not captured in the CDF; they are simply not stored in the _change_data directory. The data files in the _change_data directory follow the same retention policy as the table. This means that CDF data that is outside of the table's retention policy will be deleted when you run VACUUM.

Viewing the CDF

To help identify the row-level changes that occurred, the CDF contains additional metadata to label the type of event and commit information. Table 6-2 shows the schema of the additional metadata columns.

Table 6-2. CDF metadata

Column name       | Type      | Values
------------------|-----------|----------------------------------------------------
_change_type      | String    | insert, update_preimage, update_postimage, delete
_commit_version   | Long      | The table version containing the change
_commit_timestamp | Timestamp | Timestamp of when the commit occurred

Using the additional metadata columns from the CDF, we can easily view the row-level changes of a table. To view these changes and the CDF metadata columns, we can use the table_changes(table_str, start [, end]) SQL command. Table 6-3 details the arguments for this command.

Table 6-3. table_changes arguments

Argument  | Type                | Definition
----------|---------------------|------------------------------------------------------------------------------------------------------------------------------------
table_str | String              | Represents the optionally qualified name of the table.
start     | BIGINT or Timestamp | The first version or timestamp of change to return.
end       | BIGINT or Timestamp | An optional argument for the last version or timestamp of change to return. If not specified, all changes from the start up to the current change are returned.

Now circling back to the taxidb.tripAggregates table, there have been several DML operations to INSERT, UPDATE, and DELETE data in the existing table. You can indicate a version or timestamp, similar to time travel, to view the table changes using the table_changes() SQL command:

%sql
SELECT * FROM table_changes('taxidb.tripAggregates', 1, 4)
ORDER BY _commit_timestamp

Output:

VendorId | PassengerCount | FareAmount | _change_type     | _commit_version | _commit_timestamp
---------|----------------|------------|------------------|-----------------|---------------------
1        | 1000           | 2000       | update_preimage  | 2               | 2023-07-09T19:17:54
1        | 1000           | 2500       | update_postimage | 2               | 2023-07-09T19:17:54
3        | 7000           | 10000      | delete           | 3               | 2023-07-09T19:17:54
4        | 500            | 1000       | insert           | 4               | 2023-07-09T19:17:54
When looking at the row-level changes in this example, you can see the versions that correspond to when a particular record was inserted, updated, or deleted by looking at the _commit_version. The _change_type indicates the type of operation on the record, and for updated records notice that it shows the row-level data both before the update, as indicated by update_preimage, and after the update, as indicated by update_postimage.

The same table changes can be viewed using the DataFrame API as well, by using the .option() method and setting "readChangeFeed" to "true":

%python
# view CDF table changes using versions
spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 1) \
    .option("endingVersion", 4) \
    .table("taxidb.tripAggregates")

# view CDF table changes using timestamps
spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", "2023-01-01 00:00:00") \
    .option("endingTimestamp", "2023-01-31 00:00:00") \
    .table("taxidb.tripAggregates")

Now, if we want to see the audit trail of a record and how it has changed over time, we can simply use the CDF and table_changes() to capture this efficiently. For example, if we wanted to see how the values of a specific vendor in taxidb.tripAggregates have changed over time, let's say WHERE VendorId = 1, we could use the following query:

%sql
SELECT * FROM table_changes('taxidb.tripAggregates', 1, 4)
WHERE VendorId = 1 AND _change_type = 'update_postimage'
ORDER BY _commit_timestamp

Output:

VendorId | PassengerCount | FareAmount | _change_type     | _commit_version | _commit_timestamp
---------|----------------|------------|------------------|-----------------|------------------------------
1        | 1000           | 2500       | update_postimage | 2               | 2023-07-09T19:17:54.000+0000

This provides an audit trail of how data for a particular vendor has been updated over time. While this is a simple example, you can see that this can be extremely powerful and much more efficient than time travel for large tables with many values that are consistently updated.

Or, let's say we want to perform a time-series analysis and see how many new vendors have been added (assuming the granularity of this table is VendorId) and what FareAmount they have generated since a particular point in time. We can use a WHERE clause to specify this information and efficiently read the CDF:

%sql
SELECT * FROM table_changes('taxidb.tripAggregates', '2023-01-01')
WHERE _change_type = 'insert'
ORDER BY _commit_timestamp

Output:

VendorId | PassengerCount | FareAmount | _change_type | _commit_version | _commit_timestamp
---------|----------------|------------|--------------|-----------------|------------------------------
4        | 500            | 1000       | insert       | 4               | 2023-07-09T19:17:54.000+0000

This demonstrates that you can see the table changes, specifically records WHERE _change_type = 'insert', since a given commit timestamp, which in this case is 2023-01-01.

The CDF is an efficient, powerful feature that can capture changes to data over time. It can be used with other ETL operations to easily build type 2 slowly changing dimensions, or you can process only row-level changes following MERGE, UPDATE, or DELETE operations to accelerate ETL operations and incrementally load data downstream.
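To make that last point concrete, the following is a minimal sketch of using the change feed to incrementally maintain a downstream table with a MERGE. The downstream table name (reporting.vendorSummary) and the starting version are assumptions, and the sketch assumes at most one change row per VendorId in the selected range (otherwise you would first deduplicate to the latest change per key):

%python
# read only the row-level changes since version 2, dropping pre-update images
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 2)
    .table("taxidb.tripAggregates")
    .filter("_change_type != 'update_preimage'")
    .select("VendorId", "PassengerCount", "FareAmount", "_change_type")
)
changes.createOrReplaceTempView("vendor_changes")

# apply the changes to a (hypothetical) downstream table
spark.sql("""
    MERGE INTO reporting.vendorSummary AS t
    USING vendor_changes AS s
    ON t.VendorId = s.VendorId
    WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
    WHEN MATCHED THEN UPDATE SET
        t.PassengerCount = s.PassengerCount,
        t.FareAmount = s.FareAmount
    WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT
        (VendorId, PassengerCount, FareAmount)
        VALUES (s.VendorId, s.PassengerCount, s.FareAmount)
""")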
As mentioned previously, there are other use cases for the CDF, one of which is streaming, which you will learn more about in Chapter 9.

CDF Warnings and Considerations

While the CDF is a powerful feature, there are some things to consider:

- Change records follow the same retention policy as the data files of the table. This means that if a CDF file is outside the table's retention policy, it is a candidate for VACUUM and will be deleted when VACUUM is run.
- The CDF does not add any significant overhead to table processing, as everything is generated inline with the DML operations. And typically the data files written to the _change_data directory are much smaller than the total size of the rewritten files of a table operation, since they only contain operations for records that were updated or deleted. As mentioned previously, records inserted into a table are not captured in the _change_data directory.
- Since the change data is generated inline with other operations, the change data is available as soon as the new data is committed and available in the table.
- The CDF does not record changes to records that occurred prior to the CDF being enabled.
- Once you enable the CDF for a table, you can no longer write to the table using Delta Lake 1.2.1 or below, but you can still read the table.

Conclusion

In this chapter, you read about how Delta Lake uses version control and how that enables you to traverse through different versions of data at specific points in time, while also using the CDF to track row-level changes to data over time. Time travel and the CDF in Delta Lake are powerful features that allow users to track changes over time and can be leveraged for enabling downstream consumers, ETL operations, time-series analysis, version control, auditing, and data management.

After reading about how you can easily restore or query previous versions of a table, you learned that you can use either version numbers or timestamps to roll back, audit, and satisfy a variety of use cases. By using commands like DESCRIBE HISTORY, you can easily view a table's commit history. This is all made possible through the transaction log and file retention. You can further adjust these retention settings, if you wish to change the defaults, for both log and data files using Spark configuration or table properties. Then, since data files are not automatically removed, you can remove old data files using the VACUUM command.

To supplement time travel, Delta Lake also offers the Change Data Feed (CDF) for tables. This feature allows you to capture row-level changes in an efficient manner for numerous different use cases, rather than needing to compare entire versions of each table's history to identify changes.

Using built-in Delta Lake features for time travel, retention, maintenance, and the CDF, you can save valuable time and resources, and satisfy regulatory and audit requirements. These features are made possible through the _change_data directory and, more importantly, the transaction log, and in the next chapter you will learn more about how the transaction log also stores a table's schema and uses that to update and modify table schemas.