(Delta) Ch 7 Schema Handling.pdf
Document Details
Uploaded by EnrapturedElf
Tags
Full Transcript
CHAPTER 7 Schema Handling Traditionally, data lakes have always operated under the principle of schema on read, but have always had challenges enforcing schema on write. This...
CHAPTER 7 Schema Handling Traditionally, data lakes have always operated under the principle of schema on read, but have always had challenges enforcing schema on write. This means there is no predefined schema when data is written to storage, and a schema is only adapted when the data is processed. It is imperative for the case of analytics and data platforms that your table formats enforce the schema on write to prevent introducing change-breaking processes, and to maintain proper data quality and integrity. And while it is essential to adhere to schema on write, we must also acknowledge that in today’s fast-paced business climate and evolving landscape of data management, data sources, analytics, and simply just data and its overall structure are constantly changing. These changes need to be accounted for with schemas that are flexible enough to evolve over time in order to capture new, changing information. The schematic challenges often seen from traditional data lakes can be further classi fied into two key schema handling features that any data platform and table format, regardless of the storage layer, must support: Schema enforcement This is the process of ensuring that all data being added to a table conforms to that specific schema, where the schema defines a table structure by a list of column names, their data types, and any optional constraints. Enforcing data to fit to the structure of a defined schema helps to maintain the quality and consistency of the data, as any write to the table that does not comply with the schema is rejected. In turn, this helps prevent data quality issues that can arise from having data in different formats where it can be difficult to ensure that the data in the table is accurate and consistent. 151 Schema evolution This allows the data stored in the data lake to be flexible and adaptable to address the changing business requirements and data landscape. Schema evolu tion should be performed in a very conscious, controlled, and organized manner, and is mostly limited to the addition of columns to the schema. Fortunately, Delta Lake has excellent schema handling features that allow for both flexible schema evolution and rigid enforcement. This chapter will demonstrate how Delta Lake performs validation and enforcement, along with schema evolution sce narios and how Delta Lake can handle them. Schema Validation Every DataFrame that you create in Apache Spark will have a schema. The best way to look at a schema is as a blueprint or structure that defines the shape of your data. This includes the name of each column, its data type, whether or not the column can be NULL, and any metadata associated with each column. Delta Lake will store the schema of a Delta Table as a schemastring in the metaData action of the transaction log entries. In this section we will take a look at these entries. Next, we look at the validation rules that Delta Lake applies during a schema on write operation. We will close out this section with a use case for each of the schema validation rules. To follow along with the code, first execute the “00 - Chapter Initialization” notebook for Chapter 7 to create the TaxiRateCode Delta table. Viewing the Schema in the Transaction Log Entries Delta Lake will store the schema in JSON format inside the transaction log. For example, the initialization notebook writes a Delta table like this: # Write in Delta Lake format df.write.format("delta") \.mode("overwrite") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") To take a look at how the schema is saved, open the “01 - Schema Enforcement” notebook. In this notebook, we see that the tables schema is saved in JSON format inside the transaction log when we view the transaction log file: %sh # The schemastring is part of the metaData action of the Transaction Log entry # The schemastring contains the full schema of the Delta table file # at the time that the log entry was written grep "metadata" /dbfs/mnt/datalake/.../TaxiRateCode.delta/_delta_log/...000.json > /tmp/commit.json python -m json.tool < /tmp/commit.json 152 | Chapter 7: Schema Handling We see the following output: { "metaData": { "id": "8f348474-0288-440a-a76e-2358ccf45a96", "format": { "provider": "parquet", "options": {} }, "schemaString": "{\"type\":\"struct\",\"fietds\":[{\"name\":\ "RateCodeId\",\"type\":\"integer\",\"nuHable\ ":true,\"metadata\":{}},{\"name\":\"RateCodeDesc\ ",\"type\":\"string\",\"nullable\":true,\ "metadata\":{}}]}", "partitioncolumns": [], "configuration": {}, "createdTime": 1681161987269 } } The schema is a structure (struct), with a list of fields representing the columns, where each field has a name, a type, and a nullable indicator that tells us whether the field is mandatory or not. Each column also contains a metadata field. The metadata field is a JSON string that can contain various types of information, depending on the transaction being performed, for example: The username of the person who executed the transaction The timestamp of the transaction The version of Delta Lake used The schema partition columns Any additional application-specific metadata that may be relevant to the transaction Schema on Write Schema validation rejects writes to a table that does not match a tables schema. Delta Lake performs schema validation on write, so it will check the schema of the data that is being written to the table. If the schema is compatible, the validation will pass and the write will succeed; if the schema of the data is not compatible, Delta Lake will cancel the transaction and no data is written. Note that this operation will always be atomic, so you will never have a condition where only a part of the data is written to the table. All source data is written when the transaction succeeds, and no source data is written when the validation fails. Schema Validation | 153 When schema validation fails, Delta Lake will raise an exception to let the user know about the mismatch. To determine whether a write to a table is compatible, Delta Lake uses the following rules: The source DataFrame to be written: Cannot contain any columns that are not present is the target table’s schema Note that it is allowed that the new data does not contain every column in the table, as long as the missing columns are marked as nullable in the target table’s schema. If a missing column was not marked as nullable in the target schema, the transaction will fail. Cannot have column data types that differfrom the column data types in the target table For example, if the target tables column contains StringType data, but the corresponding source column contains IntegerType data, schema enforcement will raise an exception and prevent the write operation from taking place. Cannot contain column names that differ only by case For example, if the source data contains a column named Foo and the source data has a column named f oo, the transaction will fail. There is a bit of history behind this particular rule: Spark can be used in case-sensitive or case-insensitive (default) mode. Parquet, on the other hand, is case-sensitive when storing and returning column information. Delta Lake is case preserving but insensitive when storing the schema. The preceding rules combined get rather complex. Therefore, to avoid potential mistakes, data corruption, or loss issues, Delta Lake will not allow column names that only differ in case. Schema Enforcement Example Let’s take a look at the details of schema enforcement. We will start out by appending a DataFrame with a matching schema, which will succeed without any issues. Next, we will add an additional column to the DataFrame and attempt to append it to the Delta table. We will validate that this results in an exception, and no data has been written. Matching schema To illustrate schema enforcement, we will first append a DataFrame with the correct schema to the TaxiRateCode table, as shown in step 2 in the “01 - Schema Enforce ment” notebook: 154 | Chapter 7: Schema Handling # Define the schema for the DataFrame # Notice that the columns match the table schema schema = StructType([ StructField("RateCodeId", IntegerType(), True), StructField("RateCodeDesc", StringType(), True) ]) # Create a list of rows for the DataFrame data = [(10, "Rate Code 10"), (11, "Rate Code 11"), (12, "Rate Code 12")] # Create a DataFrame, passing in the data rows # and the schema df = spark.createDataFrame(data, schema) # Perform the write. This write will succeed without any # problems df.write \.format("delta") \.mode("append") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") Since the source and target schema align, the DataFrame is successfully appended to the table. Schema with an additional column In step 3 of the notebook, we will attempt to add one more column to the source schema: # Define the schema for the DataFrame # Notice that we added an additional column schema = StructType([ StructField("RateCodeId", IntegerType(), True), StructField("RateCodeDesc", StringType(), True), StructField("RateCodeName", StringType(), True) ]) # Create a list of rows for the DataFrame data = [ (15, "Rate Code 15", "C15"), (16, "Rate Code 16", "C16"), (17, "Rate Code 17", "C17")] # Create a DataFrame from the list of rows and the schema df = spark.createDataFrame(data, schema) # Attempt to append the DataFrame to the table df.write \.format("delta") \.mode("append") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") Schema Validation | 155 This code will fail with the following exception: AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 8f348474-0288-440a-a76e-2358ccf45a96). When we scroll down, we see that Delta Lake provided a detailed explanation of what happened: To enable schema migration using DataFrameWriter or DataStreamWriter, please set: '.option("mergeSchema", "true")1. For other operations, set the session configuration spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation specific to the operation for details. Table schema: root -- RateCodeld: integer (nullable = true) -- RateCodeDesc: string (nullable = true) Data schema: root -- RateCodeld: integer (nullable = true) -- RateCodeDesc: string (nullable = true) -- RateCodeName: string (nullable = true) Delta Lake informs us that we can evolve the schema with the mergeSchema option set to true, which is something we will examine in the next section. It then shows us the table and source data schema, which is very helpful for debugging. When we look at the transaction log entries, we see the following: # Create a listing of all transaction log entries. # We notice that there are only two entries. # The first entry represents the creation of the table # The second entry is the append of the valid dataframe # There is no entry for the above code since the exception # occurred, resulting in a rollback of the transaction Is -al /dbfs/mnt/datalake/book/chapter07/TaxiRateCode/_delta_log/*.json -rwxrwxrwx 1 Apr 10 21:26 /dbfs/.../TaxiRateCode.delta/_delta_log/...000.json -rwxrwxrwx 1 Apr 10 21:27 /dbfs/.../TaxiRateCode.delta/_delta_log/...001.json The first entry (O000...0. json) represents the creation of the table, and the second entry (0000...1. json) is the append with the valid DataFrame. There is no entry for the preceding code, since the schema mismatch exception was thrown, and no data was written at all, illustrating the atomic behavior of Delta Lake transactions. A run of the DESCRIBE HISTORY command for the table confirms this: 156 | Chapter 7: Schema Handling %sql -- Look at the history for the Delta table DESCRIBE HISTORY delta.'/mnt/datalake/book/chapter07/TaxiRateCode' Output (only relevant data shown): + - — — +................ -+............................................................................... + |version|operation | operationparameters 1 + - — — +................. - +----------------------------------------------------------.......... + 11 | WRITE | {"mode":"Append","partitionBy":"[] "} 1 |0 | WRITE | {"mode":"Overwrite","partitionBy": "[]"} 1 + - — — +................. -+-----------------------------------.......... + In this section, we have seen schema enforcement at work. It provides peace of mind that your tables schema will not change unless you choose to change it. Schema enforcement ensures data quality and consistency for your Delta Lake tables, and keeps the developer honest and the tables clean. However, if you consciously decide that you really need the additional column in your table to facilitate your business, then you can leverage schema evolution, which we will cover in the next section. Schema Evolution Schema evolution in Delta Lake refers to the ability to evolve the schema of a Delta table over time, while preserving the existing data in the table. In other words, schema evolution allows us to add, remove, or modify columns in an existing Delta table without losing any data or breaking any downstream jobs that depend on the table. This is important as your data and business needs change over time and you may need to add new columns to your table or modify the existing columns to support new use cases. Schema evolution is enabled on the table level by using.option("mergeSchema", "true") during a write operation. You can also enable schema evolution for the entire Spark cluster by setting spark.databricks.delta.schema.auto Merge. enabled to true. By default, this setting will be set to false. When schema evolution is enabled, the following rules are applied: If a column exists in the source DataFrame being written but not in the Delta table, a new column is added to the Delta table with the same name and data type. All existing rows will have a null value for the new column. If a column exists in the Delta table but not in the source DataFrame being written, the column is not changed and retains its existing values. The new records will have a null value for the missing columns in the source DataFrame. Schema Evolution | 157 If a column with the same name but a different data type exists in the Delta table, Delta Lake attempts to convert the data to the new data type. If the conversion fails, an error is thrown. If a NuHType column is added to the Delta table, all existing rows are set to null for that column. Let’s look at a number of schema evolution scenarios, starting with the most com mon: adding a column to the table. Adding a Column Returning to our schema enforcement example, we can use schema evolution to add the RateCodeName column to the schema that was previously rejected due to schema mismatch. Recall that the rule states: If a column exists in the DataFrame being written but not in the Delta table, a new column is added to the Delta table with the same name and data type. All existing rows will have a null value for the new column. You can follow along with the code in the “02 - Schema Evolution” notebook. In step 2 of the notebook, schema evolution is activated by adding. option("mergeSchema", "true") to the.write Spark command: # Define the schema for the DataFrame # Notice the additional RateCodeName column, which # is not part of the target table schema schema = StructType([ StructField("RateCodeId", IntegerType(), True), StructField("RateCodeDesc", StringType(), True), StructField("RateCodeName", StringType(), True) 1) # Create a list of rows for the DataFrame data = [ (20, "Rate Code 20", "C20"), (21, "Rate Code 21", "C21"), (22, "Rate Code 22", "C22") 1 # Create a DataFrame from the list of rows and the schema df = spark.createDataFrame(data, schema) # Append the DataFrame to the Delta Table df.write \.format("delta") \.option("mergeSchema", "true") \.mode("append") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") 158 | Chapter 7: Schema Handling # Print the schema df.printSchema() We see the new schema: root |-- RateCodeld: integer (nuilable = true) |-- RateCodeDesc: string (nullable = true) |-- RateCodeName: string (nullable = true) Now, the write operation will complete successfully, and the data will be added to the Delta table: %sql SELECT * FROM delta.'/mnt/datalake/book/chapter07/TaxiRateCode' ORDER BY RateCodeld We get the following output: +................... - +................................... — +...................... + |RateCodeld | RateCodeDes |RateCodeName| 11 | Standard Rate | null | |2 | JFK | null | |3 |Newark | null | |4 |Nassau or Westchester | null | |5 | Negotiated fare | null | |6 |Group ride | null | | 20 |Rate Code 20 1 C20 | I 21 |Rate Code 21 1 C21 | 122 |Rate Code 22 1 C22 | +.................... - +................................... — +.......................+ The new data has been added, and the RateCodeName for the existing rows has been set to null, which is expected. When we look at the corresponding transaction log entry, we can see that a new metadata entry has been written with the updated schema: "metaData": { "id": "ac676ac9-8805-4aca-9db7-4856a3c3a55b" , "format": { "provider": "parquet", "options": {} }, "schemastring": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeId\",\"type\":\"integer\",\"nullable\ ":true,\"metadata\":{}}, {\"name\":\"RateCodeDesc\",\"type\":\"string\",\"nullable\ ":true,\"metadata\":{}}, Schema Evolution | 159 {\"name\": \"RateCodeName\",\"type\":\"string\",\"nullable\ true,\"metadata\":{}}]}", "partitioncolumns": [], "configuration": {}, "createdTime": 1680650616156 } } This validates the schema evolution rules for adding columns. Missing Data Column in Source DataFrame Next, let’s take a look at the impact of removing a column. Recall that the rule states: If a column exists in the Delta table but not in the DataFrame being written, the column is not changed and retains its existing values. The new records will have a null value for the missing columns in the source DataFrame. In the “02 - Schema Evolution” notebook in step 3, there is a code example where we left the RateCodeDesc column out of the DataFrame: # Define the schema for the DataFrame schema = StructType([ StructField("RateCodeId", IntegerType(), True), StructField("RateCodeName", StringType(), True) 1) # Create a list of rows for the DataFrame data = [(30, "C30"), (31, "C31"), (32, "C32")] # Create a DataFrame from the list of rows and the schema df = spark.createDataFrame(data, schema) # Append the DataFrame to the table df.write \.format("delta") \.option("mergeSchema", "true") \.mode("append") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") When we now look at the data in the Delta table, we see the following: + + |RateCodeId | RateCodeDes |RateCodeName| + + |Standard Rate | null |JFK | null |Newark | null |Nassau or Westchester| null |Negotiated fare | null |Group ride | null |Rate Code 20 | C20 |Rate Code 21 | C21 160 | Chapter 7: Schema Handling 122 |Rate Code 22 | C22 |30 Inuit | C30 |31 Inuit | C31 132 | nuii | C32 Observe the following behavior: The schema of the Delta table remains unchanged. The RateCodeDesc column values for the existing rows are not changed. The values for the RateCodeDesc column of the new DataFrame are set to NULL, since they do not exist in the DataFrame. When we look at the corresponding transaction log entry, you can see the commitinfo and three add sections (one for each new source record), but no new schemastring, implying that the schema was not changed: { "commitInfo": { } } { "add": { "stats": "{\"numRecords\":l,\"ninValues\";{\"RateCodeId\":30, \"RateCodeName\":\"C30\"},\"maxValues\": {\"RateCodeId\":30A"RateCodeName\":\"C30\"},\"nultCount\" :{\"RateCodeId\":0,\"RateCodeDesc\":1, \"RateCodeName\":0}}", } } } { "add": { "stats": ”{\"numRecords\":l,\"ninValues\":{\"RateCodeId\":31, \"RateCodeName\":\"C31\"},\"naxVatues\": {\"RateCodeId\":3lA"RateCodeName\":\"C31\"}A'’nuUCount\" :{\"RateCodeId\":0,\"RateCodeDesc\":1, \"RateCodeName\":0}}", "tags": { } { "add": { Schema Evolution | 161 "stats": "{\"numRecords\": l,\"PiinValues\": {\"RateCodeId\" :32, \"RateCodeName\":\"C32\"},\"maxValues\": {\"RateCodeId\":32,\"RateCodeName\":\"C32\"},\"nullCount\" :{\"RateCodeId\":0,\"RateCodeDesc\":1, \"RateCodeNanie\":0}}", "tags": { } } } This validates our rule for removing columns, as stated in the introduction. Changing a Column Data Type Next, lets take a look at the impact of changing a columns data type. Recall that the rule states: If a column with the same name but a different data type exists in the Delta table, Delta Lake attempts to convert the data to the new data type. If the conversion fails, an error is thrown. In step 4 of the “02 -Schema Evolution” notebook, we will first reset the table by removing the directory: dbutils.fs.rm("dbfs:/mnt/datalake/book/chapter07/TaxiRateCode", recurse=True) And then we can drop the table: %sql drop table taxidb.taxiratecode; Next, we re-create the table, but this time we use a short data type for the RateCodeld: # Read our CSV data, and change the data type of # the RateCodeld to short df = spark.read.format("csv") \.option("header", "true") \. load ("/pint/datalake/book/chapter07/TaxiRateCode. csv") df = df.withColumn("RateCodeld", df["RateCodeld"].cast(ShortType())) # Write in Delta Lake format df.write.format("delta") \.mode("overwrite") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") # Print the schema df.printSchema() We can see the new schema, and verify that RateCodeld is now indeed a short data type: 162 | Chapter 7: Schema Handling root |-- RateCodeld: short (nullable = true) |-- RateCodeDesc: string (nullable = true) Next, we will attempt to change the data type of the RateCodeld column from a ShortType to an IntegerType, which is one of the supported conversions for schema evolution: # Define the schema for the DataFrame # Note that we now define the RateCodeld to be an # Integer type schema = StructType([ StructField("RateCodeId", IntegerType(), True), StructField("RateCodeDesc", StringType(), True) 1) # Create a list of rows for the DataFrame data = [(20, "Rate Code 20"), (21, "Rate Code 21"), (22, "Rate Code 22")] # Create a DataFrame from the list of rows and the schema df = spark.createDataFrame(data, schema) # Write the DataFrame with Schema Evolution df.write \.format("delta") \.option("mergeSchema", "true") \.mode("append") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") # Print the schema df.printSchema() This code will successfully execute and print the following schema: root |-- RateCodeld: integer (nullable = true) |-- RateCodeName: string (nullable = true) A new schemastring is written in the corresponding transaction log entry with the IntegerType: { "metaData": { "id": "7af3c5b8-0742-431f-b2d5-5634aa316e94", "format": { "provider"; "parquet", "options": {} }, "schemastring": "{\"type\":\"struct\",\"fields\":[ [\"name\":\"RateCodeId\",\"type\":\"integer\",\"nullable\": true,\"metadata\":{}}, {\"name\":\"RateCodeDesc\",\"type\":\"string\",\"nullable\": true,\"metadata\":{}}]}", Schema Evolution | 163 "partitionCoiumns": [], "configuration": {}, "createdTime": 1680658999999 } } Currently, Delta Lake only supports a limited number of conversions: You can convert from a NuHType to any other type. You can upcast from a ByteType to a ShortType. You can upcast from a ShortType to an IntegerType (which is our use case from earlier). Adding a NuHType Column In Delta Lake, the NullTypeO type is a valid data type that is used to represent a column that can contain a null value, as shown in step 5 of the “02 - Schema Evolution” notebook: # Define the schema for the DataFrame schema = StructType([ StructField("RateCodeld", IntegerType(), True), StructField("RateCodeDesc", StringType(), True), StructField("RateCodeExp", NullTypeO, True) 1) # Create a list of rows for the DataFrame data = [ (50, "Rate Code 50", None), (51, "Rate Code 51", None), (52, "Rate Code 52", None)] # Create a DataFrame from the list of rows and the schema df = spark.createDataFrame(data, schema) df.write \.format("delta") \.option("mergeSchema", "true") \.mode("append") \.save("/mnt/datalake/book/chapter07/TaxiRateCode") # Print the schema df.printSchema() The schema for this DataFrame is: root |-- RateCodeld: integer (nullable = true) |-- RateCodeDesc: string (nullable = true) |-- RateCodeExp: void (nullable = true) 164 | Chapter 7: Schema Handling When we look at the metadata entry for the corresponding transaction log entry we see the nullable type reflected: "schemaString": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeId\",\"type\":\"integer\",\"nullable\" :true,\"netadata\":{}}, {\"name\";\"RateCodeDesc\",\"type\":\"string\",\"nullable\" :true,\"metadata\":{}}, {\"name\":\"RateCodeExp\",\"type\":\"void\",\"nullable\" :true,\"metadata\":{}}]}", We can see the data type reflected as void. Note that if we try to query this table with a SELECT *, we will get an error: %sql SELECT * FROM delta.'/pint/datalake/book/chapter07/TaxiRateCode' We get the following exception: java.lang.IllegalStateException: Couldn't find RateCodeExp#26346 in [RateCodeId#26344,RateCodeDesc#26345] The reason for this error is that NullType columns in Delta Lake do not have a defined schema, so Spark cannot infer the data type of the column. Therefore, when we try to run a SELECT * query, Spark is unable to map the NullType column to a specific data type, and the query fails. If you want to query the table, we can list the columns you need without the NullType column: %sql SELECT RateCodeld, RateCodeDesc FROM delta.'/mnt/datalake/book/chapter07/TaxiRateCode' This will succeed without any issues. Explicit Schema Updates So far we have leveraged schema evolution to allow the schema to evolve according to a number of rules. Let’s look at how we can explicitly manipulate a Delta table’s schema. First, we will add a column to a Delta table using both the SQL ALTER TABLE and ADD COLUMN commands. Next, we will use the SQL ALTER COLUMN statement to add comments to a table column. Next, we will use a variation of the ALTER TABLE Explicit Schema Updates | 165 command to change the column ordering for the table. We will review Delta Lake column mapping, since it is required for the following. Adding a Column to a Table In step 3 of the “03 - Explicit Schema Updates” notebook, we have an example of how to use the SQL ALTER TABLE...ADD COLUMN command to add a column to a Delta table: %sql ALTER TABLE delta.'/mnt/datalake/book/chapter07/TaxiRateCode' ADD COLUMN RateCodeTaxPercent INT AFTER RateCodeld Note that we used the AFTER keyword, so the column will be added after the RateCodeld field, and not at the end of the column list, as is the standard practice without the AFTER keyword. Similarly, we can use the FIRST keyword to add the new column at the first position in the column list. Looking at the schema with the DESCRIBE command, we see that the new column is indeed inserted after the RateCodeld column: + + + + |col_name | data_type|comment | + + + + |RateCodeld | int | null | |RateCodeTaxPercent|int | null | |RateCodeDesc |string | null | + + + + By default, nullability is set to true, so all of the values for the newly added column will be set to null: +...................+................................... +................................... —+ |RateCodeld|RateCodeTaxPercent|RateCodeDesc 1 11 1 null |Standard Rate 1 12 1 null | JFK 1 13 1 null |Newark 1 14 1 null |Nassau or Westchester| 15 1 null |Negotiated fare 1 16 1 null |Group ride 1 +..........----- + — ----- +................................... — + When we look at the transaction log entry for the ADD COLUMN operation, you see: A commitinfo action with the ADD COLUMN operator. A metaData action with the new schemastring. In the schemastring, we see the new RateTaxCodePercent column: 166 | Chapter 7: Schema Handling commitinfo": { operation": "ADD COLUMNS", operationParameters": { "columns": "[{\"column\":{\"name\":\"RateCodeTaxPercent\",\"type\": \"integer\",\"nullable\":true, \"metadata\":{}},\"position\":\"AFTER RateCodeId\"}]" } } { "metaData": { "schemastring": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeId\", \"type\":\"integer\",\"nullable\": true,\"metadata\":{}}, {\"name\":\"RateCodeTaxPercent\",\"type\":\"integer\",\"nullable\": true,\"metadata\":{}}, {\"name\":\"RateCodeDesc\",\"type\":\"string\",\"nullable\": true,\"metadata\":{}}]}", "partitioncolumns": [], "configuration": {}, "createdTime": 1681168745910 } } Note that there are no add or remove actions, so no data had to be rewritten for ADD COLUMN to succeed; the only operation Delta Lake had to perform is to update the schemaString in the metaData transaction log action. Adding Comments to a Column In step 3 of the “Explicit Schema Updates” notebook, we see how to add comments to a Delta table using SQL with the ALTER COLUMN statement. For example, if we have the standard taxtdb.TaxiRateCode table, we can add a comment to a column: %sql -- Add a comment to the RateCodeld column ALTER TABLE taxtdb.TaxiRateCode ALTER COLUMN RateCodeld COMMENT 'This is the id of the Ride' We see a commitlnfo entry in the corresponding transaction log entry with a CHANGE COLUMN operation, and the addition of the comment: Explicit Schema Updates | 167 commitinfo": { "userName": "[email protected]", "operation": "CHANGE COLUMN", "operationparameters": { "column": "{\"name\":\"RateCodeId\",\"type\":\"integer\", \"nullable\": true,\"metadata\": {\"comment\":\"This is the id of the Ride\"}}" }, } } In the metadata entry, we see the updated metadata for the column: "schemastring": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeId\",\"type\":\"integer\",\"nullable\": true,\"metadata\": {\"comment\":\"This is the id of the Ride\"}}, {\"name\":\"RateCodeDesc\",\"type\":\"string\",\"nullable\": true,\"metadata\":{}}]}", We can also see the column change with the DESCRIBE HISTORY command: DESCRIBE HISTORY taxidb.TaxiRateCode Changing Column Ordering By default, Delta Lake collects statistics on only the first 32 columns. Therefore, if there is a specific column that we would like to have included in the statistics, we might want to move that column in the column order. In step 4 of the “03 - Explicit Schema Updates” notebook, we can see how to use ALTER TABLE and ALTER COLUMN to change the order of the table. Right now, the table looks as follows: %sql DESCRIBE taxidb.TaxiRateCode +............................. — +.............. - - +............................ -------------------+ |col_name | data_type|comment I +--------- --------------- — +.............. - - +------------------------......................+ |RateCodeld |int |This is the id of the Ride| |RateCodeTaxPercent|int | null I |RateCodeDesc I string | null I +............................. — +.............. - - +.................................................. + Let’s assume that we want to move the RateCodeDesc column up so it appears after the RateCodeld. We can use the ALTER COLUMN syntax: %sql ALTER TABLE taxidb.TaxiRateCode ALTER COLUMN RateCodeDesc AFTER RateCodeld 168 | Chapter 7: Schema Handling After executing this statement, the schema will look as follows: + + + |col_name data_type|comment I +.................... -......... +-------- + + |RateCodeId |int |This is the id of the Ride| |RateCodeDesc |string | null | |RateCodeTaxPercent|int | null | + + + + You can combine column ordering and adding a comment within a single ALTER COLUMN statement. This operation will preserve all data in the table. Delta Lake Column Mapping Column mapping allows Delta Lake tables and the underlying Parquet file columns to use different names. This enables Delta Lake schema evolution such as RENAME COLUMN and DROP COLUMN on a Delta Lake table without the need to rewrite the underlying Parquet files. K At the time of writing, Delta Lake column mapping is in experi mental support mode, but this is an important, powerful feature to discuss that supports many common scenarios. You can find more information about column mapping at the Delta Lake documenta tion website. Delta Lake supports column mapping for Delta Lake tables, which enables metadata- only changes to mark columns as deleted or renamed without rewriting data files. It also allows users to name Delta table columns using characters that are not allowed by Parquet, such as spaces, so that users can directly ingest CSV or JSON data into Delta Lake without the need to rename columns due to previous character constraints. Column mapping requires the following Delta Lake protocols: Reader version 2 or above Writer version 5 or above Once a Delta table has the required protocol versions, you can enable column map ping by setting delta. columnmapping. mode to name. In step 4 of the “03 - Explicit Schema Updates” notebook, we can see that to check the reader and writer protocol versions of our table, we can use the DESCRIBE EXTENDED command: Explidt Schema Updates | 169 %sql DESCRIBE EXTENDED taxidb.TaxiRateCode + +..........................................................................................................+ |col_name |data_type | + ----------------- +----------------------------------------------------- + |RateCodeId |int | |Table Properties |[delta.minReaderVersion=l,delta.minWriterVersion=2] | + +........................................................................................................ + We see that the table is not at the protocol version required by column mapping. We can update both the versions and delta.columnmapping.mode with the following SQL statement: %sql ALTER TABLE taxidb.TaxiRateCode SET TBLPROPERTIES ( 'delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5', 'delta.columnMapping.mode' = 'name' ) When we look at the corresponding log entry for the SET TBLPROPERTIES statement, we see quite a few changes. First, we see a commitinfo action with the SET TBLPROPERTIES entry: { "commitinfo": { "operation": "SET TBLPROPERTIES", "operationparameters": { "properties": "{\"delta.mlnReaderVersion\":\"2\", \"delta.minWriterVersion\":\"5\", \"delta.columnMapping.mode\":\"name\"}" }, } } Next, we see a protocol action, informing us that the minReader and minWriter versions have been updated: { "protocol": { "minReaderVersion": 2, "minWriterVersion": 5 } } 170 | Chapter 7: Schema Handling And finally, we see a metaData entry with a schemastring. But now, column mapping has been added to the schemaStrtng: { "metaData": { f "schemastring": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeId\",\"type\":\"integer\",\"nullable\":true, \"metadata\":{\"comment\":\"This is the id of the Ride\", \"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\ ":\"RateCodeId\"}}, {\"name\":\"RateCodeDesc\",\"type\":\"string\",\"nullable\":true, \"metadata\":{\"delta.columnMapping,id\":2, \"delta.columnMapping.physicalName\":\"RateCodeDesc\"}}, {\"name\":\"RateCodeTaxPercent\",\"type\":\"integer\",\"nullable\": true, \"metadata\":{\"delta.columnMapping.id\":3, \"delta.columnMapping.physicalName\":\"RateCodeTaxPercent\"}}]}", > "configuration": { "delta.columnMapping.mode": "name", "delta.columnMapping.maxColumnld": "3" }, } } For each column, you have: The name, which is the official Delta Lake column name (e.g., RateCodeld). delta.columnMapping.id, which is the ID of the column. This ID will remain stable. delta. columnMapping. physicalName, which is the physical name in the Parquet file. Renaming a Column You can use ALTER TABLE...RENAME COLUMN to rename a column without rewriting any of the columns existing data. Note that column mapping needs to be in place for this to be enabled. Assume we want to rename the RateCodeDesc column to a more descriptive RateCodeDescrtption: %sql -- Perform our column rename ALTER TABLE taxidb.taxiratecode RENAME COLUMN RateCodeDesc to RateCodeDescription Explicit Schema Updates | 171 When we look at the corresponding log entry, we see the rename reflected in the schemastring: "schemastring": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeDescription\",\"type\":\"string\",\"nullable\" :true, \"metadata\":{\"delta.columnMapping.id\": 2,\"delta.columnMapptng.physicalName\":\"RateCodeDesc\"}}, We see that the Delta Lake column name has been changed to RateCodeDescription, but the physicalName is still RateCodeDesc in the Parquet file. This is how Delta Lake can perform a complex DDL operation, such as RENAME COLUMN, without needing to rewrite any files, as a simple metadata operation. Replacing the Table Columns In Delta Lake, the ALTER TABLE REPLACE COLUMNS command can be used to replace all the columns of an existing Delta table with a new set of columns. Note that in order to do this, you need to enable Delta Lake column mapping, as described in the previous section. Once column mapping is enabled, we can use the REPLACE COLUMNS command: %sql ALTER TABLE taxidb.TaxiRateCode REPLACE COLUMNS ( Rate_Code_Identifier INT COMMENT 'Identifies the code', Rate_Code_Description STRING COMMENT 'Describes the code', Rate_Code_Percentage INT COMMENT 'Tax percentage applied' ) When we look at the schema, we see the following: %sql DESCRIBE EXTENDED taxidb.TaxiRateCode +.........................................+................. + |col_name |data_type I +.........................................+................. + |Rate_Code_Identifier | int I |Rate_Code_Description| string I |Rate_Code_Percentage | int I I............. I I |Table Properties |[delta.columnMapping.maxColumnId=6, I | | delta.columnMapping.mode=name, I | |delta.minReaderVersion=2,delta.minWriterVersion=5] I 172 | Chapter 7: Schema Handling In the DESCRIBE output, we can see the new schema, and we can also see the mini mum reader and writer versions. When we look at the corresponding transaction log entry, we see the commitinfo with the REPLACE COLUMNS operation: "commitinfo": { "operation": "REPLACE COLUMNS", "operationparameters": { "columns": "[ {\"name\":\"Rate_Code_Identifier\",\"type\":\"integer\",\ "nullable\":true, \"metadata\":{\"comment\":\"Identifies the code\"}}, {\"name\":\"Rate_Code_Description\",\"type\":\"string\",\ "nullable\":true, \"metadata\":{\"comment\":\"Describes the code\"}}, {\"name\":\"Rate_Code_Percentage\",\"type\":\"integer\",\ "nullable\":true, \"metadata\":{\"comment\":\"Tax percentage applied\"}}]" }, } } In the metaData section, we see the new schemastring with some interesting infor mation. The new Delta Lake columns are now mapped to guide-based column names with new IDs (starting with 4): { "metaData": { , "schemastring": ”{\"type\":\"struct\",\"fields\":[ [\"name\":\"Rate_Code_Identifier\",\"type\";\"integer\" , \"nullable\":true, \"metadata\":{\"comment\":\"Identifies the code\", \"delta.columnMapping.id\":4, V'delta.columnMapping.physicalName\": \"col-72397feb-3cb0-4613-baad-aa78fff64a40\"}}, [\"name\":\"Rate_Code_Description\",\"type\": \"string\",\"nullable\":true, \"metadata\":{\"comment\":\"Describes the code\", \"delta.columnMapping.id\" :5, \"delta.columnMapping.physicalName\": \Hcol-67d47d0c-5d25-45d8-8d0e-c9bl3f5f2c6e\"}}, {\"name\":\"Rate_Code_Percentage\",\"type\":\"integer\",\"nullable\":true, \"metadata\":{\"comment\":\"Tax percentage applied\", \"delta.columnMapping.id\":\"delta.columnMapping.physicalName\": \"col-3b8f9847-71df-4e64-a921-64c918de328d\"}}]}",... "configuration": { "delta.columnMapping.mode": "name", "delta.columnMapping.maxColumnld": "6" Explicit Schema Updates | 173 When we look at the data, we see all six rows, but all columns are set to null: +-------------------- ----------- + + + | Rate_Code_Identi.fi.er | Rate_Code_Description | Rate_Code_Percentage | | null | null | null | | null | null | null 1 | null | null | null 1 | null | null | null 1 | null | null | null 1 | null | null | null 1 +.................................... +....................................... +......................................+ The REPLACE COLUMNS operation sets all the column values to null because the new schema might have different data types or a different order of columns than the old schema. As a result, the existing data in the table may not fit the new schema. Therefore, Delta Lake sets the value of all columns to null to ensure that the new schema is applied consistently to all records in the table. 8 Its important to note that the REPLACE COLUMNS operation can be a destructive operation, as it replaces the entire schema of the Delta table and rewrites the data in the new schema. Therefore, you should use it with caution and make sure to back up your data before applying this operation. Dropping a Column Delta Lake now supports dropping a column as a metadata-only operation without rewriting any data files. Note that column mapping must be enabled for this operation. It is important to note that dropping a column from metadata does not delete the underlying data for the column in the files. To purge the dropped column data, you can use REORG TABLE to rewrite the files. You can then use the VACUUM command to physically delete the files that contain the dropped column data. Let’s start with the standard schema in the taxtdb. TaxtRateCode table: root |-- RateCodeld: integer (nullable = true) |-- RateCodeDesc: string (nullable = true) 174 | Chapter 7: Schema Handling Let’s assume that we want to drop the RateCodeDesc column. We can use the ALTER TABLE with the DROP COLUMN SQL command to do this: %sql -- Use the ALTER TABLE... DROP COLUMN command -- to drop the RateCodeDesc column ALTER TABLE taxidb.TaxiRateCode DROP COLUMN RateCodeDesc When we use the DESCRIBE command to view the schema, we see that we only have the RateCodeld column left: +...................+................. + -...............+ |col_name |data_type| comment| |RateCodeld|int I null | +.................. +........... — +............... + When we check the table, we see that our data is still there, minus the dropped columns: %sql -- Select the remaining columns SELECT * FROM taxidb.TaxiRateCode |RateCodeld| +...................+ I 1 I I 2 | I 3 | I 4 | I 5 | I 6 | We see the following sections in the corresponding transaction log entry: A commit Info action that specifies the DROP COLUMNS operation: { "commitinfo": { "operation": "DROP COLUMNS", "operationparameters": { "columns": "[\"RateCodeDesc\"]" }, Explicit Schema Updates | 175 A metaData action that specifies the new schema, including the column mapping in the metadata section: { "metaData": { "schemastring": "{\"type\":\"struct\",\"fields\": [{\"name\":\"RateCodeId\",\"type\":\"integer\", \"nullable\":true, \"metadata\":[\"delta.columnMapping.td\": 1, \"delta.columnMapping.physicalName\": \"RateCodeId\"}}]}", "configuration": { "delta.columnMapping.mode": "name", "delta.columnMapping.maxColumnld": "2" }, } } Note that the RateCodeDesc column has only been “soft deleted.” When we looked at the transaction log entry earlier, what was most remarkable was not what was there, but what was not there. There were no remove and add actions for a data file, so no part files were rewritten, and the old part file is still there with both the RateCodeld and the RateCodeDesc columns. When we look at the part files, we see our one part file: %sh # Display the data file(s) # You can see you only have our one part file, which was not # touched at all Is -al /dbfs/mnt/datalake/book/chapter07/TaxiRateCode.delta drwxrwxrwx 2 root root 4096 Apr 6 00:00 _delta_log -rwxrwxrwx 1 root root 980 Apr 6 00:00 part-00000-...-C000.snappy.parquet When you download and view the file with a Parquet viewer,1 you can see that both columns are still there (see Table 7-1). 1 For example, Parquet Viewer 176 | Chapter 7: Schema Handling Table 7-1. Viewing the Parquet file after DROP COLUMN I RateCodeld RateCodeDesc 1 1 Standard rate 2 JFK 3 Newark 4 Nassau or Westchester 5 Negotiated fare 6 Group ride DROP COLUMN only updates the metadata—it does not add or remove any part files. When working with large files, having this “soft-deleted” data around can result in the small file problem. Therefore, in the next section, we will use the REORG TABLE command to reclaim the space for the deleted column. The REORG TABLE Command The REORG TABLE command reorganizes a Delta Lake table by rewriting files to purge soft-deleted data, which we created in the previous section, where we dropped a column with the ALTER TABLE DROP COLUMN command. To reclaim the space occupied by the RateCodeDesc column that we dropped, we can issue the following command: %sql -- Reorganize the table by removing the part file which included -- the RateCodeDesc column and adding a new part file with just the -- RateCodeld column REORG TABLE taxidb.TaxiRateCode APPLY (PURGE) After running this command, Delta Lake will display the path it used to exe cute the command, which in this case is dbfs:/mnt/datalake/book/chapter07/TaxiRate- Code.delta. It will also display the metrics, which contain the number of files added and removed: Explicit Schema Updates | 177 "numFilesAdded": 1, "numFilesRemoved": 1, "filesAdded": { "min": 665, "max": 665, "avg": 665, "totalFiles": 1, "totalsize": 665 }, "filesRemoved": { "min": 980, "max": 980, "avg": 980, "totalFiles": 1, "totalsize": 980 }, "partitionsOptimized": 0, } One file was removed (the part file with both columns) and another was added (the part file with just the RateCodeld column). When we look at the corresponding transaction log entry, we see the following add and remove actions: remove": { "path": "part-00000-...-C000.snappy.parquet", } { "add": { "path": "9g/part-00000-...-C000.snappy.parquet", "partitionvalues": {}, "stats": "{\"numRecords\":6,\"minValues\":{\"RateCodeId\":l}, \"maxValues\":{\"RateCodeId\":6},\"nullCount\":{\"RateCodeId\":0}}", } In the remove action, we remove the original Parquet file containing both columns and add a file in a subdirectory. When we look at that location, we see the Parquet 178 | Chapter 7: Schema Handling %sh # This is the new part file, which contains just the RateCodeld column Is -al /dbfs/mnt/datalake/book/chapter07/TaxiRateCode.delta/9g -rwxrwxrwx 1 root root 665 Apr 6 01:45 part-00000-.... snappy.parquet When you download this file and view it, you can see that only the RateCodeld column is present, as shown in Table 7-2. Table 7-2. The newly added part file after the REORG TABLE command 1 7 T 1 7 7 Changing Column Data Type or Name We can change a columns data type or name or drop a column by manually rewriting the table. To do this, we can use the overwriteSchema option. Lets start with the standard schema: root |-- RateCodeld: integer (nullable = true) |-- RateCodeDesc: string (nullable = true) Next, change the data type of the RateCodeld column from integer to short. We can rewrite the table. First, we read the table, use the.withColumn PySpark function to change the data type of the RateCodeld column, and then write the table back with the overwriteSchema option set to True: # # Rewrite the table with the overwriteSchema setting # Use.withColumn to change the data type of the RateCodeld column # spark.read.table('taxidb.TaxiRateCode') \.withColumn("RateCodeId", col("RateCodeld").cast("short")) \.write \.mode("overwrite") \.option("overwriteSchema", "true") \.saveAsTable('taxidb.TaxiRateCode') If we check the schema of the table with DESCRIBE, we see the data type change for the RateCodeld table: Explicit Schema Updates | 179 %sql DESCRIBE taxidb.TaxiRateCode +.......................+ + + |col_name |data_type| comment | +.......................+ + + |RateCodeId |smallint |null | | RateCodeDesc | string |null | +.......................+ + + When we check the transaction log entry for this operation, we see four entries: 1. The commitinfo with the CREATE OR REPLACE TABLE AS SELECT operation: { "commitinfo": {... "operation": "CREATE OR REPLACE TABLE AS SELECT", } } 2. The metaData action with the schemastring: { "metaData": { "schemastring": "{\"type\":\"struct\",\"fields\":[ {\"name\":\"RateCodeId\",\"type\":\"short\",\"nuUable\": true,\"metadata\":{}}, {\"name\": \ "RateCodeDesc\", \"type\": \" string\", \" nuUable\": true,\"metadata\":{}}]}", } } 3. A remove action that removes the old part file from the table: { remove : { "path": "part-00000-....snappy.parquet", } } 4. The add action that adds a part file with our six records: "add": { "path": "part-00000-....snappy.parquet", "partitionvalues": {}, "stats": "{\"numRecords\":6,\"minValues\" :{\"RateCodeId \":!,\"RateCodeDesc\":\"Group ride\"}, 180 | Chapter 7: Schema Handling \"maxValues\":f\"RateCodeId\":6,\"RateCodeDesc \":\"Standard Rate\"}, \"nuUCount\":{\"RateCodeId\":0,\"RateCodeDesc } } Here we have demonstrated that we can use PySpark to change the data type of a column, albeit at the cost of completely rewriting the Delta table. The same approach can be used to drop columns or change column names. Condusion Modern data platforms leveraging ETL for analytics will always be consumers of data as they ingest data from various data sources. And as organizations continue to collect, process, and analyze data from a growing number of data sources, the ability to swiftly handle schema evolution and data validation is a critical aspect of any data platform. In this chapter you have seen how Delta Lake gives you flexibility to evolve a tables schema through dynamic and explicit schema updates, while also enforcing schema validation. Using transaction log entries, Delta Lake stores a Delta table’s schema in the metaData action. This schema, which contains column names and data types, is used to support schema validation and report schema mismatches on attempted operations. This schema validation is atomic in nature for operations on Delta tables, which can be illustrated in transaction log entries, or rather the omission of transaction log entries for schema violations. And while you learned that Delta Lake supports schema validation, you also learned that it supports dynamic schema evolution to add, remove, or modify columns in existing Delta tables. You can evolve a table’s schema using the mergeSchema option, or you can explicitly update a schema to add, remove, or rename columns or data types, while also adding comments or changing the column order (which is important for data skipping) using SQL or DataFrame syntax. All of these types of schema operations, including supported conversions for data types, are demonstrated throughout the chapter along with their corresponding commands (e.g., REPLACE COLUMNS) and transaction log entries to play these actions out and illustrate these behaviors. While schema evolution focuses primarily on changes in batch data operations, the following chapter will explore the requirements and operations needed for streaming data using Spark Structured Streaming. Conclusion | 181