Simulating a Data Pipeline with Delta Lake: A Tutorial

Delta Lake is an open-source project that manages the storage layer of your data lake. In practice, it is an abstraction on top of Apache Spark that reuses Spark's mechanisms and adds features such as support for ACID transactions. Preserving data integrity in a data pipeline is critical when many readers and writers access the same data concurrently.

Delta Lake also provides audit history and data versioning, and supports DML operations such as deletes, updates and merges.

For this tutorial, we'll simulate a data pipeline locally, focusing on Delta Lake's advantages. First, we'll load a Spark Dataframe from a JSON file, then create a temporary view, and finally create a Delta table on which we'll perform some Delta operations.

We'll use Java as the programming language and Maven as the dependency manager, along with Spark and Hive to maintain our data catalog.

Maven

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies>

The code is presented in short snippets to make it easier to follow.

Setting Spark with Delta and Hive

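import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Values that enable Delta Lake's SQL extension and catalog in Spark
String val_ext = "io.delta.sql.DeltaSparkSessionExtension";
String val_ctl = "org.apache.spark.sql.delta.catalog.DeltaCatalog";
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("app");
sparkConf.setMaster("local[1]"); // run locally with a single worker thread
sparkConf.set("spark.sql.extensions", val_ext);
sparkConf.set("spark.sql.catalog.spark_catalog", val_ctl);
SparkSession sparkSession = SparkSession.builder()
    .config(sparkConf)
    .enableHiveSupport() // keep table metadata in a Hive catalog
    .getOrCreate();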

Understanding the code above

  • We define two variables, val_ext and val_ctl, holding the values for the keys spark.sql.extensions and spark.sql.catalog.spark_catalog. Both are required to configure Delta with Spark.
  • We name the Spark application app.
  • Since we are not running Spark on a cluster, the master is set to run locally with local[1].
  • Spark supports Hive, which we enable by calling enableHiveSupport().

Data Ingest

Let’s work with Spark Dataframe as the data source. We load a Dataframe from a JSON file.

order.json file

{"id":1, "date_order": "2021-01-23", "customer": "Jerry", "product": "BigMac", "unit": 1, "price": 8.00}
{"id":2, "date_order": "2021-01-22", "customer": "Olivia", "product": "Cheese Burguer", "unit": 3, "price": 21.60}
{"id":3, "date_order": "2021-01-21", "customer": "Monica", "product": "Quarter", "unit": 2, "price": 12.40}
{"id":4, "date_order": "2021-01-23", "customer": "Monica", "product": "McDouble", "unit": 2, "price": 13.00}
{"id":5, "date_order": "2021-01-23", "customer": "Suzie", "product": "Double Cheese", "unit": 2, "price": 12.00}
{"id":6, "date_order": "2021-01-25", "customer": "Liv", "product": "Hamburger", "unit": 1, "price": 2.00}
{"id":7, "date_order": "2021-01-25", "customer": "Paul", "product": "McChicken", "unit": 1, "price": 2.40}

Creating a Dataframe

Dataset<Row> df = sparkSession.read().json("datasource/");
df.createOrReplaceGlobalTempView("order_view");

Understanding the code above

The snippet above creates a Dataframe from the JSON file inside the datasource/ directory. Create this directory to keep the project structure clear, then create the order.json file inside it with the content shown earlier.

Finally, we create a global temporary view, which we'll use in the next steps.
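If you would rather not create the datasource/ directory and the order.json file by hand, a small helper can write them for you. The sketch below is only a convenience and not part of the pipeline itself; the OrderFileWriter class name is hypothetical. It writes one JSON object per line, which is the layout Spark's JSON reader expects by default.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: writes the tutorial's sample orders to <dir>/order.json.
class OrderFileWriter {

    // One JSON object per line, copied from the order.json listing above.
    static List<String> orderLines() {
        return Arrays.asList(
            "{\"id\":1, \"date_order\": \"2021-01-23\", \"customer\": \"Jerry\", \"product\": \"BigMac\", \"unit\": 1, \"price\": 8.00}",
            "{\"id\":2, \"date_order\": \"2021-01-22\", \"customer\": \"Olivia\", \"product\": \"Cheese Burguer\", \"unit\": 3, \"price\": 21.60}",
            "{\"id\":3, \"date_order\": \"2021-01-21\", \"customer\": \"Monica\", \"product\": \"Quarter\", \"unit\": 2, \"price\": 12.40}",
            "{\"id\":4, \"date_order\": \"2021-01-23\", \"customer\": \"Monica\", \"product\": \"McDouble\", \"unit\": 2, \"price\": 13.00}",
            "{\"id\":5, \"date_order\": \"2021-01-23\", \"customer\": \"Suzie\", \"product\": \"Double Cheese\", \"unit\": 2, \"price\": 12.00}",
            "{\"id\":6, \"date_order\": \"2021-01-25\", \"customer\": \"Liv\", \"product\": \"Hamburger\", \"unit\": 1, \"price\": 2.00}",
            "{\"id\":7, \"date_order\": \"2021-01-25\", \"customer\": \"Paul\", \"product\": \"McChicken\", \"unit\": 1, \"price\": 2.40}"
        );
    }

    // Creates the directory if needed, then writes (or overwrites) order.json in it.
    static Path writeOrders(Path dir) throws IOException {
        Files.createDirectories(dir);
        return Files.write(dir.resolve("order.json"), orderLines(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Wrote " + writeOrders(Paths.get("datasource")));
    }
}
```

Run it once from the project root before executing the Spark snippets, so that the relative path datasource/ resolves to the same place.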

Creating a Delta Table

Let's create the Delta table with a SQL statement. The creation itself is simple, but notice that the column types differ from those typically used in a relational database. For example, we use STRING instead of VARCHAR.

We are partitioning the table by the date_order field. We chose this field because the orders span several distinct dates, so queries that filter on it can skip irrelevant partitions and perform better.

Finally, the USING DELTA clause defines the table as a Delta table.

String statement =
"CREATE OR REPLACE TABLE orders (" +
"id STRING, " +
"date_order STRING," +
"customer STRING," +
"product STRING," +
"unit INTEGER," +
"price DOUBLE) " +
"USING DELTA " +
"PARTITIONED BY (date_order) ";
sparkSession.sql(statement);

Understanding the code above

The snippet above defines a Delta table called orders and then executes the statement that creates it.

DML Operations

Delta supports Delete, Update and Insert operations, which can also be combined through Merge.

Using Merge together with Insert and Update

In this step, we'll execute a Merge, which controls whether data is inserted or updated, using a table, Dataframe or view as the source. Merge works by matching rows, as the explanation after the snippet makes clear.

String mergeStatement = "Merge into orders " +
"using global_temp.order_view as orders_view " +
"ON orders.id = orders_view.id " +
"WHEN MATCHED THEN " +
"UPDATE SET orders.product = orders_view.product," +
"orders.price = orders_view.price " +
"WHEN NOT MATCHED THEN INSERT * ";
sparkSession.sql(mergeStatement);

Understanding the code above

  • The snippet above executes the Merge operation, using the order_view view created in the previous steps as the source.
  • The condition orders.id = orders_view.id determines which rows match.
  • If the condition is true for a row (MATCHED), the row is updated. Otherwise (NOT MATCHED), the row is inserted.

In the case above, the data will be inserted, because until then there was no data in the orders table. Run the command below to view the inserted data.

sparkSession.sql("select * from orders").show();
[Image: orders table]

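Update the datasource/order.json file by changing the product or price field, then run the merge again. Since every id now matches an existing row, all records will be updated. Skip the CREATE OR REPLACE TABLE statement on this second run, because it would replace the table with an empty one and the merge would insert the rows again.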
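To make the matched and not-matched flow concrete, here is a minimal plain-Java sketch that imitates the Merge above on an in-memory map. It only illustrates the semantics and is not how Delta executes a merge; the MergeSketch and Order names are hypothetical, and the row is reduced to four fields.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of MERGE semantics; not Delta Lake's implementation.
class MergeSketch {

    // Simplified stand-in for one row of the orders table.
    static final class Order {
        final int id;
        final String customer;
        String product;
        double price;

        Order(int id, String customer, String product, double price) {
            this.id = id;
            this.customer = customer;
            this.product = product;
            this.price = price;
        }
    }

    // Mimics: WHEN MATCHED THEN UPDATE SET product, price
    //         WHEN NOT MATCHED THEN INSERT *
    // Returns a two-element array: {rows updated, rows inserted}.
    static int[] merge(Map<Integer, Order> target, Iterable<Order> source) {
        int updated = 0;
        int inserted = 0;
        for (Order s : source) {
            Order t = target.get(s.id); // plays the role of ON orders.id = orders_view.id
            if (t != null) {
                t.product = s.product;
                t.price = s.price;
                updated++;
            } else {
                // Insert a copy so later edits to the source do not leak into the target.
                target.put(s.id, new Order(s.id, s.customer, s.product, s.price));
                inserted++;
            }
        }
        return new int[] {updated, inserted};
    }

    public static void main(String[] args) {
        Map<Integer, Order> orders = new LinkedHashMap<>();
        List<Order> view = Arrays.asList(
            new Order(1, "Jerry", "BigMac", 8.00),
            new Order(2, "Olivia", "Cheese Burguer", 21.60));

        int[] first = merge(orders, view);
        System.out.println("first run:  updated=" + first[0] + ", inserted=" + first[1]);

        view.get(0).price = 9.00; // as if order.json had been edited
        int[] second = merge(orders, view);
        System.out.println("second run: updated=" + second[0] + ", inserted=" + second[1]);
    }
}
```

The first call reports 0 updates and 2 inserts because the target starts empty. The second reports 2 updates and 0 inserts because every id already exists, which mirrors what happens when the merge is run again after editing the source file.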

Update operation

It is possible to run Update without the need to use Merge, just run the command below:

String updateStatement = "update orders " +
"set product = 'Milk-Shake' " +
"where id = 2";
sparkSession.sql(updateStatement);

Delete operation

String deleteStatement = "delete from orders where id = 2";
sparkSession.sql(deleteStatement);

Besides running Delete as a standalone command, you can also use it as a clause within Merge.

Understanding Delta Lake Transaction Log (DeltaLog)

In addition to supporting ACID transactions, Delta generates JSON files that provide an audit trail and keep the history of each transaction, for both DDL and DML commands.

With this mechanism, it is even possible to go back to a specific state of the table if necessary.

For each executed transaction, a JSON file is created inside the _delta_log folder. The first file is always 00000000000000000000.json and contains the first commit. In our scenario, this first file contains the commit that creates the orders table.
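Commit files are named after the table version they create, zero-padded to 20 digits. A small sketch of that naming rule follows; DeltaLogNames is a hypothetical helper written for this tutorial, not part of the Delta API.

```java
// Hypothetical helper showing how a table version maps to its commit file name.
class DeltaLogNames {

    // Zero-pads the version to 20 digits and appends the .json extension.
    static String commitFileName(long version) {
        return String.format("%020d.json", version);
    }

    public static void main(String[] args) {
        // Versions 0 to 3 correspond to the four transactions run in this tutorial.
        for (long version = 0; version <= 3; version++) {
            System.out.println(commitFileName(version));
        }
    }
}
```

Because the names are zero-padded, sorting them alphabetically also sorts them by version.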

For a better view, open the spark-warehouse folder, which was probably created in the root directory of your project. This is the warehouse directory that holds the table's resources, such as the JSON log files and the Parquet data files. Inside it you will find a folder structure like the one shown below:

[Image: _delta_log directory]

Note that the files are numbered in ascending order, one per executed transaction. Open each JSON file and the operation field shows which transaction was executed, along with other information.
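Instead of opening each file by hand, the operation field can be pulled out with a few lines of plain Java. This is a sketch under assumptions: the table is assumed to live at spark-warehouse/orders, the DeltaLogOperations class is hypothetical, and a regular expression stands in for a real JSON parser to keep the example dependency-free.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper that prints the operation recorded in each commit file.
class DeltaLogOperations {

    private static final Pattern OPERATION =
        Pattern.compile("\"operation\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the value of the first "operation" field found in the text, if any.
    static Optional<String> extractOperation(String commitText) {
        Matcher m = OPERATION.matcher(commitText);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Lists the .json commit files in name order and prints each one's operation.
    static void printOperations(Path deltaLogDir) throws IOException {
        List<Path> commits;
        try (Stream<Path> files = Files.list(deltaLogDir)) {
            commits = files
                .filter(p -> p.getFileName().toString().endsWith(".json"))
                .sorted()
                .collect(Collectors.toList());
        }
        for (Path commit : commits) {
            String text = new String(Files.readAllBytes(commit), StandardCharsets.UTF_8);
            System.out.println(commit.getFileName() + " -> "
                + extractOperation(text).orElse("(no operation field)"));
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed location of the orders table; adjust it to your warehouse directory.
        printOperations(Paths.get("spark-warehouse", "orders", "_delta_log"));
    }
}
```

For the transactions run in this tutorial, the values to look for are the ones listed next.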

00000000000000000000.json

"operation":"CREATE OR REPLACE TABLE"

00000000000000000001.json

"operation":"MERGE"

00000000000000000002.json

"operation":"UPDATE"

00000000000000000003.json

"operation":"DELETE"

Also note that the parquet files were generated partitioned into folders by the date_order field.
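The partition folders follow the Hive-style naming convention column=value. The plain-Java sketch below shows which folder each of the seven sample orders would land in; it only computes names from the sample data and does not read the warehouse directory, and PartitionLayout is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of how rows map to partition folders.
class PartitionLayout {

    // Hive-style partition directory name: <column>=<value>.
    static String partitionDir(String column, String value) {
        return column + "=" + value;
    }

    // Groups order ids by the partition directory their date_order maps to.
    static Map<String, List<Integer>> layout(int[] ids, String[] dates) {
        Map<String, List<Integer>> byDir = new TreeMap<>();
        for (int i = 0; i < ids.length; i++) {
            byDir.computeIfAbsent(partitionDir("date_order", dates[i]), k -> new ArrayList<>())
                 .add(ids[i]);
        }
        return byDir;
    }

    public static void main(String[] args) {
        // ids and date_order values taken from the order.json sample data
        int[] ids = {1, 2, 3, 4, 5, 6, 7};
        String[] dates = {"2021-01-23", "2021-01-22", "2021-01-21", "2021-01-23",
                          "2021-01-23", "2021-01-25", "2021-01-25"};
        layout(ids, dates).forEach((dir, rows) ->
            System.out.println(dir + "/ <- order ids " + rows));
    }
}
```

The seven orders map to four folders, with orders 1, 4 and 5 sharing date_order=2021-01-23. A query that filters on a single date only needs to read the files in the matching folder.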

Hope you enjoyed it!
