ACID transactions on a distributed file system

ACID transactions on a distributed file system using Apache Hudi

[Image: Apache Hudi. Source: https://hudi.apache.org/]

Co-author: Saurabh Pateriya

The challenge:

In the big data world, performing ACID transactions on a distributed file system has always been challenging, so proprietary databases, which usually charge for both storage and compute, are generally preferred for ACID workloads.

ACID is an acronym for atomicity, consistency, isolation, and durability. Each of these four qualities contributes to the ability of a transaction to ensure data integrity.

Introduction to Apache Hudi:

Apache Hudi is an open-source storage framework that brings ACID transactions to distributed file systems; the name Hudi stands for Hadoop Upserts Deletes and Incrementals. To reduce cost, we can store data in a distributed file system and create Hudi tables on top of it, so we pay only for storage.

Hudi stores data in Parquet format and keeps its transaction log in the .hoodie folder. For each commit a delta log is created, and the logs encode data in Avro format for speedier logging.

Hudi is designed around the notion of base files and delta log files that store updates/deltas to a given base file (together called a file slice). Their formats are pluggable, with Parquet (columnar access) and HFile (indexed access) being the supported base file formats today. The delta logs encode data in Avro (row-oriented) format for speedier logging (just like Kafka topics, for example). Hudi's unique file layout scheme encodes all changes to a given base file as a sequence of blocks (data blocks, delete blocks, rollback blocks) that are merged in order to derive newer base files. In essence, this makes up a self-contained redo log that lets us implement interesting features on top.
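To make this layout concrete, here is a minimal spark-shell sketch that lists the contents of a Hudi table's base path; the path is a hypothetical placeholder, and the commented output is only illustrative of a typical layout.

import org.apache.hadoop.fs.Path
val basePath = "hdfs:///data/hudi/Hudi_Table" // hypothetical table location
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(basePath)).foreach(status => println(status.getPath))
// Illustrative output:
//   .../Hudi_Table/.hoodie        <- timeline: commit metadata and delta logs
//   .../Hudi_Table/<partition>/   <- base files (Parquet), plus log files (Avro) for MoR tables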

[Image source: https://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform/]

There are two types of tables available in Hudi, as described below:

  1. Copy on Write (CoW): Data is stored in columnar format (Parquet), and updates create a new version of the files during writes. This storage type is best for read-heavy workloads because the latest version of the dataset is always available in efficient columnar files.
  2. Merge on Read (MoR): Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats; updates are logged to row-based "delta files" and compacted later to create new versions of the columnar files. This storage type is best for write-heavy workloads because new commits are written quickly as delta files, but reading the dataset requires merging the compacted columnar files with the delta files. The table type is chosen when the table is created or first written; see the sketch after this list.
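A minimal sketch of choosing the table type through the Spark DataFrame writer; the DataFrame df, the base path, and the key/partition columns are hypothetical placeholders.

// df is an existing DataFrame holding the columns referenced below (hypothetical)
df.write.format("hudi").
  option("hoodie.table.name", "Hudi_Table").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ"). // or COPY_ON_WRITE
  option("hoodie.datasource.write.recordkey.field", "col1").
  option("hoodie.datasource.write.partitionpath.field", "col7").
  option("hoodie.datasource.write.operation", "upsert").
  mode("append").
  save("hdfs:///data/hudi/Hudi_Table")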

Copy on Write tables support two kinds of queries.

Snapshot queries:
Query the latest snapshot of the table data as of the last successful commit.

Incremental queries:
Query the incremental changes made to the table. These queries are helpful for downstream jobs that need to process only the changed data.
Example: get the data committed to the table between timestamps t1 and t2 (updates + inserts).
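A minimal sketch of an incremental read with the Hudi datasource, reusing the hypothetical base path from above; <t1> and <t2> are placeholders for Hudi commit timestamps (e.g. 20220101000000).

val incrementalDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", "<t1>"). // changes after this commit time
  option("hoodie.datasource.read.end.instanttime", "<t2>").   // up to and including this commit time
  load("hdfs:///data/hudi/Hudi_Table")
incrementalDF.show(false)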

Starting with version 0.9, Hudi is integrated with Spark SQL, so we can write native SQL to perform ACID operations such as DELETE, UPDATE, and MERGE.

Let’s deep dive into code.

Environment setup
Spark version: 2.4
Apache Hudi version: 0.10
File system: HDFS, GCS, etc.

The code below launches a spark-shell with the required Hudi packages (the spark.sql.extensions setting enables Hudi's Spark SQL support), creates a Hudi table, and then performs a MERGE from a staging table.

spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

spark.sql("create table if not exists Hudi_Table (col1 string, col2 string, col3 string, col4 string, col5 string, col6 int, col7 date, col8 string) using hudi options (type = 'cow', primaryKey = 'col1') partitioned by (col7, col8)")

spark.sql("merge into Hudi_Table X using (select col1, col2, col3, col4, col5, col6, col7, col8 from stage_Table) source on X.col1 = source.col1 when matched then update set X.col2 = source.col2 when not matched then insert *")
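Beyond MERGE, the same Spark SQL integration also supports UPDATE and DELETE statements; a minimal sketch against the table created above (the filter values are hypothetical):

spark.sql("update Hudi_Table set col2 = 'new_value' where col1 = 'some_key'")
spark.sql("delete from Hudi_Table where col1 = 'some_key'")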

By default, a Hudi table includes the metadata columns listed below in addition to the columns we specify; a quick query against them follows the list.

  • _hoodie_commit_time : Last commit that touched this record. The latest version of a record is retrieved based on this column
  • _hoodie_record_key : Treated as a primary key within each DFS partition, basis of all updates/inserts
  • _hoodie_file_name : Actual file name containing the record (super useful to triage duplicates)
  • _hoodie_partition_path : Path from basePath that identifies the partition containing this record
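A quick illustrative query that surfaces these metadata columns alongside the table's own columns:

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, col1, col2 from Hudi_Table limit 10").show(false)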

In our test, it took around 9 minutes to load more than 2 GB of upsert data into the Hudi table.

Performance tuning:

1. Clustering:

Hudi exposes a trade-off between file size and ingestion speed through the hoodie.parquet.small.file.limit property, which configures the size below which a file is considered small. Clustering itself has two phases: scheduling the clustering plan and executing it.

[Image: clustering example. Source: https://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro/]
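Since the original example is shown as an image, here is a minimal sketch of enabling inline clustering through write options; the DataFrame, path, and sizing values are illustrative assumptions rather than recommendations.

df.write.format("hudi").
  option("hoodie.table.name", "Hudi_Table").
  option("hoodie.datasource.write.recordkey.field", "col1").
  option("hoodie.datasource.write.partitionpath.field", "col7").
  option("hoodie.datasource.write.operation", "insert").
  option("hoodie.clustering.inline", "true").                                    // run clustering as part of the write
  option("hoodie.clustering.inline.max.commits", "4").                           // trigger after every 4 commits
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").       // files under ~600 MB are candidates
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824"). // aim for ~1 GB files
  mode("append").
  save("hdfs:///data/hudi/Hudi_Table")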

2. Bulk insert:

Hudi supports bulk_insert as the write operation type by setting the two configs below:

set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
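With those two settings in place, a plain INSERT statement runs as a bulk insert; a minimal sketch reusing the tables from the earlier example:

spark.sql("set hoodie.sql.bulk.insert.enable=true")
spark.sql("set hoodie.sql.insert.mode=non-strict")
spark.sql("insert into Hudi_Table select col1, col2, col3, col4, col5, col6, col7, col8 from stage_Table")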

3. Indexing:

Hudi provides the best indexing performance when the recordKey is modeled to be monotonically increasing (e.g. timestamp-prefixed), which lets range pruning filter out a lot of files from comparison. Even for UUID-based keys, there are known techniques to achieve this. For example, with 100M timestamp-prefixed keys (5% updates, 95% inserts) on an event table with 80B keys / 3 partitions / 11,416 files / 10 TB of data, the Hudi index achieves a ~7x speed-up (2880 secs vs 440 secs) over a vanilla Spark join.
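A minimal sketch of the index-related write options (Bloom index with key-range pruning), reusing the hypothetical DataFrame and path from the earlier sketches:

df.write.format("hudi").
  option("hoodie.table.name", "Hudi_Table").
  option("hoodie.datasource.write.recordkey.field", "col1").  // ideally timestamp-prefixed, as described above
  option("hoodie.datasource.write.partitionpath.field", "col7").
  option("hoodie.index.type", "BLOOM").                       // Hudi's default index type
  option("hoodie.bloom.index.prune.by.ranges", "true").       // prune candidate files by key ranges
  mode("append").
  save("hdfs:///data/hudi/Hudi_Table")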

Limitations:

  1. As of Apache Hudi 0.10, records can be updated only by matching on the primary key columns; matching on non-primary-key columns is not supported, and the primary key columns themselves cannot be updated
  2. If a composite primary key is defined, then all of its columns need to be specified in the ON condition
  3. Data types such as varchar and timestamp are not supported on Hudi tables

Possible alternatives:

  1. Delta Lake: another open table format that provides ACID transactions on top of distributed file systems. Here we can update records based on non-primary-key columns as well.
  2. Iceberg: an open table format for huge analytic datasets. Iceberg adds tables to compute engines such as Spark, PrestoDB, and Hive, and it also supports updating records based on non-primary-key columns.

References:

https://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform/
https://hudi.apache.org/docs/quick-start-guide
https://github.com/apache/hudi/tree/master/hudi-examples
https://cwiki.apache.org/confluence/display/HUDI/Tuning+Guide
