Full Transcript

WITH SPARK SQL

Delta Lake is an open source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.
delta.io | Documentation | GitHub | Delta Lake on Databricks

CREATE AND QUERY DELTA TABLES

Create and use managed database
    -- Managed database is saved in the Hive metastore.
    -- Default database is named "default".
    DROP DATABASE IF EXISTS dbName;
    CREATE DATABASE dbName;
    USE dbName -- This command avoids having to specify
               -- dbName.tableName every time instead of just tableName.

Query Delta Lake table by table name (preferred)
    SELECT * FROM [dbName.] tableName

Query Delta Lake table by path
    SELECT * FROM delta.`path/to/delta_table` -- note backticks

Convert Parquet table to Delta Lake format in place
    -- by table name
    CONVERT TO DELTA [dbName.]tableName
    [PARTITIONED BY (col_name1 col_type1, col_name2 col_type2)]
    -- path-based tables
    CONVERT TO DELTA parquet.`/path/to/table` -- note backticks
    [PARTITIONED BY (col_name1 col_type1, col_name2 col_type2)]

Create Delta Lake table as SELECT * with no upfront schema definition
    CREATE TABLE [dbName.] tableName
    USING DELTA
    AS SELECT * FROM tableName | parquet.`path/to/data`
    [LOCATION `/path/to/table`]
    -- using location = unmanaged table

Create table, define schema explicitly with SQL DDL
    CREATE TABLE [dbName.] tableName (
      id INT [NOT NULL],
      name STRING,
      date DATE,
      int_rate FLOAT)
    USING DELTA
    [PARTITIONED BY (time, date)] -- optional

Copy new data into Delta Lake table (with idempotent retries)
    COPY INTO [dbName.] targetTable
    FROM "/path/to/table"
    FILEFORMAT = DELTA -- or CSV, Parquet, ORC, JSON, etc.

DELTA LAKE DDL/DML: UPDATE, DELETE, MERGE, ALTER TABLE

Update rows that match a predicate condition
    UPDATE tableName SET event = 'click' WHERE event = 'clk'

Delete rows that match a predicate condition
    DELETE FROM tableName WHERE date < '2017-01-01'

Insert values directly into table
    INSERT INTO TABLE tableName VALUES
      (8003, "Kim Jones", "2020-12-18", 3.875),
      (8004, "Tim Jones", "2020-12-20", 3.750);
    -- Insert using SELECT statement
    INSERT INTO tableName SELECT * FROM sourceTable
    -- Atomically replace all data in table with new values
    INSERT OVERWRITE loan_by_state_delta VALUES (...)

Upsert (update + insert) using MERGE
    MERGE INTO target
    USING updates
    ON target.Id = updates.Id
    WHEN MATCHED THEN
      UPDATE SET *
    WHEN NOT MATCHED THEN
      INSERT (date, Id, data) -- or, use INSERT *
      VALUES (date, Id, data)

Insert with Deduplication using MERGE
    MERGE INTO logs
    USING newDedupedLogs
    ON logs.uniqueId = newDedupedLogs.uniqueId
    WHEN NOT MATCHED
      THEN INSERT *

Alter table schema: add columns
    ALTER TABLE tableName ADD COLUMNS (
      col_name data_type
      [FIRST|AFTER colA_name])

Alter table: add constraint
    -- Add "Not null" constraint:
    ALTER TABLE tableName CHANGE COLUMN col_name SET NOT NULL
    -- Add "Check" constraint:
    ALTER TABLE tableName
    ADD CONSTRAINT dateWithinRange CHECK (date > '1900-01-01')
    -- Drop constraint:
    ALTER TABLE tableName DROP CONSTRAINT dateWithinRange

TIME TRAVEL

View transaction log (aka Delta Log)
    DESCRIBE HISTORY tableName

Query historical versions of Delta Lake tables
    SELECT * FROM tableName VERSION AS OF 0
    SELECT * FROM tableName@v0 -- equivalent to VERSION AS OF 0
    SELECT * FROM tableName TIMESTAMP AS OF "2020-12-18"

Find changes between 2 versions of table
    SELECT * FROM tableName VERSION AS OF 12
    EXCEPT ALL SELECT * FROM tableName VERSION AS OF 11

Rollback a table to an earlier version
    -- RESTORE requires Delta Lake version 0.7.0+ & DBR 7.4+.
    RESTORE tableName VERSION AS OF 0
    RESTORE tableName TIMESTAMP AS OF "2020-12-18"

UTILITY METHODS

View table details
    DESCRIBE DETAIL tableName
    DESCRIBE FORMATTED tableName

Delete old files with Vacuum
    VACUUM tableName [RETAIN num HOURS] [DRY RUN]

Clone a Delta Lake table
    -- Deep clones copy data from source, shallow clones don't.
    CREATE TABLE [dbName.] targetName
    [SHALLOW | DEEP] CLONE sourceName [VERSION AS OF 0]

Interoperability with Python / DataFrames
    # Read name-based table from Hive metastore into DataFrame
    df = spark.table("tableName")
    # Read path-based table into DataFrame
    df = spark.read.format("delta").load("/path/to/delta_table")

Run SQL queries from Python
    spark.sql("SELECT * FROM tableName")
    spark.sql("SELECT * FROM delta.`/path/to/delta_table`")

Modify data retention settings for Delta Lake table
    -- logRetentionDuration -> how long transaction log history is kept
    -- deletedFileRetentionDuration -> how long ago a file must have been
    -- deleted before being a candidate for VACUUM
    ALTER TABLE tableName
    SET TBLPROPERTIES(
      delta.logRetentionDuration = "interval 30 days",
      delta.deletedFileRetentionDuration = "interval 7 days"
    );
    SHOW TBLPROPERTIES tableName;

PERFORMANCE OPTIMIZATIONS

Compact data files with Optimize and Z-Order
*Databricks Delta Lake feature
    OPTIMIZE tableName
    [ZORDER BY (colNameA, colNameB)]

Auto-optimize tables
*Databricks Delta Lake feature
    ALTER TABLE [table_name | delta.`path/to/delta_table`]
    SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)

Cache frequently queried data in Delta Cache
*Databricks Delta Lake feature
    CACHE SELECT * FROM tableName
    -- or:
    CACHE SELECT colA, colB FROM tableName WHERE colNameA > 0

Provided to the open source community by Databricks
© Databricks 2021. All rights reserved. Apache, Apache Spark, Spark and the Spark logo are trademarks of the Apache Software Foundation.
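The "find changes between 2 versions" query depends on EXCEPT ALL, which is a multiset difference: a row that appears three times in version 12 and twice in version 11 is returned once. The sketch below models only that behaviour in plain Python. It does not call Spark or Delta Lake, and the helper name except_all and the sample rows are invented for illustration.

```python
from collections import Counter


def except_all(left_rows, right_rows):
    """Multiset difference: each row is kept (count in left - count in right) times."""
    remaining = Counter(left_rows) - Counter(right_rows)
    # Sorting is only for a stable, readable result; SQL guarantees no order here.
    return sorted(remaining.elements())


# Hypothetical snapshots of the same table at two versions.
version_11 = [(1, "clk"), (2, "view"), (2, "view")]
version_12 = [(1, "click"), (2, "view"), (2, "view"), (2, "view"), (3, "view")]

# Rows in version 12 that are not accounted for by version 11.
changes = except_all(version_12, version_11)
print(changes)
```

Swapping the arguments gives the rows that were present in version 11 but are gone or altered in version 12, which is why a full diff usually needs the query run in both directions.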
WITH PYTHON

delta.io | Documentation | GitHub | API reference | Databricks

READS AND WRITES WITH DELTA LAKE

Read data from pandas DataFrame
    df = spark.createDataFrame(pdf) # where pdf is a pandas DF
    # then save DataFrame in Delta Lake format as shown below

Read data using Apache Spark™
    # read by path
    df = (spark.read.format("parquet"|"csv"|"json"|etc.)
      .load("/path/to/delta_table"))
    # read by table name
    df = spark.table("events")

Save DataFrame in Delta Lake format
    (df.write.format("delta")
      .mode("append"|"overwrite")
      .partitionBy("date") # optional
      .option("mergeSchema", "true") # option - evolve schema
      .saveAsTable("events") | .save("/path/to/delta_table")
    )

Streaming reads (Delta table as streaming source)
    # by path or by table name
    df = (spark.readStream
      .format("delta")
      .schema(schema)
      .table("events") | .load("/delta/events")
    )

Streaming writes (Delta table as a sink)
    (df.writeStream.format("delta")
      .outputMode("append"|"update"|"complete")
      .option("checkpointLocation", "/path/to/checkpoints")
      .trigger(once=True|processingTime="10 seconds")
      .table("events") | .start("/delta/events")
    )

CONVERT PARQUET TO DELTA LAKE

Convert Parquet table to Delta Lake format in place
    from delta.tables import *
    deltaTable = DeltaTable.convertToDelta(spark,
      "parquet.`/path/to/parquet_table`")
    partitionedDeltaTable = DeltaTable.convertToDelta(spark,
      "parquet.`/path/to/parquet_table`", "part int")

WORKING WITH DELTATABLES

    # A DeltaTable is the entry point for interacting with
    # tables programmatically in Python, for example to
    # perform updates or deletes.
    from delta.tables import *
    deltaTable = DeltaTable.forName(spark, tableName)
    deltaTable = DeltaTable.forPath(spark, "/path/to/table")

DELTA LAKE DDL/DML: UPDATES, DELETES, INSERTS, MERGES

Delete rows that match a predicate condition
    # predicate using SQL formatted string
    deltaTable.delete("date < '2017-01-01'")
    # predicate using Spark SQL functions
    deltaTable.delete(col("date") < "2017-01-01")

Update rows that match a predicate condition
    # predicate using SQL formatted string
    deltaTable.update(condition = "eventType = 'clk'",
      set = { "eventType": "'click'" } )
    # predicate using Spark SQL functions
    deltaTable.update(condition = col("eventType") == "clk",
      set = { "eventType": lit("click") } )

Upsert (update + insert) using MERGE
    # Available options for merges [see docs for details]:
    # .whenMatchedUpdate(...) | .whenMatchedUpdateAll(...) |
    # .whenNotMatchedInsert(...) | .whenMatchedDelete(...)
    (deltaTable.alias("target").merge(
        source = updatesDF.alias("updates"),
        condition = "target.eventId = updates.eventId")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsert(
        values = {
          "date": "updates.date",
          "eventId": "updates.eventId",
          "data": "updates.data",
          "count": "1"
        }
      ).execute()
    )

Insert with Deduplication using MERGE
    (deltaTable.alias("logs").merge(
        newDedupedLogs.alias("newDedupedLogs"),
        "logs.uniqueId = newDedupedLogs.uniqueId")
      .whenNotMatchedInsertAll()
      .execute()
    )

TIME TRAVEL

View transaction log (aka Delta Log)
    fullHistoryDF = deltaTable.history()

Query historical versions of Delta Lake tables
    # choose only one option: versionAsOf, or timestampAsOf
    df = (spark.read.format("delta")
      .option("versionAsOf", 0)
      .option("timestampAsOf", "2020-12-18")
      .load("/path/to/delta_table"))

Find changes between 2 versions of a table
    df1 = spark.read.format("delta").load(pathToTable)
    df2 = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta_table")
    df1.exceptAll(df2).show()

Rollback a table by version or timestamp
    deltaTable.restoreToVersion(0)
    deltaTable.restoreToTimestamp('2020-12-01')

UTILITY METHODS

Run Spark SQL queries in Python
    spark.sql("SELECT * FROM tableName")
    spark.sql("SELECT * FROM delta.`/path/to/delta_table`")
    spark.sql("DESCRIBE HISTORY tableName")

Delete old files with Vacuum
    deltaTable.vacuum() # vacuum files older than default retention period (7 days)
    deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old

Clone a Delta Lake table
    deltaTable.clone(target="/path/to/delta_table/",
      isShallow=True, replace=True)

Get DataFrame representation of a Delta Lake table
    df = deltaTable.toDF()

Run SQL queries on Delta Lake tables
    spark.sql("SELECT * FROM tableName")
    spark.sql("SELECT * FROM delta.`/path/to/delta_table`")

PERFORMANCE OPTIMIZATIONS

Compact data files with Optimize and Z-Order
*Databricks Delta Lake feature
    spark.sql("OPTIMIZE tableName [ZORDER BY (colA, colB)]")

Auto-optimize tables
*Databricks Delta Lake feature. For existing tables:
    spark.sql("ALTER TABLE [table_name | delta.`path/to/delta_table`] SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)")
To enable auto-optimize for all new Delta Lake tables:
    spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

Cache frequently queried data in Delta Cache
*Databricks Delta Lake feature
    spark.sql("CACHE SELECT * FROM tableName")
    # or:
    spark.sql("CACHE SELECT colA, colB FROM tableName WHERE colNameA > 0")
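The two MERGE recipes differ only in what happens to rows whose key already exists in the target: an upsert overwrites them, while insert with deduplication leaves them untouched. The following plain-Python sketch models that difference on lists of dicts. It is not the DeltaTable API; the merge helper, its parameters and the sample rows are hypothetical, and it assumes each key appears at most once in both the target and the updates.

```python
def merge(target, updates, key, update_matched=True, insert_unmatched=True):
    """Apply `updates` to `target` by `key` and return the resulting rows.

    Assumes keys are unique within `target` and within `updates`.
    """
    merged = {row[key]: dict(row) for row in target}
    for row in updates:
        if row[key] in merged:
            if update_matched:        # plays the role of whenMatchedUpdateAll()
                merged[row[key]] = dict(row)
        elif insert_unmatched:        # plays the role of whenNotMatchedInsertAll()
            merged[row[key]] = dict(row)
    return list(merged.values())


# Hypothetical data: eventId 2 exists in both, eventId 3 is new.
target = [{"eventId": 1, "data": "a"}, {"eventId": 2, "data": "b"}]
updates = [{"eventId": 2, "data": "B"}, {"eventId": 3, "data": "c"}]

upserted = merge(target, updates, "eventId")
inserted_only = merge(target, updates, "eventId", update_matched=False)

print(upserted)
print(inserted_only)
```

In the upsert result eventId 2 carries the new value, while in the insert-only result it keeps the old one; in both cases eventId 3 is added.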
