Apache Hudi: Copy on Write(CoW) Table

Ankur Ranjan
13 min read · Oct 6, 2023


As Data Engineers, we frequently face the tedious task of performing multiple UPSERT (update + insert) and DELETE operations in batch and streaming pipelines on our Data Lake platform. In a Data Lake architecture, this can be overwhelming and time-consuming.

The primary reason these operations are so tedious in a Data Lake is that files on HDFS or cloud storage (S3, GCS, etc.) are immutable. Whenever a single row is updated or deleted, the whole dataset or partition folder has to be read and then overwritten.

The classic way of handling UPSERT and DELETE in a Data Lake is: read in all the data, combine, and then overwrite. If you have built any ETL/ELT pipeline on a Data Lake architecture, you have most probably used this approach. Did you know you can solve this problem far more efficiently by moving toward a Data LakeHouse architecture?

In this article, try to observe the image and illustration very carefully to understand the concepts clearly. I have mentioned steps there. Try to follow & observe the steps carefully.

1. Data Lake vs. Data LakeHouse

Let’s try to understand the above problem by taking an example of a normal librarian and an intelligent librarian.

Data Lake Classic approach:

If you have worked in a Data Lake environment, you might find the illustration above familiar. This is how we typically implement UPSERT in Data Lake: we read either some partitions or the entire data, combine the existing data with the new one, and overwrite the merged data to the Data Lake. To clarify, this method is known as the UPSERT process.
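The read, combine, overwrite cycle described above can be sketched in a few lines of plain Python. This is only an in-memory illustration, not Spark or Hudi code: `classic_upsert` is a hypothetical helper, and the rows and city names are made-up sample data.

```python
def classic_upsert(existing_rows, incoming_rows, key="id"):
    """Read everything, combine with the new batch, return the full rewrite."""
    merged = {row[key]: row for row in existing_rows}  # 1. read all existing rows
    for row in incoming_rows:                          # 2. combine: incoming rows win
        merged[row[key]] = row
    return list(merged.values())                       # 3. this replaces the old data


# Made-up sample data standing in for one partition folder.
partition = [
    {"id": 1, "city": "Pune"},
    {"id": 2, "city": "Delhi"},
    {"id": 3, "city": "Patna"},
]
batch = [{"id": 2, "city": "Mumbai"}, {"id": 4, "city": "Goa"}]

rewritten = classic_upsert(partition, batch)
print(len(rewritten), "rows rewritten for", len(batch), "incoming rows")
```

Even though only two rows arrived, every row of the partition is read and written again, which is exactly the cost the LakeHouse approach tries to hide behind a table abstraction.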

Now let’s see how a smart and organised librarian, like the Data LakeHouse architecture🤩, handles newly arriving data.

Data LakeHouse Approach:

If you have observed the above illustration, you will have noticed the organised way of handling books or papers in a well-managed library. This is the main difference between the Data Lake and LakeHouse architectures. LakeHouse provides a good abstraction over the data by exposing tables to end users instead of just the raw files of the Data Lake.

If you have worked with databases like Postgres or MySQL, you are familiar with the power of tables. These tables handle database operations such as UPDATE, DELETE, and ACID transactions behind the scenes. The LakeHouse architecture tries to provide the same abstraction on top of the Data Lake, making our system as robust and easy to use as these tables.

To clarify, users don’t need to worry as much about UPDATE, INSERT, and DELETE operations. It is the table’s properties that help users implement these operations efficiently.

2. Enabling LakeHouse using an open table format

Let’s try to understand first how to implement the LakeHouse architecture using an open table format on top of Data Lake and then we will try to understand the Copy On Write(CoW) table of Apache Hudi in depth.

If you observe the above illustration, you can see that it looks mostly like a Data Lake, except for the addition of an open table format such as Apache Hudi or Apache Iceberg.

I am keeping the LakeHouse architecture very minimal here to focus on the Copy on Write(CoW) table of Apache Hudi. But do check out my old article to understand the need for LakeHouse architecture.

So, as you have seen in the above illustration, I have used Apache Hudi as the open table format. Let’s try to understand the table types of Apache Hudi in depth.

3. Table types in Apache Hudi:

Hudi supports two types of tables

  1. Copy on Write(CoW)
  2. Merge on Read(MoR) table

In this article, we will focus only on the CoW table.

Before we dive into CoW, let’s walk through some of the terminology used in Hudi for a better understanding of the following sections.

Data or Base File:

Hudi stores data in the columnar Parquet format, and these files are called data files or base files. Parquet is known to be very performant and is widely used across the industry. The terms data file and base file are used interchangeably; both mean the same thing.

Here are some of the key features of base files in Apache Hudi:

  • Base files are immutable: This means that once a base file is created, it cannot be changed.
  • Base files are versioned: This means that each version of a base file is identified by the commit (instant) time that produced it. This is how Hudi tracks the changes made to the data in the base file over time.
  • Base files are stored in a distributed file system: This allows Hudi to scale to large datasets.

File Version:

Let me explain in the context of CoW. Whenever an update happens to a data file, a newer version of that data file is created, containing the records of the older data file merged with the newer incoming records. This information is passed to the metadata, which records the latest version of each file. This helps readers get the latest data and even the data as it was at an earlier point in time.

Now let’s try to understand Copy-on-write (CoW) by illustration, example concepts and code examples.

Copy on Write(CoW):

  • 👉 Stores data using exclusively columnar file formats (e.g. Parquet).
  • 👉 Updates simply version and rewrite the files by performing a synchronous merge during write.
  • 🚀 COW tables are simpler to operate than MOR tables. All updates are written to base files in Apache Parquet format, eliminating the need for a separate compaction service to manage log files and improve efficiency. Updates are handled by entirely rewriting the file to generate a new version of the base file.
  • 👉 Consequently, COW tables exhibit higher write amplification because there is a synchronous merge that occurs to create the new base file version.
  • 🚀 However, a key advantage of COW tables is their zero read amplification because all data is available in the base file, ready to read. 🚀 The disk reads required for a query are minimal because they do not need to read multiple locations or merge data.
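To get a feel for the write amplification mentioned above, here is a rough back-of-the-envelope sketch. It counts records rather than bytes, the file size of one million records is a made-up number, and `write_amplification` is a hypothetical helper rather than anything Hudi exposes.

```python
def write_amplification(records_in_file, records_changed):
    """Records physically rewritten per record that logically changed."""
    return records_in_file / records_changed


# Updating a single record in a base file holding one million records
# still rewrites the whole file as a new version.
single_update = write_amplification(records_in_file=1_000_000, records_changed=1)

# Updating half of the records in the same file amortises the rewrite much better.
bulk_update = write_amplification(records_in_file=1_000_000, records_changed=500_000)

print(single_update, bulk_update)
```

Under these assumptions, CoW is at its most expensive when a batch touches only a few records in each of many files, and at its cheapest when the updates are concentrated in a few files.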

Let’s say we have 3 file groups with data files as below.

Now we know that in the case of CoW, every new batch written to Hudi results in a new copy of the respective data files: a newer version is created that also contains the matching records from the incoming batch.

Let’s suppose we get a new batch and, after indexing, we find that it has records matching file group 1 and file group 2, and that there are new inserts too. For the new INSERTs, we will create a new file group (file group 4).

So, both data_file1 and data_file2 will have newer versions created. Data file 1 V2 is nothing but the contents of data file 1 V1, merged with the records from the incoming batch that match records in data file 1.
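The routing described above can be sketched with a small in-memory model. This is a simplification under stated assumptions: `FileGroup`, `cow_upsert`, the `index` dictionary and the record keys `a` to `e` are all hypothetical names for illustration, and inserts always go to a new file group here, as in the description above.

```python
from dataclasses import dataclass, field


@dataclass
class FileGroup:
    """Hypothetical model of a file group: an append-only list of file versions."""
    file_id: str
    versions: list = field(default_factory=list)  # each version maps record key -> record

    def latest(self):
        return self.versions[-1]


def cow_upsert(file_groups, index, batch):
    """Rewrite every file group touched by the batch; put new keys in a new group."""
    updates_by_group, inserts = {}, {}
    for key, record in batch.items():
        if key in index:                      # index lookup: key already lives in a group
            updates_by_group.setdefault(index[key], {})[key] = record
        else:                                 # unknown key: this is an insert
            inserts[key] = record

    for file_id, updates in updates_by_group.items():
        group = file_groups[file_id]
        # Copy the latest version and merge the updates in; older versions stay untouched.
        group.versions.append({**group.latest(), **updates})

    if inserts:
        new_id = f"file_group{len(file_groups) + 1}"
        file_groups[new_id] = FileGroup(new_id, [inserts])
        index.update({key: new_id for key in inserts})


file_groups = {
    "file_group1": FileGroup("file_group1", [{"a": 1, "b": 2}]),
    "file_group2": FileGroup("file_group2", [{"c": 3}]),
    "file_group3": FileGroup("file_group3", [{"d": 4}]),
}
index = {"a": "file_group1", "b": "file_group1", "c": "file_group2", "d": "file_group3"}

cow_upsert(file_groups, index, {"a": 10, "c": 30, "e": 5})

for file_id, group in file_groups.items():
    print(file_id, "versions:", len(group.versions), "latest:", group.latest())
```

File groups 1 and 2 end up with two versions each, file group 3 is never touched, and the new key lands in file group 4, while V1 of every file remains readable.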

If you have still not understood then let’s try to understand by taking some layman’s example.

Imagine you have a table that stores customer data. You want to be able to add new customers, update existing customer information, and delete customers. You also want to be able to run queries that return all of the customer data.

In the case of Copy-on-write (CoW), if there is an update to existing customer records, the CoW strategy doesn’t modify the original files. Instead, it creates a new copy of the affected files that incorporates the updates or changes. Here’s how it works:

  1. Initial State: You have an existing dataset with 10 customer records, let’s call them C1 to C10.
  2. Update: Let’s say you want to update the information for customer C5.
  3. Copy and Update: Instead of modifying the existing data, CoW creates a new copy of the files containing C5, including the updated information for customer C5. This happens as a synchronous merge during the write: the existing records are merged with the incoming update and the files are rewritten as a new version. So, you now have a new set of files, i.e. a new version of your dataset, that contains all 10 customer records (C1 to C10), including the changes you made to C5.
  4. Result: After the update, you effectively have two versions of your dataset. The original version with the unmodified C5 record and the new version with the updated C5 record.
  5. Metadata: Hudi also maintains efficient metadata from which you can extract which records changed after each commit.

This approach ensures data immutability, as the original data remains unchanged. It also allows for time travel queries, where you can access data as it existed at different points in time by referring to the corresponding version of the dataset.
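The C1 to C10 example and the idea of time travel can be sketched with a tiny in-memory version store. Everything here is an assumption for illustration: the commit times "001" and "002", and the helpers `commit` and `read_as_of`, are hypothetical and only mimic the behaviour. In Hudi itself, as far as I know, the Spark reader offers an `as.of.instant` option for this kind of query.

```python
# Maps commit time -> full snapshot written by that commit (never modified afterwards).
versions = {}


def commit(commit_time, changes):
    """Copy the latest snapshot, merge the changes in, and store it as a new version."""
    previous = versions[max(versions)] if versions else {}
    versions[commit_time] = {**previous, **changes}


def read_as_of(commit_time):
    """Time travel: return the newest snapshot written at or before commit_time."""
    eligible = [t for t in versions if t <= commit_time]
    return versions[max(eligible)]


# Initial state: ten customers, C1 to C10.
commit("001", {f"C{i}": "original" for i in range(1, 11)})
# Update: only C5 changes, but a complete new version is written.
commit("002", {"C5": "updated"})

print(read_as_of("001")["C5"])
print(read_as_of("002")["C5"])

# Which records changed in the second commit?
changed = {k for k, v in versions["002"].items() if versions["001"].get(k) != v}
print(changed)
```

Because the first version is never modified, a reader asking for commit "001" still sees the old C5, while the latest reader sees the update.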

Let’s try to understand the above logic by writing some code, and after that let’s walk through the steps of UPSERTING data to the Apache Hudi Copy On Write(CoW) table.

I suggest going through the code snippet below once and then reading all the steps that follow it. Even if you don’t understand the code, don’t get overwhelmed; read the steps and you will be able to get it.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# STEP 1: Start the Spark session.
# The Hudi bundle must match your Spark and Scala versions
# (here: Spark 3.2, Scala 2.12, Hudi 0.13.0).
spark = (
    SparkSession.builder.appName("lakeHouse")
    .config('spark.jars.packages', 'org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0')
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.catalog.local.warehouse', 'warehouse')
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)

# Show only errors, not warnings
spark.sparkContext.setLogLevel("ERROR")

print(f'The PySpark {spark.version} version is running...')


# STEP 2: Read the CSV file and create the DataFrame.
# Whenever new data arrives, point this path at the new CSV file.
df = spark.read.option("header", "true").csv("dataset/Hudi_Customer_input - Sheet1.csv")

# Copy of df with "created" and "updated" parsed into real date columns.
# Note: the write in STEP 4 uses df (string columns), not refactored_df.
refactored_df = (
    df.withColumn("created", to_date(col("created"), "yyyy-MM-dd"))
    .withColumn("updated", to_date(col("updated"), "yyyy-MM-dd"))
)

# STEP 3: Set the hudi_options. The default table type in Apache Hudi is Copy on Write (CoW).

tableName = "customers_detail_hudi_cow_table"

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',            # record key
    'hoodie.datasource.write.partitionpath.field': 'created',   # partition path
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'updated',      # pre-combine key
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

# STEP 4: Write to the Hudi table in UPSERT mode.

path = "/Users/ankur.ranjan/Desktop/customers_detail_hudi_cow_table"

(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(path)
)

# STEP 5 (optional): Read the Hudi table.

spark.read.format("hudi").load(path).select(
    "id", "name", "address", "created", "updated"
).show()

Let’s try to understand the above code.

  1. Import the required library.
  2. Initiate the Spark session: Don’t forget to add the required Apache Hudi jar. If you get stuck while running your code, check the official Apache Hudi documentation, and always pair your Spark version with a compatible Apache Hudi version; otherwise it will be hard for you. But as you are a developer, I assume you will be comfortable handling dependency issues.
  3. Read your input files: For me, it is a simple CSV file, which I read with Spark to create a Spark DataFrame. Whenever new data comes, the CSV path is updated to read the new incoming data.
  4. Define the Hudi options: Here are a few things that you need to understand for applying UPSERT. In the hoodie.datasource.write.operation key we have set the upsert operation, and by default Hudi writes to a Copy on Write table, so we don’t need to specify the Copy-On-Write option in any key.
  5. The two attributes which identify a record in Hudi are the record key (see: hoodie.datasource.write.recordkey.field) and partition path (see: hoodie.datasource.write.partitionpath.field). In our configuration, the id is defined as a record key and created plays the role of a partition path. The combination of the record key and partition path is called a hoodie key.
  6. If the input batch contains two or more records with the same hoodie key, these are considered the same record. For example, let’s suppose in older data we have data something like this.

After a few minutes, we received a new set of updated data:

If you look closely, there are updates to IDs 314 and 310, i.e. Sadashiv and Matt, and one brand new record with ID 324 and name Mariam has arrived in the new input data.

  1. The address column of Sadashiv and Matt was changed from India and USA to Spain and Germany respectively.
  2. The updated column has also changed.
  3. For us, the hoodie key is (record key + partition path).
  4. Here we have taken the id column as the record key and the created column as the partition path.
  5. Now, let’s try to understand another term in Apache Hudi which helps us to perform UPSERT, i.e. pre-combining. In this example, I use the updated column as my pre-combine key.
  6. If you look closely, the hoodie key (id + created) of each updated record is the same as that of its older record. So, to apply the UPSERT (update + insert), Hudi runs a deduplication step called pre-combining.
  7. The pre-combining procedure picks the record with the greater value in the defined field. In our case, this field is the updated column, so updated=2023-09-11, i.e. 11th Sept, is picked over updated=2023-09-10, i.e. 10th Sept, for the changes made to ID 314 (Sadashiv) and 310 (Matt).
  8. See the hoodie.datasource.write.precombine.field option in the code. Here we have defined the updated column as the pre-combine key.
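The pre-combining rule from the steps above can be sketched in plain Python. This is a simplified model, not Hudi’s actual implementation: `precombine` is a hypothetical helper, it treats old and new rows as one list, and it relies on the dates being ISO strings so that plain string comparison orders them correctly. The rows are the ones visible in the 2023-08-10 partition of this article.

```python
def precombine(records, key_fields=("id", "created"), ordering_field="updated"):
    """Keep, for every hoodie key, the record with the greatest ordering value."""
    winners = {}
    for rec in records:
        hoodie_key = tuple(rec[f] for f in key_fields)  # record key + partition path
        current = winners.get(hoodie_key)
        if current is None or rec[ordering_field] > current[ordering_field]:
            winners[hoodie_key] = rec
    return list(winners.values())


rows = [
    {"id": 314, "name": "Sadashiv", "address": "India", "created": "2023-08-10", "updated": "2023-09-10"},
    {"id": 314, "name": "Sadashiv", "address": "Spain", "created": "2023-08-10", "updated": "2023-09-11"},
    {"id": 322, "name": "Mulan", "address": "Japan", "created": "2023-08-10", "updated": "2023-09-10"},
    {"id": 324, "name": "Mariam", "address": "UAE", "created": "2023-08-10", "updated": "2023-09-11"},
]

result = precombine(rows)
for rec in result:
    print(rec["id"], rec["address"], rec["updated"])
```

The two rows for ID 314 share the same hoodie key, so only the one with the later updated value (Spain, 2023-09-11) survives, while Mulan and Mariam pass through unchanged.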

So after the UPSERT is made you will get the final output as

Now that we have understood how to apply UPSERT using Apache Hudi and have created a Copy-On-Write table with it, let’s look at the structure of the CoW Hudi table.

  • After running the code, you will see that it has created one folder in the given path. I have given path = "/Users/ankur.ranjan/Desktop/customers_detail_hudi_cow_table"
  • Now let’s try to check the content of the Hudi table.
  1. You can observe that it has created a folder in the given path with the name customers_detail_hudi_cow_table. All the information of our Hudi Copy-On-Write table resides here.
  2. If you observe the above image closely, you will be able to see that it has created two folders inside the main folder. These two folders are 2023-08-10 and 2023-09-10.
  3. If you have looked at the input data and our code, you might have noticed that we defined the created column as the partition path. We had only two values of the created column in the input data: 2023-08-10 and 2023-09-10.
  4. Now if you observe the given screenshot again, you will see that in the folder we have two Parquet files and some metadata files. Observe the creation time of the two Parquet files. One is 23rd Sept 23:27 whereas the other is 23rd Sept 23:34.
  5. This is because we received our first input data at 23:27 and the next set of data at 23:34. As we have learnt, in Copy-On-Write, whenever a new set of data comes, Hudi creates a copy of the existing file and merges the latest changes into it. It does not change the existing Parquet file (data/base file).

Let’s try to verify this by reading those Parquet files individually. Please observe the given input in the above screenshots before observing the below code.

Note that we are only reading the partition folder 2023-08-10.

Let’s read the first Parquet file which was created when the first set of input data came i.e. on 23rd Sept 23:27

oldData = spark.read.format("parquet").load(
    "/Users/ankur.ranjan/Desktop/customers_detail_hudi_cow_table/2023-08-10/28187041-60f6-4a77-af3c-f4fb43768938-0_0-560-579_20230923232714083.parquet"
)

oldData.select("id", "name", "address", "created", "updated").show()

Output is

+---+--------+-------+----------+----------+
| id|    name|address|   created|   updated|
+---+--------+-------+----------+----------+
|322|   Mulan|  Japan|2023-08-10|2023-09-10|
|314|Sadashiv|  India|2023-08-10|2023-09-10|
+---+--------+-------+----------+----------+

If you have observed the image of our first input data above, you might have seen that the created date of Mulan and Sadashiv was 2023-08-10, whereas for Ankur and Matt the created date was 2023-09-10. That’s why you see only two records in the 2023-08-10 folder for this Parquet file.
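The Parquet file name read above also carries the commit time of the write. Here is a small sketch that pulls it apart, assuming the name is laid out as file ID, write token and commit instant separated by underscores; that layout is inferred from the file names in this article, and `parse_base_file_name` is a hypothetical helper, not a Hudi API.

```python
from datetime import datetime


def parse_base_file_name(file_name):
    """Split '<fileId>_<writeToken>_<instantTime>.parquet' into its three parts."""
    stem = file_name.rsplit(".", 1)[0]               # drop the .parquet extension
    file_id, write_token, instant = stem.split("_")
    # Instant format assumed to be yyyyMMddHHmmss followed by milliseconds.
    commit_time = datetime.strptime(instant, "%Y%m%d%H%M%S%f")
    return file_id, write_token, commit_time


file_id, write_token, commit_time = parse_base_file_name(
    "28187041-60f6-4a77-af3c-f4fb43768938-0_0-560-579_20230923232714083.parquet"
)
print(file_id)
print(write_token)
print(commit_time)
```

Under this assumption, the instant decodes to 23rd Sept 2023 at 23:27:14, which lines up with the 23:27 creation time of the first Parquet file.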

Now let’s read the Parquet file that was created when the next set of data came, i.e. on 23rd Sept 23:34.

newData = spark.read.format("parquet").load(
    "/Users/ankur.ranjan/Desktop/customers_detail_hudi_cow_table/2023-08-10/54a52289-bbe2-4e85-a7fa-89df4a62ecb8-0_0-506-527_20230923225057099.parquet"
)

newData.select("id", "name", "address", "created", "updated").show()

Output is

+---+--------+-------+----------+----------+
| id|    name|address|   created|   updated|
+---+--------+-------+----------+----------+
|322|   Mulan|  Japan|2023-08-10|2023-09-10|
|314|Sadashiv|  Spain|2023-08-10|2023-09-11|
|324|  Mariam|    UAE|2023-08-10|2023-09-11|
+---+--------+-------+----------+----------+

If you have observed the output clearly then you might have realised the magic of LakeHouse architecture particularly the Apache Hudi Copy On Write table.

I hope that all these above illustrations, images, code, steps etc. have helped you to understand the Copy-On-Write table in Apache Hudi.

Let’s meet in the next article to learn some more wonderful concepts of Apache Hudi. Till then, thank you, goodbye, and keep learning.

We have discussed a lot of concepts in this article, and it is becoming long. I hope some of my readers who understand the LakeHouse architecture and have worked with Apache Hudi will share their thoughts in the comment section.

Feel free to subscribe to my YouTube channel i.e. The Big Data Show. I might upload a more detailed discussion of the above concepts in the coming days.

More so, thank you for that most precious gift to me as a writer, i.e. your time.

#apacheHudi #lakehouse #datalakehouse

Originally published at https://www.linkedin.com.

Ankur Ranjan

Data Engineer III @Walmart | Contributor of The Big Data Show