Practical Example of Schema Merging in Apache Spark

Schema merging is a way to evolve schemas through a merge of two or more tables. I prefer to show you with a practical example, so let's do it!

Here we have two files simulating Data sources that we'll load into DataFrames later. The first one represents a Data source of users and the second one of addresses.

Data source User

Let's suppose the file below is named user.json

{"id":1, "login": "Jonh", "age": 24}
{"id":2, "login": "Lucas", "age": 24}
{"id":3, "login": "Miesha", "age": 25}
{"id":4, "login": "Suzie", "age": 22}

Data source Address

…and the file below is named address.json

{"id":1, "city": "Los Angeles", "country": "USA"}
{"id":2, "city": "New York", "country": "USA"}
{"id":3, "city": "San Louis Obispo", "country": "USA"}

Now that we have our two semi-structured files, we can move on to the next step.

Creating a Spark Session

For this example we're going to use Java as the programming language, but compared to the other languages that Spark supports, almost nothing changes.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Configure Spark to run locally with a single thread
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");

// Create (or reuse) the session that drives all DataFrame operations
SparkSession sparkSession =
        new SparkSession.Builder()
                .config(sparkConf)
                .getOrCreate();
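A quick note on the configuration: the master URL local[1] tells Spark to run locally with a single worker thread, which is plenty for an example of this size.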

Creating Dataframes

Here we're also going to create two DataFrames, one for each Data source specified above.

Dataset<Row> dfUser =
sparkSession.read().json("user.json");
Dataset<Row> dfAddress =
sparkSession.read().json("address.json");

Generating Parquets

Here we're creating the Parquet files with different partition keys.

dfUser.write().parquet("data/table/partition=1");
dfAddress.write().parquet("data/table/partition=2");
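If everything went well, the layout on disk should look roughly like this (Spark also writes a _SUCCESS marker and generates its own part file names):

data/table/
├── partition=1/   <- user rows
└── partition=2/   <- address rows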

Once created, we can read each table back by specifying its partition.

sparkSession.read().parquet("data/table/partition=1").show()
Users parquet table
sparkSession.read().parquet("data/table/partition=2").show()
Address parquet table
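One detail worth noting: since we're pointing directly at a single partition directory, partition discovery doesn't kick in, so the partition column itself doesn't show up in these results.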

Merging Schema

Now the idea is to merge these two Parquet tables, creating a new DataFrame that can be persisted later.

Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table");

Note that we're passing an option called mergeSchema with true as its value; without it, Spark would not reconcile the differing schemas across partitions.
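Before looking at the data, we can also print the reconciled schema. The sketch below assumes our two sources; the exact field order may vary, but the merged schema should contain the union of both plus the discovered partition column:

dfMerge.printSchema();
// root (field order may differ)
//  |-- age: long (nullable = true)
//  |-- city: string (nullable = true)
//  |-- country: string (nullable = true)
//  |-- id: long (nullable = true)
//  |-- login: string (nullable = true)
//  |-- partition: integer (nullable = true)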

If we perform a simple read over the dfMerge DataFrame, you'll see the merged result: a single table containing the union of both schemas, where the user rows have null city and country values and the address rows have null login and age values.
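And since the idea was to end up with a DataFrame we can persist, here's a minimal sketch of that last step (the output path data/table_merged is just an illustrative choice):

dfMerge.write().parquet("data/table_merged");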

Obviously, this was just an example to show how schema merging works, so if you're interested in using this feature, I recommend reading the documentation.
