Walmart’s Cassandra CDC Solution

A look at Change Data Capture with Cassandra by Scott Harvester and Nitin Chhabra.

Data Acquisition

“Set my data free” could be the slogan for Walmart as it builds out its modern data platform. One of the goals of the platform is to provide integrated, performant, and automated Change Data Capture (CDC) solutions that extract data from transactional databases and deliver that data to the enterprise data lake. The solution is breaking down data silos database by database. For example, the implementation for Google Spanner reduced the time to enable a data pipeline from days to a couple of hours of work.

The team at Walmart recently tackled capturing data changes from Cassandra. In this article, we will look at the challenges, options, and approaches for Cassandra CDC. Specifically, we will focus on Cassandra 4.x.

The Challenges

Even though Cassandra 4.x offers significant CDC improvements over Cassandra 3.x, Cassandra CDC is still in its infancy compared to other database technologies. There are several challenges that need to be addressed. These include:

Duplicate Data — Individual changes (CRUD operations) generate multiple CDC records, as data is replicated across Cassandra nodes. This means downstream processing will need to support more data changes than are generated on the source system. For example, if Cassandra is deployed across 3 regions (3 rings — 1 in each region) with a replication factor of 3 (a typical deployment at Walmart), each change will be replicated 3 times in each of the 3 region rings. That means one change will generate 9 CDC log records. We will discuss how we handled duplicates in the “Our Approach” section.

Change Columns Only — One limitation of Cassandra CDC is that the CDC logs only contain the changed columns. If your system needs to deliver the whole database object, you will need to develop a way to populate that object. Fortunately, our data pipeline can handle just the change columns and hydrate the full object.

Out of Order — The order of changes is not guaranteed with Cassandra CDC. Downstream systems will need to support receiving out-of-order data changes. Again, our downstream system already supported this use case.

Run Local — CDC is exposed by Cassandra via the CDC commit logs. To read the commit log, it is best to run an agent locally. This means you will need to manage running a CDC log reader on each node of your Cassandra deployment. In addition, you need to make sure the CDC log reader is actively removing the CDC commit logs, because if the local file system fills up with CDC commit logs it will take down that Cassandra node.

Missing Changes — Be aware that not all data changes are captured. TTLs, range deletes, static columns, triggers, materialized views, secondary indices, and lightweight transactions will not be captured in the CDC commit log. Since we do not use these features, we do not have to worry about this.

The Options

When we first started looking at Cassandra 4.x CDC there were only a couple of options — the Cassandra Commit Log Reader and the DataStax Change Agent for Apache Cassandra. The Cassandra Commit Log Reader would require us to implement an agent for the reader and create handlers for the data changes. The DataStax solution was further along but coupled to Pulsar instead of Kafka, which Walmart is heavily invested in.

Fortunately, we noticed a pull request to add the Cassandra 4.x Commit Log Reader to Debezium. Debezium is one of the most popular open source CDC Platforms. The pull request was merged into the Debezium 1.9 release.

This was a perfect solution for us as we have been actively using Debezium as a CDC framework for other database technologies to publish changes to Kafka. This gave us the ability to capture changes from Cassandra and publish those changes to Kafka, but there were still a few hurdles we had to jump, as described in the next section.

Our Approach

Debezium provided a mechanism to read the CDC commit logs generated by Cassandra and publish those changes to Kafka, but we still had 2 hurdles: near real-time processing and duplicate data changes.

Near Real-Time Processing

Diagram 1 gives an overview of how we run Debezium on Cassandra nodes. Debezium monitors the Cassandra CDC commit log directory for changes. Specifically, the Cassandra 4.x Debezium connector waits for an .idx file to be marked completed before processing the actual commit log file. The .idx file has a reference to the raw log file in its name. For example, in Diagram 1 below, the .idx file CommitLog-7-1647924600284_cdc.idx tracks the state of the CDC commit log file CommitLog-7-1647924600284.log.

Diagram 1: Debezium on Cassandra Node

We made an enhancement to the existing Debezium Cassandra approach by modifying the code to continuously process the commit logs as new data becomes available (reference: Cassandra4 CDC). This means the process does not wait for the CDC commit log to be marked as completed before sending the CDC messages to Kafka.
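
To make the mechanism concrete, here is a minimal sketch (not the actual Debezium or connector code, and with file-format details simplified) of tailing a CDC index file: the first line of the _cdc.idx file carries the latest offset synced to the segment, and a COMPLETED marker appears once Cassandra finishes the segment, so a reader can repeatedly hand any newly synced byte range to a processor instead of waiting for that marker.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/**
 * Minimal sketch (not the actual Debezium connector code) of tailing a
 * Cassandra 4.x CDC index file. The *_cdc.idx file holds the latest
 * offset synced to the matching commit log segment and is marked
 * COMPLETED once the segment is finished.
 */
public class CdcIndexPoller {

    /** Hypothetical callback that parses commit log bytes in [from, to). */
    public interface SegmentProcessor {
        void process(Path commitLog, long from, long to);
    }

    public static void follow(Path idxFile, SegmentProcessor processor)
            throws IOException, InterruptedException {
        // CommitLog-7-1647924600284_cdc.idx -> CommitLog-7-1647924600284.log
        Path commitLog = idxFile.resolveSibling(
                idxFile.getFileName().toString().replace("_cdc.idx", ".log"));

        long processedOffset = 0;
        boolean completed = false;
        while (!completed) {
            List<String> lines = Files.readAllLines(idxFile);
            if (!lines.isEmpty()) {
                // First line: latest offset synced to disk for this segment.
                long syncedOffset = Long.parseLong(lines.get(0).trim());
                // Second line (when present): the segment is COMPLETED.
                completed = lines.size() > 1 && "COMPLETED".equals(lines.get(1).trim());

                if (syncedOffset > processedOffset) {
                    // Hand only the newly synced bytes to the processor,
                    // instead of waiting for the whole segment to complete.
                    processor.process(commitLog, processedOffset, syncedOffset);
                    processedOffset = syncedOffset;
                }
            }
            if (!completed) {
                Thread.sleep(250); // poll interval; tune for latency vs. load
            }
        }
    }
}
```

The real connector also handles offset persistence across restarts, deletion of processed segments, and failure recovery, which this sketch omits.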

In the future, we plan to modify the implementation to process the files in parallel. This will help the CDC processing to keep up with Cassandra when dealing with high data volumes or during catchup scenarios.
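
One possible shape of that change, sketched here purely as an illustration (it is not the planned implementation), is to hand each commit log segment to its own worker thread, relying on the fact that downstream consumers already tolerate out-of-order changes:

```java
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/**
 * Illustrative sketch only: process CDC commit log segments concurrently.
 * Ordering is preserved within a segment by its worker; ordering across
 * segments is not, which is acceptable because our downstream consumers
 * tolerate out-of-order changes.
 */
public class ParallelSegmentProcessor {

    private final ExecutorService workers;

    public ParallelSegmentProcessor(int parallelism) {
        this.workers = Executors.newFixedThreadPool(parallelism);
    }

    /** readAndPublish reads one segment and publishes its changes to Kafka. */
    public void processAll(List<Path> segments, Consumer<Path> readAndPublish) {
        for (Path segment : segments) {
            workers.submit(() -> readAndPublish.accept(segment));
        }
    }

    public void shutdown() {
        workers.shutdown();
    }
}
```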

Duplicate Data Changes

The key requirements for deduplication of data were scale and state retention. Scale, because we could receive 60,000 data changes a second. State retention, because we do not want to resend data we already processed, even if our process lost state because of a crash.

Flink seemed like a logical solution, but we decided on a simpler approach. Our internal Data Acquisition Tool (named DAQ for short) already supported process scaling through data partitioning. This ability has enabled DAQ to scale to capture data changes for Walmart’s most critical and high-volume e-commerce databases. DAQ also supported custom plug-ins. The question was, could we leverage the DAQ architecture and use a plug-in to handle the deduplication?

The answer was yes. By maintaining state in a cache, we were able to add de-duplication logic to DAQ. The cache we used to maintain state was Redis. We will go into the advantages of Redis later.

Diagram 2 illustrates the data flow of changes starting from Cassandra, through DAQ for deduplication, and sending the data to Kafka.

Diagram 2: Cassandra to Kafka Data Flow

Redis helped us handle the scale using RedisBloom, an implementation of a Bloom filter. A Bloom filter is a probabilistic, in-memory data structure that can determine whether an element is a member of a set. The big advantage of a Bloom filter is that its memory footprint is incredibly small. The other big advantage of using a Bloom filter with Redis is that we could make bulk calls to check if a data change has already been seen. This reduced the network chattiness between our process and Redis.
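
To illustrate the bulk interaction, here is a sketch that assumes a Jedis 4.x client (which exposes the RedisBloom BF.* commands); DAQ's actual plug-in code is internal. BF.MADD adds a whole batch of change keys in one round trip and reports, per key, whether it was newly added:

```java
import java.util.ArrayList;
import java.util.List;
import redis.clients.jedis.JedisPooled;

/**
 * Sketch of bulk deduplication against a RedisBloom Bloom filter.
 * BF.MADD (bfMAdd) returns true for keys that were newly added, i.e.
 * changes we have not seen before; keys already present are duplicates.
 */
public class BloomDeduplicator {

    private final JedisPooled redis;

    public BloomDeduplicator(JedisPooled redis) {
        this.redis = redis;
    }

    /** Returns only the change keys that have not been seen before. */
    public List<String> firstSeen(String filterKey, List<String> changeKeys) {
        List<Boolean> added =
                redis.bfMAdd(filterKey, changeKeys.toArray(new String[0]));
        List<String> fresh = new ArrayList<>();
        for (int i = 0; i < changeKeys.size(); i++) {
            if (added.get(i)) {        // true => first time seen => keep it
                fresh.add(changeKeys.get(i));
            }
        }
        return fresh;
    }
}
```

How a change key is derived is left open here; it just needs to identify a mutation uniquely (for example, a hash of the primary key plus the mutation timestamp).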

There are disadvantages with this approach. First, it is not possible to delete data from a Bloom filter. To get around deleting data, we partition the Bloom filter by transaction time to the hour. We use Redis’ TTL (Time To Live) setting to automatically remove old Bloom filters. This means if we keep 3 days of Bloom filters, we will have 72 Bloom filters (3 × 24). But, with the footprint of Bloom filters being so small, we did not see any negatives in partitioning the Bloom filters by hour (for instance, depending on the configuration, we were seeing 72 Bloom filters take up around 5 GB of memory in Redis). In fact, it helps reduce the number of false positives we would have seen compared to using the Bloom filter over a longer period.
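
The hourly rotation itself can be as simple as deriving the filter key from the transaction hour and letting Redis expire old filters. The key prefix below is hypothetical and, again, this is a sketch assuming a Jedis 4.x client:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.exceptions.JedisDataException;

/**
 * Sketch of hourly Bloom filter rotation: each transaction hour gets its
 * own filter, created with the configured error rate and capacity, and
 * Redis drops it automatically after the retention window.
 */
public class HourlyBloomFilters {

    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(ZoneOffset.UTC);

    private final JedisPooled redis;
    private final double errorRate;       // e.g. 0.000001 (1 in 1,000,000)
    private final long capacity;          // e.g. 20_000_000
    private final long retentionSeconds;  // e.g. 3 days = 259_200

    public HourlyBloomFilters(JedisPooled redis, double errorRate,
                              long capacity, long retentionSeconds) {
        this.redis = redis;
        this.errorRate = errorRate;
        this.capacity = capacity;
        this.retentionSeconds = retentionSeconds;
    }

    /** Key of the Bloom filter covering the hour of the given transaction time. */
    public String filterKeyFor(Instant transactionTime) {
        String key = "cdc:dedup:" + HOUR.format(transactionTime); // hypothetical key prefix
        try {
            redis.bfReserve(key, errorRate, capacity); // create on first use
            redis.expire(key, retentionSeconds);       // auto-delete after retention
        } catch (JedisDataException alreadyExists) {
            // BF.RESERVE errors if the filter already exists; nothing to do.
        }
        return key;
    }
}
```

Combined with the bulk check above, each batch of changes is tested against the filter for its transaction hour.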

That brings us to the other disadvantage — the error rate. The Bloom filter is a probabilistic data structure, which means it can report that it has seen a data change that was never actually processed. In our case, such a false positive would cause our process to drop a change that was never delivered downstream. To mitigate the error rate, we rotate the Bloom filter by hour, as described above, and we set a very low error rate (for example, 1 in 1,000,000). In our testing, we can process large sets of data without missing any data. But because there is still the potential for missing data, we are careful to make sure the use cases we use with the Bloom filter can accept some missed data. If the use case cannot accept any data loss, we recommend a least-recently-used (LRU) cache implementation in place of the Bloom filter.

Performance Results

Debezium Cassandra4 CDC Performance

We profiled the Debezium Cassandra4 connector by running different loads and traffic patterns using the cassandra-stress tool and measured the performance and resource utilization on the Cassandra nodes. The cassandra-stress tool is run on only one of the Cassandra nodes to generate higher workloads. All clusters are provisioned in Azure. Here is the test environment setup:

CDC Kafka: 24 partitions

Cassandra Nodes: 3 nodes, replication factor 3, each node has 8 cores/28 GB of RAM

Results were obtained from one of the nodes that receives changes/CDC data via Cassandra replication. Here are the results:

Where:

1. Debezium Processing Rate is the throughput at which Debezium processes the messages.

2. Cassandra CPU Usage is the max Cassandra CPU usage, measured with Debezium stopped.

3. Debezium CPU Usage is the difference between Total CPU Usage (#4) and Cassandra CPU Usage (#2).

4. Total CPU Usage is the max CPU utilization when Cassandra replication and Debezium are both running and processing messages.

The JVM heap memory usage of Debezium was less than 1.75 GB at a 300K/min processing rate.

Deduplication Performance Results

To demonstrate the performance of the deduplication solution, we will look at 3 experiments. Experiment 1 varies the batch size for the deduplication processing. Experiment 2 examines how changing the error rate impacts performance and deduplication results. Experiment 3 tests scaling the deduplication process.

For these tests, all clusters are provisioned in Azure. For simplicity, and to stress test the Redis-based deduplication (1.5 million deduplicated records/minute), we disabled writing to the deduplicated Kafka (refer to Diagram 2) for the first two experiments. Here is how the test environment was set up:

CDC Kafka: 15 partitions

DAQ Deduplication VM Configuration: 2 nodes, 8-core VMs, 24 GB of RAM

Redis Instance: 6-node cluster (3 masters + 3 replicas) provisioned with 8 cores/16 GB per node.

Experiment 1: Different CDC Kafka Batch Sizes with Fixed Error Rate

For this test we configured the Bloom filter error rate = 0.00001 and Bloom filter capacity = 10 million. The following table shows the results:

Where:

· Rate is the throughput of the deduplication process

· Avg Deduplication Time is the average time to pass a batch through the deduplication process

· Max CPU on the DAQ VM is the max CPU usage observed during the performance test

We configured the false-positive rate to be 1 in 100K (0.00001) for the above test runs and observed 0 false positives in all 3 scenarios.

Experiment 2: Different Error Rates with Fixed CDC Kafka Batch Size

For this test we configured the Bloom filter capacity = 10 million and the Kafka batch size = 3.5K.

We did see false positives when we set a high error rate (1 in 10). Also, note that there is a slight degradation in deduplication processing time as we decreased the error rate for better accuracy.

Experiment 3: Scaling the Deduplication Process

In this experiment, we enabled a 24-partition deduplicated Kafka and increased the number of partitions in CDC Kafka to 24. We ran the deduplication app on 3 Azure VMs with a Bloom filter error rate of 1 in 1 million and a capacity of 20 million. Here are the results:

Processing Time per batch: 2 sec

Rate: 623K/min

Where:

· Processing Time per batch: The total processing time (Read from CDC Kafka + Deduplication + Load to deduplicated Kafka) at the 95th percentile averaged across all 3 DAQ nodes

· Rate is the throughput of the process.

The numbers represent extracting the Debezium records from Kafka, deduplicating them via the Redis-based Bloom filter, and loading the deduplicated records to Kafka. This solution scales linearly by adding partitions to Kafka and adding DAQ VMs.

Bloom Filter Memory Usage

In our experiments, with an error rate of 1 in 1 million (0.000001), a Bloom filter capacity of 20 million, and the Bloom filter rotating every hour, the memory used per Bloom filter is ~67 MB, which is in the same ballpark as reported by the Bloom Filter Calculator. 72 Bloom filters with the above configuration occupy about 5.2 GB of memory across the distributed Redis nodes, which is minimal.
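
As a sanity check, the textbook Bloom filter sizing formula gives a number in the same range: m = n × ln(1/p) / (ln 2)² = 20,000,000 × ln(1,000,000) / (ln 2)² ≈ 5.75 × 10⁸ bits, or roughly 70 MB per filter, which lines up with the ~67 MB we measured.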

Conclusion

Cassandra CDC has been an interesting challenge. We started the project without a good solution for extracting changes from Cassandra and with a very complex design for removing duplicate data changes. In the spirit of Occam’s Razor, we whittled the requirements and design down to the simplest solution. The result is a scalable solution that we can keep enhancing for new use cases.

Going forward, we are contributing back to the Debezium project by adding support for reading incremental changes from the commit log files, planning to add an LRU cache option for deduplication, and continuing to optimize the process.
