Time travel and Schema adaptation in Databricks
As discussed in my earlier post on Databricks, it offers two extremely cool features:
1. Time Travel
2. Implicit schema change accommodation
Time Travel
(No, we won’t go back and try to kill Sarah Connor)
Let’s suppose we have a relational database with a table ‘users’.
There’s been an update, and it’s been decided to drop the ‘Role’ column.
You fire off the SQL statement, and the column is promptly dropped.
A few days pass, and you notice that other systems using this table are failing: you need to get the column back.
If the table contained 5 records, you could revive the column in a heartbeat.
But production-level data runs to 100,000+ records.
Now what?
DATABRICKS DELTA TO THE RESCUE!!!
Suppose you have the same table created on Databricks using Delta.
Once the update is made (the column is dropped) and you’ve come to realize you need to get the column back, you can use Databricks Delta Time Travel:
1. Using a specific timestamp: Retrieve a version of the table as of a given date
%sql
select * from db_name.users TIMESTAMP AS OF "2021-01-01"
2. Using a version number: Retrieve a specific version of the table
%sql
select * from db_name.users VERSION AS OF 3
* When a table is created, its version is 0, and the version is incremented every time a change operation occurs (insert, update, delete, merge).
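For completeness, here’s a minimal PySpark sketch of the same idea (the table name and version number are just examples): you can read an older version through the DataFrame reader, or, on recent Delta Lake / Databricks Runtime versions, roll the table itself back with RESTORE.
# Read an earlier version of the table through the DataFrame API
old_df = spark.read.format("delta").option("versionAsOf", 3).table("db_name.users")
# Or restore the table in place to that version
spark.sql("RESTORE TABLE db_name.users TO VERSION AS OF 3")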
To sum it up, using Databricks Delta Time Travel, you can go back to an earlier version of a table no matter how many operations have happened on it since, as long as that version is still within the table’s retention period.
Implicit schema change accommodation
While committing data to a table, there can be 3 distinct possibilities:
- Overwrite: The entire data is refreshed. This could be the scenario when the data is replicated from a source with a low record count, which permits a full load every time.
- Append: New records are inserted, the existing records are left unchanged. This could be the scenario when the data is immutable, for example, sensor data.
- Merge: Upserts (updates and inserts) and delete operations occur. This is the most common scenario, as data generally gets inserted, updated and/or deleted.
Let's consider the same table, ‘users’:
The column ‘Role’ is an optional column. Many records have just the other 4 fields, with Role missing. This behaviour is expected and valid according to the source team.
In any other relational database, if (6, “sandrews”, “Sam”, “Andrews”) is inserted, the SQL statement won’t execute and an error pops up stating that the table has 5 columns but only 4 values were supplied.
That won’t happen in the case of Databricks Delta.
If the above record is inserted, the SQL statement will execute successfully and the table will look like this:
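As a rough PySpark sketch of how that insert might look (the column names here are assumed, since the table layout isn’t shown in text): a DataFrame that omits the Role column can still be appended, and Delta fills the missing column with NULL.
# The incoming row has no 'Role' value; Delta fills it with NULL on append
# (column names and types are assumed to match the target table)
new_user = spark.createDataFrame(
    [(6, "sandrews", "Sam", "Andrews")],
    ["Id", "Username", "FirstName", "LastName"],
)
new_user.write.format("delta").mode("append").saveAsTable("db_name.users")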
How to make this happen?
To answer that, let’s tackle this for each ingestion mode:
- Overwrite
For overwrite, use this property to enable schema and data overwrite:
.option("overwriteSchema", "true")
Sample ingestion statement:
df.write.format("delta").mode("overwrite")
    .option("overwriteSchema", "true").saveAsTable("…")
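As a slightly fuller, hypothetical sketch (the source table name is made up): suppose the refreshed snapshot no longer carries the Role column at all; overwriteSchema lets the rewritten table pick up the new, narrower schema.
# Hypothetical refresh: the new snapshot no longer has a 'Role' column
refreshed_df = spark.table("staging.users_snapshot")
(refreshed_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("db_name.users"))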
- Append
For append, use this property to enable schema merging, so that new columns in the incoming data are added to the table:
.option("mergeSchema", "true")
Sample ingestion statement:
df.write.format("delta").mode("append")
    .option("mergeSchema", "true").saveAsTable("…")
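For instance (a hypothetical example with made-up names): if an incoming batch carries a brand-new Department column, mergeSchema adds it to the table, and existing rows simply get NULL for it.
# The new batch has an extra 'Department' column not yet in the table
new_batch = spark.createDataFrame(
    [(7, "jdoe", "Jane", "Doe", "Admin", "Finance")],
    ["Id", "Username", "FirstName", "LastName", "Role", "Department"],
)
(new_batch.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("db_name.users"))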
- Merge
For merge, set this cluster (or session-level) Spark configuration to enable automatic schema evolution:
spark.databricks.delta.schema.autoMerge.enabled true
Sample ingestion statement:
MERGE INTO {target_table} t
USING {source_table_view} s
ON {join_condition}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
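If you’d rather not touch the cluster configuration, the same setting can also be applied per session before running the MERGE, for example from a notebook cell:
# Enable automatic schema evolution for MERGE in the current Spark session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")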
To sum it up, Databricks Delta offers implicit schema flexibility, which helps ensure that your pipeline does not fail and that schema changes are accounted for.