Data Engineering with Databricks.pdf
Uploaded by FlatterPegasus
Data Engineering with Databricks #databricks #dataengineer

Query files
A query can reference a single file or a folder. Examples:
SQL
    select * from file_format.`/path/to/file`;
    select * from text.`/path/to/text_file`;
    select * from json.`${VAR.path_to_json_file}`;
    select * from json.`/path/to/json_file.json`;
    select * from csv.`/path/to/files`;
    select * from binaryFile.`/path/to/file`;

Options for external sources
An external table can be created as follows:
SQL
    create table if not exists table_name (columns)
    using CSV -- or JDBC, JSON
    options (
        header = "true",
        delimiter = "|"
    )
    location "${VAR.path_to_folder}";
The same statement can be run from Python with spark.sql().
SQL
    DESCRIBE EXTENDED external_table; -- shows the table's metadata
    REFRESH TABLE external_table; -- refreshes the table's cache

Cleaning data

Null values
SQL
    select count_if(col is null) from table_name;
    select count(*) from table_name where col is null;
PYTHON
    from pyspark.sql.functions import col
    dataFrame = spark.read.table("table_name")
    # count_if counts only the rows where the condition is true
    dataFrame.selectExpr("count_if(col is null)")
    dataFrame.where(col("col").isNull()).count()

Drop duplicates
SQL
    select distinct * from table_name;
    select count(distinct(col1, col2,...)) from table_name where col1 is not null;
PYTHON
    dataFrame.distinct().count()
    # dropDuplicates takes the column names as a list
    dataFrame.dropDuplicates(["col1", "col2",...]).filter(col("col1").isNotNull()).count()

Date formatting
SQL
    select date_format(datetime_col, "MMM d, yyyy") as date_col
        ,date_format(datetime_col, "HH:mm:ss") as time_col
        ,regexp_extract(string_col, "regex_expr", 0) as regex_extraction
    from (
        select *
            ,cast(number_col / 1e6 as timestamp) as datetime_col
        from table_name
    );
PYTHON
    from pyspark.sql.functions import col, date_format, regexp_extract
    (dataFrame
        # number_col holds microseconds, so divide before casting to timestamp
        .withColumn("datetime_col", (col("number_col") / 1e6).cast("timestamp"))
        .withColumn("date_col", date_format("datetime_col", "MMM d, yyyy"))
        .withColumn("time_col", date_format("datetime_col", "HH:mm:ss"))
        .withColumn("regex_extraction", regexp_extract("string_col", "regex_expr", 0)))

User-Defined Functions (UDFs)
A UDF is a custom column transformation function. It comes with costs:
- it can't be optimized by the Catalyst Optimizer
- the function is serialized and sent to the executors
- row data is deserialized from Spark's native binary format, and the results are serialized back into Spark's native format
- for Python UDFs, there is additional inter-process communication overhead between the executor and a Python interpreter running on each worker node
PYTHON
    from pyspark.sql.functions import col, udf
    def foo(x):
        return x
    udf_foo = udf(foo)
    # apply the registered UDF, not the plain Python function
    df.select(udf_foo(col("col_name")))
The udf() function registers the function as a UDF: it serializes the function and sends it to the executors so that it can transform DataFrame records.
To use the function from SQL, register it with spark.udf.register:
SQL
    %python
    udf_foo = spark.udf.register("sql_udf", foo)
    %sql
    select sql_udf(col) from table_name;

UDF Decorators
The @udf decorator can be used as an alternative. Important: the decorated name then refers to the UDF, so you can no longer call the plain Python function. The decorator's parameter is the data type of the returned column.

Pandas/Vectorized UDFs
Pandas UDFs use Apache Arrow to speed up computation. They are defined with the pandas_udf() decorator:
PYTHON
    from pyspark.sql.functions import pandas_udf
    @pandas_udf("datatype")
    def foo(x):
        return x
    # Alternatively, wrap an undecorated function:
    vector_foo = pandas_udf(foo, "datatype")
The vector_foo() function can be registered for SQL too.

Delta Lake
Delta Lake is an open-source project that enables building a data lakehouse on top of existing cloud storage.
It is:
- open source
- built upon standard data formats
- optimized for cloud object storage
- built for scalable metadata handling
It is not:
- a proprietary technology
- a storage format
- a storage medium
- a database service or data warehouse
Delta Lake guarantees ACID transactions:
- atomicity
- consistency
- isolation (differs from other systems)
- durability
Delta Lake solves these data lake problems:
- appending data is hard
- modifying existing data is difficult
- jobs fail mid-way
- real-time operations are hard
- keeping historical data versions is costly
Delta Lake is the default format in Databricks.
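Returning to the UDF decorator note earlier in this section, the following is a minimal sketch of why a decorated name can no longer be called as the plain Python function. It uses no Spark at all: `fake_udf` and `FakeUDF` are hypothetical stand-ins written for this illustration, not PySpark's `udf`, and the `func` attribute is an assumption of the sketch. Only the name-rebinding behaviour of Python decorators is being shown.

```python
from typing import Callable


class FakeUDF:
    """Hypothetical stand-in for the object a UDF decorator returns."""

    def __init__(self, func: Callable, return_type: str):
        self.func = func                # the original Python function
        self.return_type = return_type  # the declared column data type

    def __call__(self, column_name: str) -> str:
        # A real UDF would return a column expression; this toy version
        # only describes the call instead of running the function.
        return f"udf:{self.func.__name__}({column_name}) -> {self.return_type}"


def fake_udf(return_type: str) -> Callable[[Callable], FakeUDF]:
    """Hypothetical decorator factory with the same shape as @udf("datatype")."""
    def decorate(func: Callable) -> FakeUDF:
        return FakeUDF(func, return_type)
    return decorate


@fake_udf("string")
def first_letter(value: str) -> str:
    return value[0]


# The name `first_letter` is now bound to the wrapper, so calling it
# builds a column-style expression rather than returning a letter.
wrapped_result = first_letter("email")

# In this sketch the plain function is reachable only through the wrapper.
plain_result = first_letter.func("email")
```

If both forms are needed, keeping the function undecorated and wrapping it explicitly, as with `udf_foo = udf(foo)` above, leaves the plain function available under its own name.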
Location option
SQL
    CREATE SCHEMA IF NOT EXISTS schema_name LOCATION 'path';
    USE schema_name; -- makes this the current schema, so new managed tables use its location
    DESCRIBE SCHEMA EXTENDED schema_name; -- shows schema metadata
    DESCRIBE DETAIL table_name; -- shows the detailed table description, including its location
    DROP SCHEMA schema_name CASCADE;
The default location is dbfs:/user/hive/warehouse. By default, a schema's managed tables are stored in the dbfs:/user/hive/warehouse/<schema_name>.db directory.
An unmanaged/external table can be created as follows:
SQL
    CREATE OR REPLACE TABLE ext_table_name LOCATION 'path' AS
    select * from table_name;

CTAS
CTAS (create-table-as-select) statements do not support manual schema declaration or additional file options. These options can be set on a temporary view instead, which the CTAS statement then reads from.
Example:
SQL
    CREATE OR REPLACE TABLE table_name (
        id STRING,
        transaction_timestamp STRING,
        price STRING,
        date DATE GENERATED ALWAYS AS (
            CAST(CAST(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE)
        ) COMMENT "generated based on 'transaction_timestamp' column"
    );
Generated columns are enforced as part of the table schema; they are a type of check constraint.
Delta Lake enforces schemas on write. NOT NULL and CHECK constraints are also supported:
    ALTER TABLE table_name ADD CONSTRAINT const_name CHECK (logical_condition);
For MERGE statements, the following setting must be enabled so that generated columns can be computed:
    SET spark.databricks.delta.schema.autoMerge.enabled = true;

Add options and metadata
- CURRENT_TIMESTAMP(): returns the current timestamp
- INPUT_FILE_NAME(): gets the source data file
- COMMENT: allows easier discovery
- PARTITIONED BY: each partition's data is stored in its own directory
Note: the best practice is generally to avoid the partition option with Delta Lake, because partitioning physically separates files, which prevents file compaction and efficient data skipping.

Cloning
Databricks has two options to efficiently copy Delta Lake tables:
- DEEP CLONE: fully copies the data from the source table to a target. The copy is incremental: executing the command again updates the target table.
- SHALLOW CLONE: copies only the Delta transaction logs, without moving data. Great for copying data fast for testing purposes.
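
The generated `date` column in the CTAS example above divides `transaction_timestamp` by 1e6 before casting, because the value is stored in microseconds while a numeric cast to TIMESTAMP is interpreted as seconds since the epoch. The arithmetic can be sketched in plain Python. The helper name `micros_to_date` is hypothetical, and the sketch assumes a UTC session time zone; a different session time zone can shift the resulting date.

```python
from datetime import date, datetime, timezone


def micros_to_date(transaction_timestamp: str) -> date:
    """Mirror CAST(CAST(ts / 1e6 AS TIMESTAMP) AS DATE), assuming UTC."""
    # The column is a STRING, so it is converted to a number first.
    seconds_since_epoch = float(transaction_timestamp) / 1e6
    # Seconds since the epoch become a timestamp, then only the date is kept.
    return datetime.fromtimestamp(seconds_since_epoch, tz=timezone.utc).date()


# One day after the epoch, expressed in microseconds.
one_day = micros_to_date("86400000000")

# Skipping the division would treat microseconds as seconds, which is
# a factor of one million too large.
seconds_if_not_divided = float("86400000000")
seconds_after_division = seconds_if_not_divided / 1e6
```

Here `one_day` is 1970-01-02, and the division reduces 86,400,000,000 to the 86,400 seconds that make up one day.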