Data Engineering with Databricks
#databricks #dataengineer
Query files

A query can reference a single file or a folder. Examples:

SQL
SELECT * FROM file_format.`/path/to/file`;
SELECT * FROM text.`/path/to/text_file`;
SELECT * FROM json.`${VAR.path_to_json_file}`;
SELECT * FROM json.`/path/to/json_file.json`;
SELECT * FROM csv.`/path/to/files`;
SELECT * FROM binaryFile.`/path/to/file`;

Options for external sources

An external table can be created as follows:

SQL
CREATE TABLE IF NOT EXISTS table_name (columns)
USING CSV -- or JDBC, JSON, ...
OPTIONS (
  header = "true",
  delimiter = "|"
)
LOCATION "${VAR.path_to_folder}";

The same expression can be run from Python with spark.sql().

SQL
DESCRIBE EXTENDED external_table; -- shows the table's metadata
REFRESH TABLE external_table;     -- refreshes the table's cache

Cleaning data

Null values

SQL
SELECT count_if(col IS NULL) FROM table_name;
SELECT count(*) FROM table_name WHERE col IS NULL;

PYTHON
from pyspark.sql.functions import col

dataFrame = spark.read.table("table_name")
dataFrame.selectExpr("count_if(col IS NULL)")
dataFrame.where(col("col").isNull()).count()

Drop duplicates

SQL
SELECT DISTINCT * FROM table_name;
SELECT count(DISTINCT(col1, col2, ...)) FROM table_name WHERE col1 IS NOT NULL;

PYTHON
dataFrame.distinct().count()
dataFrame.dropDuplicates(["col1", "col2", ...]).filter(col("col1").isNotNull()).count()

Date formatting

SQL
SELECT
  date_format(datetime_col, "MMM d, yyyy") AS date_col,
  date_format(datetime_col, "HH:mm:ss") AS time_col,
  regexp_extract(string_col, "regex_expr", 0) AS regex_extraction
FROM (
  SELECT *, CAST(number_col / 1e6 AS TIMESTAMP) AS datetime_col
  FROM table_name
)

PYTHON
from pyspark.sql.functions import col, date_format, regexp_extract

(dataFrame
  .withColumn("datetime_col", (col("number_col") / 1e6).cast("timestamp"))
  .withColumn("date_col", date_format("datetime_col", "MMM d, yyyy"))
  .withColumn("time_col", date_format("datetime_col", "HH:mm:ss"))
  .withColumn("regex_extraction", regexp_extract("string_col", "regex_expr", 0)))

User-Defined Functions (UDFs)

A UDF is a custom column transformation function:
- it can't be optimized by the Catalyst Optimizer
- the function is serialized and sent to the executors
- row data is deserialized from Spark's native binary format, and the results are serialized back into Spark's native format
- Python UDFs add interprocess communication overhead between the executor and a Python interpreter running on each worker node

PYTHON
from pyspark.sql.functions import col, udf

def foo(x):
    return x

udf_foo = udf(foo)
df.select(udf_foo(col("col_name")))

Wrapping the function with udf() serializes it and sends it to the executors so it can transform DataFrame records. Using it from SQL requires spark.udf.register:

PYTHON
%python
udf_foo = spark.udf.register("sql_udf", foo)

SQL
%sql
SELECT sql_udf(col) FROM table_name;

UDF decorators

The @udf decorator can be used as an alternative. Important: once decorated, the name refers to the UDF and can no longer be called as a plain Python function. The parameter of the decorator is the datatype of the returned column.

Pandas/vectorized UDFs

Pandas UDFs use Apache Arrow to speed up computation. They are defined with the pandas_udf() decorator:

PYTHON
from pyspark.sql.functions import pandas_udf

@pandas_udf("datatype")
def foo(x):
    return x

# Alternatively
vector_foo = pandas_udf(foo, "datatype")

The vector_foo() function can be registered for SQL use too.
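To make the decorator concrete, here is a minimal vectorized UDF sketch. The table name "sales", the column "price", and the conversion rate are hypothetical placeholders, not part of the course material:

PYTHON
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

@pandas_udf("double")
def to_euros(usd: pd.Series) -> pd.Series:
    # operates on a whole batch (a pandas Series) at once, exchanged via Apache Arrow
    return usd * 0.92  # hypothetical conversion rate

df = spark.table("sales")  # hypothetical table with a numeric "price" column
df.select(to_euros(col("price")).alias("price_eur"))

# like the row-at-a-time UDF above, it can be registered for SQL use
spark.udf.register("to_euros_sql", to_euros)

Compared with the row-at-a-time udf() shown earlier, whole column batches cross the Python boundary at once, which is where the speedup comes from.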
Delta Lake

Delta Lake is an open-source project that enables building a data lakehouse on top of existing cloud storage.

It is:
- open source
- built upon standard data formats
- optimized for cloud object storage
- built for scalable metadata handling

It is not:
- a proprietary technology
- a storage format
- a storage medium
- a database service or data warehouse

Delta Lake guarantees

ACID transactions:
- atomicity
- consistency
- isolation (differs from other systems)
- durability

Delta Lake solves problems common to data lakes:
- appending data is hard
- modifying existing data is difficult
- jobs failing mid-way
- real-time operations are hard
- keeping historical data versions is costly

Delta Lake is the default format in Databricks.

Location option

SQL
CREATE SCHEMA IF NOT EXISTS schema_name LOCATION 'path';
USE schema_name;                       -- sets the default schema
DESCRIBE SCHEMA EXTENDED schema_name;  -- shows the schema's metadata
DESCRIBE DETAIL table_name;            -- extended table description, including its location
DROP SCHEMA schema_name CASCADE;

The default location is dbfs:/user/hive/warehouse. Managed tables of a schema live in the dbfs:/user/hive/warehouse/<schema_name>.db directory.

An unmanaged/external table can be created as follows:

SQL
CREATE OR REPLACE TABLE ext_table_name
LOCATION 'path'
AS SELECT * FROM table_name;

CTAS

Create-table-as-select (CTAS) statements do not support manual schema declaration or additional file options; those options can be applied through a temporary view first. A full CREATE TABLE does support schema declaration, including generated columns. Example:

SQL
CREATE OR REPLACE TABLE table_name (
  id STRING,
  transaction_timestamp STRING,
  price STRING,
  date DATE GENERATED ALWAYS AS (
    CAST(CAST(transaction_timestamp / 1e6 AS TIMESTAMP) AS DATE)
  )
  COMMENT "generated based on 'transaction_timestamp' column"
);

A generated column enforces the table schema and acts as a type of check constraint. Delta Lake enforces schemas on write; NOT NULL and CHECK constraints are also supported:

SQL
ALTER TABLE table_name ADD CONSTRAINT const_name CHECK (logical_condition);

For MERGE statements to populate generated columns, schema auto-merge must be enabled: SET spark.databricks.delta.schema.autoMerge.enabled = true;

Add options and metadata
- CURRENT_TIMESTAMP() records the time of the write
- INPUT_FILE_NAME() gets the source data file
- COMMENT allows easier discovery
- PARTITIONED BY stores each partition's data within its own directory

Note: the best practice is not to use the partition option with Delta Lake, because it physically separates files, preventing file compaction and efficient data skipping.

Cloning

Databricks has two options to efficiently copy Delta Lake tables:
- DEEP CLONE: fully copies data from the source table to a target, incrementally (executing it again updates the target table)
- SHALLOW CLONE: only copies the Delta transaction logs, without moving data. Great for copying data fast for testing purposes.

Overwrite table

There are two ways to overwrite a table: "create or replace table as" (CRAS) and INSERT OVERWRITE.

SQL
CREATE OR REPLACE TABLE table_name AS
SELECT * FROM parquet.`/path/to/file`;

INSERT OVERWRITE:
- can only overwrite an existing table, not create a new one like CRAS
- can overwrite only with new records that match the current table schema, which can be a "safer" technique for overwriting an existing table without disrupting downstream consumers
- can overwrite individual partitions
- enforces the existing schema

SQL
INSERT OVERWRITE table_name
SELECT * FROM parquet.`/path/to/file`;

This operation reports the number of affected rows and the number of inserted rows. CRAS and INSERT OVERWRITE also leave different descriptions in the table history.
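As a quick illustration of the difference, both overwrites can be issued from Python with spark.sql() (as noted earlier for external tables) and then compared in the table history. The table name "sales" is a hypothetical placeholder; this is a sketch, not verbatim output:

PYTHON
# CRAS: replaces the table definition and data (and can create the table if missing)
spark.sql("CREATE OR REPLACE TABLE sales AS SELECT * FROM parquet.`/path/to/file`")

# INSERT OVERWRITE: replaces the data only, enforcing the existing schema
spark.sql("INSERT OVERWRITE sales SELECT * FROM parquet.`/path/to/file`")

# each overwrite adds its own entry, with a different operation description, to the history
display(spark.sql("DESCRIBE HISTORY sales"))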
Merge

The MERGE (upsert) statement supports inserts, updates and deletes, with extended syntax beyond standard SQL.

The main benefits of MERGE are:
- updates, inserts and deletes are completed as a single transaction
- multiple conditions can be added in addition to the matching fields
- extensive options for implementing custom logic

Example:

SQL
MERGE INTO target_table
USING source_table
ON {logical condition}
WHEN MATCHED AND {conditions} THEN
  UPDATE SET {target column} = {source column}
WHEN NOT MATCHED THEN
  INSERT *;

Insert-only merge to avoid duplication: MERGE is useful to avoid inserting duplicate records. This is achieved by providing only an INSERT action in the WHEN NOT MATCHED clause.

Load incrementally

Databricks provides an idempotent option to incrementally ingest data from external systems: COPY INTO. The COPY INTO operation has some expectations:
- the data schema should be consistent
- duplicate records should be excluded or handled downstream

This operation is potentially cheaper than full table scans for data that grows predictably.

SQL
COPY INTO table_name
FROM "/path/to/file"
FILEFORMAT = PARQUET; -- or JSON, ...

Versioning

Managed tables are stored in folders that can be displayed as follows:

PYTHON
%python
display(dbutils.fs.ls(f"/{variable}/path"))

There, another folder is created, called _delta_log. This folder stores JSON files with the versions of the table and the transactions made against it. These files include information about what was added, removed, committed, etc. A specific table version can be queried with TIMESTAMP AS OF or VERSION AS OF:

SQL
SELECT * FROM table_name TIMESTAMP AS OF {timestamp};
SELECT * FROM table_name VERSION AS OF {version number};

Note: these version and timestamp values come from the DESCRIBE HISTORY output.

Restore tables (time travel)

It's possible to restore a table to a previous version using TIMESTAMP or VERSION; this is also called time travel:

SQL
RESTORE TABLE table_name TO TIMESTAMP AS OF {timestamp};
RESTORE TABLE table_name TO VERSION AS OF {version number};

The restore operation creates a new version in the history as well.

Optimization

As many changes are made to a table, the number of small data files grows. To reduce their number and improve retrieval, OPTIMIZE replaces the existing files by combining records and rewriting the results. ZORDER can be added for indexing:

SQL
OPTIMIZE table_name ZORDER BY column;

Vacuuming

Vacuuming lets Databricks clean up unused and old files in Delta tables; it helps reduce storage costs and comply with retention policies. VACUUM defines the retention period of the files. By default, VACUUM prevents deleting files less than 7 days old.

SQL
-- old files are deleted immediately
VACUUM table_name RETAIN 0 HOURS;

-- turn off the check that prevents premature deletion of data files
SET spark.databricks.delta.retentionDurationCheck.enabled = false;

-- make sure that logging of VACUUM commands is enabled
SET spark.databricks.delta.vacuum.logging.enabled = true;

-- list the files that would be deleted, without deleting them
VACUUM table_name RETAIN 0 HOURS DRY RUN;
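The same maintenance commands can be scripted from Python with spark.sql(). A minimal sketch, assuming a hypothetical table "events" with an "event_id" column:

PYTHON
spark.sql("OPTIMIZE events ZORDER BY (event_id)")    # compact small files, co-locating rows by event_id
spark.sql("VACUUM events RETAIN 168 HOURS DRY RUN")  # preview files older than the default 7-day retention
spark.sql("VACUUM events RETAIN 168 HOURS")          # remove them
display(spark.sql("DESCRIBE HISTORY events"))        # the OPTIMIZE (and, when logged, VACUUM) operations appear as new entries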