(Delta) Ch 8 Operations on Streaming Data.pdf
Document Details
Uploaded by EnrapturedElf
Tags
Full Transcript
CHAPTER 8 Operations on Streaming Data Spark Structured Streaming was first introduced in Apache Spark 2.0. The main goal of Structured Streaming was to build near-real-time streaming applications on S...
CHAPTER 8 Operations on Streaming Data Spark Structured Streaming was first introduced in Apache Spark 2.0. The main goal of Structured Streaming was to build near-real-time streaming applications on Spark. Structured Streaming replaced an older, lower-level API called DStreams (Discretized Streams), which was based upon the old Spark RDD model. Since then, Structured Streaming has added many optimizations and connectors, including integration with Delta Lake. Delta Lake is integrated with Spark Structured Streaming through its two major operators: readstream and writeStream. Delta tables can be used as both streaming sources and streaming sinks. Delta Lake overcomes many limitations typically associ ated with streaming systems, including: Coalescing small files produced by low-latency ingestion Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs) Leveraging the Delta transaction log for efficient discovery of which files are new when using files for a source stream We will start this chapter with a quick review of Spark Structured Streaming, followed by an initial overview of Delta Lake streaming and its unique capabilities. Next, we will walk through a small “Hello Streaming World!” Delta Lake streaming example. While limited in scope, this example will provide an opportunity to understand the details of the Delta Lake streaming programming model in a very simple context. Incremental processing of data has become a popular ETL model. The AvailableNow stream triggering mode enables developers to build incremental pipelines without needing to maintain their own state variables, resulting in simpler and more robust pipelines. 183 You can enable a Change Data Feed (CDF) on a Delta table. Clients can consume this CDF feed with SQL queries, or they can stream these changes into their application, enabling use cases such as creating audit trials, streaming analytics, compliance analysis, etc. Streaming Overview Although this chapter is specific to the Delta Lake streaming model, let’s briefly review the basics of Spark Structured Streaming before delving into the unique capabilities of Delta Lake Structured Streaming. Spark Structured Streaming Spark Structured Streaming is a near-real-time stream processing engine built on top of Apache Spark. It enables scalable, fault-tolerant, and low-latency processing of continuous data streams. Spark Streaming provides a high-level API, allowing you to build end-to-end streaming applications that can read and write data from and to a variety of sources, such as Kafka, Azure Event Hubs, Amazon S3, Google Cloud Platform’s Pub/Sub, the Hadoop Distributed File System, and many more. The core idea behind Structured Streaming is that it allows you to treat a data stream as a boundless table-like structure that you can query and manipulate by using SQL-like operations, making it easy to analyze and manipulate the data. One of the many benefits of Spark Structured Streaming is its ease of use and simplicity. The API is built on top of the familiar Spark SQL syntax, so you can leverage your existing knowledge of SQL and DataFrame operations to build streaming applications without learning a new set of complex APIs. Additionally, Structured Streaming provides fault-tolerance and reliability by leverag ing Spark’s processing engine, which can recover from failures and ensure that each data point is processed exactly once. This type of fault tolerance makes it ideal for building mission-critical applications that require low-latency and high-throughput data processing. Delta Lake and Structured Streaming When you leverage Delta Lake with Structured Streaming, you get both the transac tional guarantees of Delta Lake and the powerful programming model of Apache Spark Structured Streaming. With Delta Lake, you can now use Delta tables as both streaming sources and sinks, enabling a continuous processing model that processes your data through the Raw, Bronze, Silver, and Gold data lake layers in a streaming fashion, eliminating the need for batch jobs, resulting in a simplified solution archi tecture. In a later part of this chapter, we’ll present an example of such a continuous processing architecture. 184 | Chapter 8: Operations on Streaming Data In Chapter 7 we discussed schema enforcement and schema evolution. Streaming into Delta Lake offers schema enforcement, which ensures that incoming data streams are validated against the predefined schema, preventing data anomalies from entering the data lake. However, when changing business requirements introduce the need to capture additional information, you can leverage Delta Lake’s schema evolution capabilities to allow the schema to change over time. Streaming Examples We will start this section by reviewing a very simple “Hello Streaming World!” example illustrating the basics of streaming from and to a Delta table. Hello Streaming World In this section we will create a simple Delta table streaming scenario and set up a streaming query that: Reads all changes from a source Delta table into a streaming DataFrame. In the case of Delta Lake tables, “reading the changes” equates to “reading the transaction log entries,” since they contain the details of all changes to the table. Performs some simple processing on the streaming DataFrame. Writes the streaming DataFrame to a target Delta table. The combination of reading a stream from a source and writing the stream to a target is often referred to as a streaming query, as illustrated in Figure 8-1. Once we have the streaming query up and running, we will perform a number of small batch updates on the source table, allowing the data to flow through the streaming query to the target. During the execution of the query, we will query the query process log, and study the contents of the checkpoint files, which maintain the state of our streaming query. This simple example will allow you to fully understand the basics of the Delta Lake streaming model before moving on to more complex examples. First, execute the “Chapter Initialization” notebook for Chapter 81 to create the required Delta tables. Next, open the “01 - Simple Streaming” notebook. 1 GitHub repo location: /chapter08/00 - Chapter Initialization Streaming Examples | 185 Transaction log Version Timestamp 5 2023-04-13T20:54:41.000+0000 4 2023-04-13T20:51:06.000+0000 3 2O23-O4-13T2O:5O:57.OOO+OOOO 2 2023-04-13T20:48:50.000+0000 1 2023-04-13T20:44:36.000+0000 0 2023-04-13T20:42:42.000+0000 Streaming query r---------------------------------- i i !.readstream.writestream I.format("delta").format("delta") I.load(path).start(path)........ Streaming source taxidb.limitedYellowTaxis (Delta table) 4 1 1 1 1 1 ▼ State and metadata Batch updates management taxidb.allYellowTaxis (Delta file) /chapter08/ StreamingTarget/.checkpoint (checkpoint file) Figure 8-1. Basic streaming example Here we have a Delta table that contains 10 records of yellow taxi data, all contained in a single Parquet file: %sh Is -al /dbfs/mnt/datalake/book/chapter08/LimitedRecords.delta drwxrwxrwx 2 root root 4096 Apr 11 19:40 _delta_log -rwxrwxrwx 1 root root 6198 Apr 12 00:04 part-00000-....snappy.parquet %sql SELECT * from delta.'/mnt/datalake/book/chapter08/LimitedRecords.delta' 186 | Chapter 8: Operations on Streaming Data Output (only showing relevant portions): +.......... +------------ + |Rideld|Vendorld| PtckupTime | DropTime I +.......... + + + + H I 1 |2022-03-01T00:00:00.000+0000|2022-03-01T00:15:34.000+0000| 12 | 1 |2022-03-01T00:00:00.000+0000|2022 -03-01T00:10:56.000+00001 13 | 1 |2022- 03 -01T00:00:00.000+0000|2022 - 03-01T00:11:20.000+00001 |4 | 2 |2022-03-01T00:00:00.000+0000|2022-03-01T00:20:01.000+0000| |5 | 2 |2022- 03 -01T00:00:00.000+0000|2022 - 03-01T00:00:00.000+00001 |6 | 2 |2022 - 03-01T00:00:00.000+0000|2022 - 03-01T00:00:00.000+00001 |7 | 2 |2022 - 03 -01T00:00:00.000+0000|2022 - 03-01T00:00:00.000+00001 |8 | 2 |2022- 03-01T00:00:00.000+0000|2022 - 03-01T00:00:00.000+00001 |9 | 2 |2022- 03-01T00:00:00.000+0000|2022 - 03 -01T00:00:00.000+00001 110 I 2 | 2022-03-01T00:00:01.000+0000|2022-03-01T00:11:15.000+00001 +.......... + + + + Creating the streaming query First, we are going to create our first simple streaming query. We start by reading a stream from the source table, as follows: # Start streaming from our source "LimitedRecords" table # Notice that instead of a "read", we now use a "readstream", # for the rest our statement is just like any other spark Delta read stream_df = \ spark \.readstream \.format("delta") \.load("/mnt/datalake/book/chapter08/LimitedRecords.delta") The readstream is just like any other standard Delta table read except for the Stream suffix. We get back a streaming DataFrame in stream_df. K A streaming DataFrame is very similar to a standard Spark Data Frame, so you can use the Spark API with all the methods you already know. However, there are a few differences that you need to be aware of. First, a streaming DataFrame is a continuous, unboun ded sequence of data where each piece of data is treated as a new row in the DataFrame. Since a streaming DataFrame is unbounded, you cannot perform a count() or a sort() operation on it. Next, we perform some manipulations on our DataFrame. We add a timestamp, so we know when we read each record from our source table. We also don’t need all columns in the source DataFrame, so we select the columns that we need: # Add a "RecordStreamTime" column with the timestamp at which we read the # record from stream stream_df = stream_df.withColumn("RecordStreamTime", current_timestamp()) Streaming Examples | 187 # This is the list of columns that we want from our streaming # DataFrame select_columns = [ 'Rideld', 'Vendorld', 'PickupTime', 'DropTime', 'PickupLocationld', 'DropLocationld', 1PassengerCount', 'TripDistance', 1TotalAmount1, 'RecordStreamTime' # Select the columns we need. Note that we can manipulate our stream # just like any other Datastream, although some operations like # count() are NOT supported, since this is an unbounded DataFrame stream_df = stream_df.select(select_columns) Finally, we write the DataFrame to an output table: # Define the output location and the checkpoint location target-location = "/mnt/datalake/book/chapter08/StreamingTarget" target_checkpoint_location = f"{target_location}/_checkpoint" # Write the stream to the output location, maintain # state in the checkpoint location streamQuery = \ First, we define a target, or output location, where we want to write the stream. In the option, we define a checkpoint file location. This checkpoint file will maintain the metadata and state of the streaming query. The checkpoint file is necessary to ensure fault tolerance and enable the query’s recovery in case of failure. Among many other pieces of information, it will maintain which transaction log entries of the streaming source were already processed, so it can identify the new entries that have not yet been processed. Finally, we invoke the start method with the target location. Notice that we are using the same base directory for both the output and the checkpoint file. We just append the underscore (-Checkpoint) for the checkpoint subdirectory. Since we have not specified a trigger, the streaming query will continue to run, so it will execute, check for new records, process them, and then immediately check for the next set of records. In the following sections, you will see that you can change this behavior with a trigger. The query process log When we start the streaming query we see the stream initializing, and a query progress log (QPL) is displayed. The QPL is a JSON log generated by every single micro-batch, and provides execution details on the micro-batch. It is used to display 188 | Chapter 8: Operations on Streaming Data a small streaming dashboard in the notebook cell. The dashboard provides various metrics, statistics, and insights about the stream applications performance, through put, and latency. When you expand the stream display, you see a dashboard with two tabs (Figure 8-2). Dashboard Raw Data Aug 15 Command complete Dashboard Raw Data { "id" : "450bc809-6cac-4eaf-8f69-752faf03c32e", "runld" : "15637c02-e947-4ad2-b43d-98d28aa5ec71", "name" : null, "timestamp" : "2023-08-15T15:32:17.50OZ", "batchld" : 1, "numlnputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { : 14, "latestoffset" "trip-?erExecution" : 14 Command complete Figure 8-2. The query progress log display Streaming Examples | 189 The first tab contains the dashboard, where some of the key metrics from the QPL are displayed graphically. The raw metrics are displayed in the Raw Data tab. A portion of the raw data of the query process log is shown here: { "id" : "c5eaca75-cf4d-410f-b34c-9a2128eel944" , "runld" : "508283a5-9758-4cf9-8ab5-3ee71a465b67", "name" : null, "timestamp" : "2023-05-30T16:31:48.500Z", "batchld" : 1, "numlnputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "latestOffset" : 14, "triggerExecution" : 15 }, A key metric in the QPL is the stream unique id, the first entry in the log. This ID uniquely identifies the stream and maps back to the checkpoint directory, as you will see later. The stream unique id is also displayed above the streaming dashboard header. The query log also contains the batchld, which is the micro-batch ID. For every stream, this ID will start with zero and increment by one for every processed micro batch. The numlnputRows field represents the number of rows that were ingested in the current micro-batch. The next set of important metrics in the QPL are the Delta source and sink metrics: The sources startoffset and endoffset indicate where each batch started and ended. These include the following subfields: — The reservoirVersion is the version of the Delta table on which the current micro-batch is operating. — The index is used to keep track of which part file to start processing from. — The isStartingVersion boolean field is set to true if the reservoirVersion is set to the version of the Delta table at which the current stream was started. The sink field contains the location of the streaming sink. When we look at the source and sink metrics for micro-batch 1, we see the following: "sources" : [ { "description" : "DeltaSource[dbfs:/mnt/.../LimitedRecords.delta]", "startoffset" : { "sourceversion" : 1, "reservoirld" : "6c25c8cd-88cl-4b74-9c96-a61cl727c3a2", "reservoirVersion" : 0, 190 | Chapter 8: Operations on Streaming Data "index" : 0, "isStartingVersion" : true }, "endoffset" : { "sourceversion" : 1, "reservoirld" : "6c25c8cd-88cl-4b74-9c96-a61cl727c3a2", "reservoirVersion" : 0, "index" : 0, "isStartingVersion" : true }, "latestOffset" : null, "numlnputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "metrics" : { "numBytesOutstanding" : "0", "numFilesOutstanding" : "0" } } ], "sink" : { "description" : "DeltaSink[/mnt/datalake/book/chapter08/StreamingTarget]", "numOutputRows" : -1 } Notice numlnputRows is 0. This might look a bit surprising, since we know that our source table had 10 rows in it. However, when we started the.writeStream, the streaming query started running and immediately processed the first 10 rows as part of batch 0. We can also see that our batchld is currently 1, and since batchlds start with 0, the first batch was already processed. We can also see that the reservoirVersion is still 0 since this batch has not yet run, as no new records were processed. So, we are still at version 0 of our source table. We also see that the index is at 0, which means that we are processing the first data file, and we are indeed at the start version. We can verify this by displaying the version of the source table: %sql DESCRIBE HISTORY delta.'/mnt/datalake/book/chapter08/LimitedRecords.delta' Output (only showing relevant portions): + +.................................................... + |version| timestamp | + -+.......................................... + 10 |2023-05 -30T16:25:23.000+00001 + +.................................................... + Here you can see that we are indeed at version 0 at this time. We can also verify this by querying our output streaming table: %sql SELECT count(*) FROM delta.'/nnt/datalake/book/chapter08/StreamingTarget' Streaming Examples | 191 We can see that we indeed have 10 rows: +...............+ |count(l)| +.............+ I 10 I +...............+ Because we started the writeStream with.start, and without any indication of how often the query should run, it is running constantly. When the writeStream completes, it performs the next readstream, and so on. However, since no new rows are being produced in the source table, nothing really happens, and our output record count remains at 10. The batchld will not change until it picks up rows from the stream, so it remains at 1. Next, we execute the following SQL statement that inserts 10 new records in the source table: %sql -- Use this query to insert 10 random records from the -- allYellowTaxis table into the limitedYellowTaxis table INSERT INTO taxidb.limitedYellowTaxis SELECT * FROM taxidb.allYellowTaxis ORDER BY rand() LIMIT 10 If we uncomment and run this query, the batchld is now set at 1, and we see the 10 new rows: { "id" : "cSeaca7S-cf4d-410f-b34c-9a2128eel944", "runld" : "508283a5-9758-4cf9-8ab5-3ee71a465b67", "name" : null, "timestamp" : "2023-05-30T16:46:08.500Z", "batchld" : 1, "numlnputRows" : 10, "inputRowsPerSecond" : 20.04008016032064, Once this batchld is processed, the record count for batchld 2 will be back to 0, since no new rows are arriving from the stream. Remember, the streaming query will keep running forever, looking for new transac tion entries in the source table and writing the corresponding rows to the streaming target. Typically, Spark Structured Streaming is running as a micro-batch-based streaming service. It will read a batch of records from the source, process the records and write them to the target, and immediately afterward it will start the next batch 192 | Chapter 8: Operations on Streaming Data looking for new records (or, in the case of Delta Lake, looking for new transaction entries). This model, where we are doing batch updates to the source table, would not be economical in a real-world application. The source table is only periodically updated, but since our streaming query runs constantly, we have to keep its cluster running all the time, which runs up costs. Later in this chapter we will modify the streaming query to better fit our use case, but first, let’s take a brief look at the checkpoint file. The checkpoint file Earlier, we saw that the checkpoint file will maintain the metadata and state of our streaming query. The checkpoint file is in the checkpoint subdirectory: %sh Is -al /dbfs/mnt/datalake/book/chapter08/StreamingTarget/_checkpoint/ drwxrwxrwx 2 root root 4096 May 1 23:37 tmp_path_dir drwxrwxrwx 2 root root 4096 May 1 23:37 commits -rwxrwxrwx 1 root root 45 May 2 15:48 metadata drwxrwxrwx 2 root root 4096 May 1 23:37 offsets We have one file {metadata), and two directories {offsets and commits). Lets take a look at each one. The metadata file simply contains the stream identifier in JSON format: %sh head /dbfs/mnt/datalake/book/chapter08/StreamingTarget/_checkpoint/metadata {"id":"c5eaca75-cf4d-410f-b34c-9a2128eel944"} When we look at the offsets directory, you see two files, one for each batch Id: %sh Is -al /dbfs/mnt/datalake/book/chapter08/StreamingTarget/_checkpoint/offsets -rwxrwxrwx 1 root root 769 May 30 16:28 0 -rwxrwxrwx 1 root root 771 May 30 16:46 1 When we look at the contents of file 0, we see the following: "batchWatermarkMs": 0, "batchTimestampMs": 1685464087937, "conf": { "spark.sql.streaming.stateStore.providerclass": "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider", } } { Streaming Examples | 193 "sourceVersion": 1, "reservoirld": "6c25c8cd-88cl-4b74-9c96-a61cl727c3a2", "reservoirVersion": 0, "index": 0, "isStartingVersion": true } The first section contains the Spark streaming configuration variables. The second section contains the same reservoirVersion, index, and isStartingVersion we saw in the QPL earlier. What is logged here is the state before the batch was executed, so we are at version zero, the file index is zero, and the isStartingVersion variable indicates that we are at the starting version. When we look at file 1, we see the following: vl { "batchWatermarkMs": 0, "batchTimestampMs": 1685465168696, "conf": { "spark.sql.streaming.stateStore.providerclass": "org.apache.spark.sql.execution.streaming.state. HDFSBackedStateStoreProvider", } } { "sourceVersion": 1, "reservoirld": "6c25c8cd-88cl-4b74-9c96-a61cl727c3a2", "reservoirVersion": 2, "index": -1, "isStartingVersion": false } In this batch, the 10 additional records were processed, and the next possible version that will be processed is 2, which is reflected in the reservoirVersion. Also, notice that the index is set to -1, which indicates that there are no additional files to be processed for the current version. The commits folder contains one file per micro-batch. In our case, we will have two commits, one for each batch: drwxrwxrw -rwxrwxrwx 1 root root 29 Jun 9 15:55 0 -rwxrwxrwx 1 root root 29 Jun 9 16:15 1 Each file represents the successful completion of the micro-batch. It simply contains a watermark: %sh head /dbfs/nrnt/datalake/book/chapter08/StreamingTarget/_checkpoint/connits/0 194 | Chapter 8: Operations on Streaming Data This produces: vl {"nextBatchWatermarkMs":0} In this section, we had our first look at Delta streaming. We looked at a simple example, with a Delta table as both the source and the sink of the streaming query. In the following sections, we will look at how we can leverage Delta streaming in an incremental processing model. AvailableNow Streaming Spark Structured Streaming provides a number of possible trigger modes. The AvailableNow trigger option consumes all available records as an incremental batch with the ability to configure batch sizes with options such as maxBytesPerTrigger. First, we need to cancel our currently running streaming query in the “02 - Simple Streaming” notebook by navigating to the streaming dashboard and clicking the cancel link. We can then confirm the cancellation and stop the streaming query. Since the source table is only periodically updated, we don’t want the streaming query to run continuously. Instead, we want to start the query, pick up the new transaction entries, process the corresponding records, write to the sink, and then stop. This is what the following trigger will allow us to do. If we add the code. trig ger(availableNow=True) to the streaming query, the query will run once and then stop, as shown in notebook “02 - AvailableNow Streaming”: # Write the stream to the output location, maintain # state in the checkpoint location streamQuery = \ stream_df \.writeStream \ ,format("delta") \.option("checkpointLocation", target_checkpoint_location) \.trigger(availableNow=True) \.start(target_location) When we run this notebook, the streaming query will run until no new records are found, but since no new records have been added to the source table, no records are found, and no records are written to the target table. We can verify this by looking at the raw data of the writeStream: { "id" : "c5eaca75-cf4d-410f-b34c-9a2128ee!944", "numlnputRows" : 0, If we now run the SQL query below the writeStream in the notebook, we will add 10 records to the source table. If we then rerun the streaming query, we will again see the 10 new rows: Streaming Examples | 195 { "id" : "c5eaca75-cf4d-410f-b34c-9a2128eel944" , "runld" : "36a31550-c2cl-48b0-9a6f-cell2572f59d", "name" : null, "timestamp" : "2023-05-30T17:48:12.079Z", "batchld" : 2, "numlnputRows" : 10, "sources" : [ { "description" : "DeltaSource[dbfs:/mnt/.../LimitedRecords.delta]", "startoffset" : { "sourceversion" : 1, "reservoirld" : "6c25c8cd-88cl-4b74-9c96-a61cl727c3a2", "reservoirVersion" : 3, "index" : -1, "isStartingVersion" : false In the output, we also see the sources section, with the reservoirVersion variable, which is currently set to 3. Remember that the reservoirVersion represents the next possible version ID in this case. If we do a DESCRIBE HISTORY of our table, we can see that we are at version 2, so the next version would be 3: %sql describe history delta.'/mnt/datalake/book/chapter08/LimitedRecords.delta' Output (only version column shown): +............ + |version| +............+ I 2 | I 1 I I 0 I +--------- + In the next query, we add 20 more records to the source table. If we then rerun our streaming query and look at the raw data, we see the 20 new records, and also see that the reservoirVersion of the startoffset is now set at 3: I "id" : "d89a5c02-052b-436c-a372-2445fb8d88d6", "numlnputRows" : 20, "sources" : [ { "description" : "DeltaSource[dbfs:/mnt/.../LimitedRecords.delta]", "startoffset" : { "sourceversion" : 1, "reservoirld" : "31611029-07dl-4bcc-8ee3-cadOd4fa8bc4" , "reservoirVersion" : 3, }, 196 | Chapter 8: Operations on Streaming Data This AvailableNow model means that we could now run the streaming query as shown in the “02 - AvailableNow Streaming” notebook just once a day, or once an hour, or in whatever time interval the use case demands. Delta Lake will always pick up all changes that happened to the source table since the last run, thanks to the state saved in the checkpoint file. With legacy solutions, this type of incremental processing was very complex. As an ETL developer, you had to maintain the date of the last run and then query from a dedicated date of the source data to discover the new rows, etc. AvailableNow streaming greatly simplifies this programming model, since it abstracts out all of this complex logic. In addition to the AvailableNow trigger, there is also a RunOnce trigger, which behaves very similarly. Both triggers will process all available data. However, the RunOnce trigger will consume all records in a single batch, while the AvailableNow trigger will process the data in multiple batches when appropriate, typically resulting in better scalability. When you want to consume all available data with a streaming query, use the AvailableNow trigger, since it provides better scala bility by executing multiple batches when needed. Updating the Source Records Next, lets take a look at what happens when we run an update like the following statement: %sql -- Update query to demonstrate streaming update -- behavior UPDATE taxidb.limitedyellowtaxis SET PickupLocationld = 100 WHERE Vendorld = 2 When we look at the commitinfo action for the corresponding transaction log entry, we see the following: "commitinfo": { "operation": "UPDATE", Streaming Examples | 197 }, "notebook": { "notebookid": "3478336043398159" }, "operationMetrics": { "numCopiedRows": "23", "numUpdatedRows": "27", } } We can see that 23 rows were copied to new data files and 27 rows were updated, for a total of 50 rows. So, if we run our query again, we should see exactly 50 rows in our batch. When we run the “02 - AvailableNow Streaming” notebook again, we will see 50 rows: { "batchld" : 4, "numlnputRows" : 50, If we go back and run the streaming query again, we will notice the following error: Stream stopped... com.databricks.sql.transaction.tahoe.DeltaUnsupportedOperationException: Detected a data update (for example part-00000-....snappy.parquet) in the source table at version 3. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. The source table can be found at path dbfs:/mnt/.../LimitedRecords.delta. Here, Delta Lake is informing us that data updates in the stream are not currently supported. If we know that we really only want new records, and not changes, we can add the.option("ignoreChanges", "True") option to the readstream: # Start streaming from our source "LimitedRecords" table # Notice that instead of a "read", we now use a "readstream", # for the rest our statement is just like any other spark Delta read # Uncomment the ignoreChanges option when you want to receive only # new records, and no updated records stream_df = \.load("/mnt/datalake/book/chapter08/LimitedRecords.delta") 198 | Chapter 8: Operations on Streaming Data If we now rerun the streaming query, it will succeed. However, when we look at the raw data, we still see all 50 input rows, which looks wrong: I "id" : "d89a5c02-052b-436c-a372-2445fb8d88d6", "runld" : "bl304246-4083-4275-8637-lf99768b8e03" , "name" : null, "timestamp" : "2023-04-13T17:28:31.380Z", "batchld" : 3, "numlnputRows" : 50, "inputRowsPerSecond" : 0.0 This behavior is normal. The ignoreChanges option will still emit all rewritten files in the Delta table to the stream. This is typically the superset of all changed records. However, only the inserted records will actually be processed. The StreamingQuery class Let’s look at the type of the streamQuery variable: # Let's take a look at the type # of the streamQuery variable print(type(streamQuery)) Output: We can see that the type is StreamingQuery. If we invoke the status property of our streamQuery, we get the following: # Print out the status of the last StreamingQuery print(streamQuery.status) Output: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False} The query is currently stopped and there is no data available. No trigger is active. Another interesting property is recentProgress, which will print out the same out put as the raw data section from our streaming output in the notebook. For example, if we want to see the number of input rows, we can print the following: print(streamQuery.recentProgress["numlnputRows" ]) Output: 50 This object also has some interesting methods. For example, if we want to wait until the stream terminates, we can use the awaitTermination() method. Streaming Examples | 199 Reprocessing all or part of the source records As we have been processing a number of batches from the source table, the check point file has systematically been building up all of these changes. If we delete the checkpoint file and run the streaming query again, it will start from the very beginning of the source table and bring in all records: %sh # Uncomment this line if you want to reset the checkpoint rm -r /dbfs/mnt/datalake/book/chapter08/StreamingTarget/_checkpoint Output of the streaming query: { "numlnputRows" : 50, 3 "stateOperators" : [ ], "sources" : [ { "description" : "DeltaSource[dbfs:/mnt/.../LimitedRecords.delta]", "startoffset" : null, "endoffset" : { "reservoirVersion" : 5, }, "latestoffset" : null, "numlnputRows" : 50, We read all rows in the source table. We started at offset null and ended at reservoir Version 5. We can also just stream in part of the changes. To do this, we can specify a starting Version in the readstream after we clear out the checkpoint again: stream_df = \ spark \.readstream \.option("ignoreChanges", True) \.option("startingVersion", 3) \.format("delta") \.load("/mnt/datalake/book/chapter08/LimitedRecords.delta") When we look at the raw data, we get the following result: { "batchld" : 0, "numlnputRows" : 70, "inputRowAnd" : 0.0, "stateOperators" : [ ], "sources" : [ { 200 | Chapter 8: Operations on Streaming Data "startoffset" : null, "endoffset" : { "sourceversion" : 1, "reservoirld" : "32c71d93-ca81-4d6e-9928-cla095183016", "reservoirVersion" : 6, "index" : -1, "isStartingVersion" : false }, We get 70 rows. That is incorrect because we started from version 3. Let’s take a look at Table 8-1, which summarizes the versions and operations we’ve worked with so far. Table 8-1. Record counts for each version Version Number of rows affected From operation 1 5 50 Update 4 10 Insert 3 10 Insert Total 70 This validates the total number of input rows for the streaming query. Setting the startingVersion gives us many options when we combine it with the DESCRIBE HISTORY command. We can look at the history and decide from what point in time we would like to load the data. Reading a Stream from the Change Data Feed In Chapter 6, you read about how Delta Lake records “change events” for all the data written into the table via the CDF. These changes can be transmitted to downstream consumers. These downstream consumers can read the change events captured and transmitted in the CDF using streaming queries with. readStreamQ. To get the changes from the CDF while reading a table with CDF enabled, set the option readChangeFeed to true. Setting readChangeFeed to true in conjunction with. readStream() will allow us to efficiently stream changes from a source table to a downstream target table. We can also use startingVersion or startingTimestamp to specify the starting point of the Delta table streaming source without processing the entire table: # Read CDF stream with readChangeFeed since version 5 spark.readstream \.format("delta") \.option("readChangeFeed", "true") \.option("startingVersion", 5) \. table("") # Read CDF stream since starting timestamp 2023-01-01 00:00:00 Streaming Examples | 201 spark.readstream \.format("delta") \.option("readChangeFeed", "true") \.option("startingTtmestamp", "2023-01-01 00:00:00") \. table("") Using.option("readChangeFeed", "true") will return table changes with the CDF schema that provides the _change_type, _commit_timestamp, and _commit_verston that the readstream will consume. Here is an example of the CDF data (this is from Chapter 6): + + + + + + | Vendorld | PassengerCount | FareAmount | _change_type | _commit_version | — +--- — + _. - -+ — 1 1 1000 1 2000 1 update_preimage 1 2 1 1 1000 1 2500 1 update_postimage 1 2 3 1 7000 1 10000 1 delete 1 3 4 1 500 1 1000 1 insert 1 4 - -+— — + _. - -+ +---- The previous code snippets for reading the change feed specified the starttngVersion or startingTimestamp. Its important to note that these methods are optional, and if not provided, the stream fetches the latest snapshot of the table at the time of streaming as an INSERT and future changes as change data. While initiating the streaming source from a specified version or timestamp is possible, the schema associated with the streaming source reflects the most recent schema of the Delta table. Its impor tant to ensure there are no incompatible schema changes to the Delta table following the specified version or timestamp. Failing to do so could result in inaccurate outcomes when the streaming source retrieves data with a schema that doesn’t match. When reading change data, there are other options that we can specify, specifically around data changes and rate limits (how much data is processed in each micro batch). Table 8-2 highlights additional, important options for use in streaming quer ies when using Delta tables as a stream source. 202 | Chapter 8: Operations on Streaming Data Table 8-2. Additional streaming options 1 Option Definition 1 maxFilesPer Controls how many new files are considered in every micro-batch. The default is 1,000. Trigger maxBytesPer Controls how much data gets processed in each micro-batch. If you use T rigger.Once, this option Trigger is ignored. This option is not set by default. ignoreDeletes Ignores transactions that delete data at partition boundaries. ignoreChanges Reprocesses updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. ignoreChanges also incorporates ignoreDeletes. Rate limit options can be useful for better control of overall resource management and utilization. For example, we may want to avoid potentially overloading process ing resources (e.g., our cluster) when there is an influx of new data files or a large volume of data to process. Controlling rate limits can help achieve a more balanced processing experience by controlling micro-batch size. If we want to effectively con trol rate limits, while also ignoring deletes to avoid disrupting the existing streaming query, we can specify these options in the streaming query: # Read CDF stream with readChangeFeed and don't specify the # starting timestamp or version. Specify rate limits and ignore deletes. spark.readstream \.format("delta") \.option("maxFilesPerTrigger" , 50) \.option("maxBytesPerTrigger", "10MB") \.option("ignoreDeletes" , "true") \.option("readChangeFeed", "true") \. table(" delto_table_nane'") In this example, we are setting rate limit options, ignoring deletes, and omitting the starting timestamp and version options. This will read the latest version of the table (since no version or timestamp is specified) and give us better control over the size of micro-batches and processing resources to reduce potential interruptions to the streaming query. Streaming Examples | 203 Conclusion One of Delta Lake’s key features is the unification of batch and streaming data into a single table. This chapter dove into the particulars and examples of how Delta Lake is fully integrated with Structured Streaming, and how Delta tables support scalable, fault-tolerant, and low-latency processing of continuous data streams. Integrated with Structured Streaming through readstream and writeStream, Delta tables can be used as both streaming sources and targets, and leverage streaming DataFrames. The examples in this chapter walked through reading changes into these streaming DataFrames and how to perform simple processing to write streams to a target. Then we explored checkpoint files and metadata, the query process log, and the streaming class to better understand how streaming works and keeps track of information under the hood. And finally, you learned how to leverage the CDF with readstream to transmit and read row-level changes in streaming queries. Having unified both batch and streaming data into a single Delta table, Chapter 9 will dive into how to securely share this data with other organizations. 204 | Chapter 8: Operations on Streaming Data