Pipelines with Databricks Delta Live Tables 2
Summary
This document discusses pipelines using Databricks Delta Live Tables, focusing on Change Data Capture (CDC) and SQL/Python examples. It explains the use of the APPLY CHANGES INTO statement and provides code snippets.
Full Transcript
Pipelines with Databricks Delta Live Tables 2
#deltalivestables #databricks #medallionarchitecture

Change Data Capture (CDC)

Maintaining an up-to-date replica of a table stored elsewhere is possible using the APPLY CHANGES INTO statement. APPLY CHANGES INTO has the following guarantees and requirements:
- Performs incremental/streaming ingestion of CDC data
- Provides simple syntax to specify one or many fields as the primary key for a table
- Assumes by default that rows will contain inserts and updates
- Can optionally apply deletes
- Automatically orders late-arriving records using a user-provided sequencing key
- Uses a simple syntax for specifying columns to ignore with the EXCEPT keyword
- Defaults to applying changes as Type 1 SCD (slowly changing dimension)

SQL
APPLY CHANGES INTO LIVE.table_name
FROM STREAM(LIVE.another_table)
KEYS (columns)
SEQUENCE BY timestamp_column;

The SEQUENCE BY clause indicates the order in which changes will be applied. It can be:
- a log sequence number (LSN)
- a timestamp
- the ingestion time

A variety of third-party tools can provide the streaming change feed, for example:
- Kafka
- Kinesis

The code below (a Python equivalent is sketched at the end of this section):
- creates the customers_silver table; APPLY CHANGES INTO requires the target table to be declared in a separate statement
- identifies the customers_silver table as the target into which the changes will be applied
- specifies the table customers_bronze_clean as the streaming source
- identifies customer_id as the primary key
- specifies that records where the operation field is DELETE should be applied as deletes
- specifies the timestamp field for ordering how operations should be applied
- indicates that all fields should be added to the target table except operation, source_file, and _rescued_data

SQL
CREATE OR REFRESH STREAMING TABLE customers_silver;

APPLY CHANGES INTO LIVE.customers_silver
FROM STREAM(LIVE.customers_bronze_clean)
KEYS (customer_id)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY timestamp
COLUMNS * EXCEPT (operation, source_file, _rescued_data)

Automated Data Management

DLT automatically optimizes data for performance and ease of use.

Best practices: DLT encodes Delta best practices automatically when creating tables by setting the following properties:
- optimizeWrite
- autoCompact
- tuneFileSizesForRewrites

Physical data: DLT automatically manages your physical data to minimize cost and optimize performance by:
- running VACUUM daily
- running OPTIMIZE daily

Note: You can still specify how the data should be organized (e.g. ZORDER).

Schema evolution: schema evolution is handled for you:
- modifying a live table transformation to add/remove/rename a column will automatically do the right thing
- when removing a column in a streaming live table, old values are preserved

Important: DLT syntax is not intended for interactive execution in a notebook. For proper execution, notebooks using DLT need to be scheduled as part of a pipeline.
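For comparison with the SQL APPLY CHANGES INTO example above, here is a minimal Python sketch of the same customers_silver CDC flow. It assumes the dlt module's create_streaming_table and apply_changes functions and reuses the table and column names from the SQL example; treat it as an illustrative sketch rather than code from the original notes.

Python
import dlt
from pyspark.sql.functions import col, expr

# Declare the target streaming table in a separate statement,
# mirroring CREATE OR REFRESH STREAMING TABLE customers_silver.
dlt.create_streaming_table("customers_silver")

# Apply the CDC feed from the cleaned bronze stream into the silver table.
dlt.apply_changes(
    target = "customers_silver",                      # table the changes are applied into
    source = "customers_bronze_clean",                # streaming CDC source
    keys = ["customer_id"],                           # primary key used to match rows
    sequence_by = col("timestamp"),                   # orders late-arriving records
    apply_as_deletes = expr("operation = 'DELETE'"),  # apply these rows as deletes
    except_column_list = ["operation", "source_file", "_rescued_data"],  # columns to exclude
)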
DLT Example

In this example a new orders_silver table is created from the orders_bronze table (in the LIVE schema), adding TBLPROPERTIES and validating the order_timestamp column with an expectation that fails the update if the condition is violated:

SQL
CREATE OR REFRESH STREAMING LIVE TABLE orders_silver
  (CONSTRAINT valid_date EXPECT (order_timestamp > "2021-01-01") ON VIOLATION FAIL UPDATE)
TBLPROPERTIES ("quality" = "silver")
AS SELECT timestamp(order_timestamp) AS order_timestamp,
    * EXCEPT (order_timestamp, source_file, _rescued_data)
FROM STREAM(LIVE.orders_bronze);

Another example takes the previous silver table and creates a live table that aggregates the data:

SQL
CREATE OR REFRESH LIVE TABLE orders_by_date
AS SELECT date(order_timestamp) AS order_date,
    count(*) AS total_daily_orders
FROM LIVE.orders_silver
GROUP BY date(order_timestamp)

This time, the streaming option is not used because this is a snapshot of the data.

Python vs. SQL

- Python API vs. proprietary SQL API.
- No syntax check vs. syntax checks: in Python, running a DLT notebook cell on its own shows an error, whereas in SQL the cell checks whether the command is syntactically valid and tells you. In both cases, individual notebook cells are not supposed to be run for DLT pipelines.
- A note on imports: the dlt module should be explicitly imported into your Python notebook libraries. In SQL, this is not the case.
- Tables as DataFrames vs. tables as query results: the Python DataFrame API allows multiple transformations of a dataset by stringing multiple API calls together. In SQL, those same transformations must be saved in temporary tables as they are transformed.
- @dlt.table() vs. SELECT statement: in SQL, the core logic of your query, containing the transformations you make to your data, is contained in the SELECT statement. In Python, data transformations are specified when you configure options for @dlt.table().
- Comments and table properties: in Python, @dlt.table(comment = "Python comment", table_properties = {"quality": "silver"}); in SQL, COMMENT "SQL comment" and TBLPROPERTIES ("quality" = "silver").
- Python metaprogramming vs. N/A: you can use Python inner functions with Delta Live Tables to programmatically create multiple tables and reduce code redundancy (see the Python sketch after this list).
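Pulling the Python column together, the sketch below shows a table defined with @dlt.table() using comment and table_properties, plus an inner function used for metaprogramming. The orders_by_date logic mirrors the SQL aggregate above; the "quality" value, the region column, and the list of regions are illustrative assumptions, not part of the original notes.

Python
import dlt
from pyspark.sql.functions import col, count

# Comments and table properties are passed as options to @dlt.table(),
# the Python counterpart of COMMENT and TBLPROPERTIES in SQL.
@dlt.table(
    comment = "Daily order counts aggregated from orders_silver.",
    table_properties = {"quality": "gold"},   # illustrative value
)
def orders_by_date():
    # The transformation lives in the function body instead of a SELECT statement.
    return (
        dlt.read("orders_silver")
           .groupBy(col("order_timestamp").cast("date").alias("order_date"))
           .agg(count("*").alias("total_daily_orders"))
    )

# Metaprogramming: an inner function parameterized by a value defines several
# similar tables without repeating the logic. The "region" column and the list
# of regions are hypothetical, for illustration only.
def create_regional_orders_table(region):
    @dlt.table(name = f"orders_silver_{region}")
    def regional_orders():
        return dlt.read("orders_silver").filter(col("region") == region)

for region in ["us", "eu", "apac"]:
    create_regional_orders_table(region)

As with the SQL examples, this code is meant to run as part of a DLT pipeline, not interactively in a notebook.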