Coauthored by Guru Prakash and Chirag Goel
Figure: The flow of data and its transformation
Over 2,000 data pipelines manage our domain’s petabyte-scale data lake for corporate and managerial reporting, along with many enterprise use cases. With exponential growth in the 6Vs of data and an ever-changing tech landscape, we must frequently update and upgrade our pipelines without impacting our SLAs. This blog explores our approach to building data pipelines, which may be leveraged as a template.
How it came about: Our use case
Our requirement was to centralize, curate, and certify data across specific markets for corporate reporting, with the necessary governance in place. We analyzed the data landscape across five markets and identified both the shared needs and the fine nuances of each market. We built an abstraction layer to handle the differences between markets, which allowed the data pipelines to be generic. The generic process is handled by a unified code base with internationalization, and the framework picks up market-specific configurations (localization) at runtime to process the data.
Figure: Inbound abstraction and generalization with unified code
High-level design: The framework
The framework takes in two sets of configurations: the pipeline configuration and the flow configurations (task-level configurations). The “shapes” concept is also used extensively throughout the framework.
Introduction to shapes
The flow configuration contains a collection of shapes that represents a logical unit of flow in a pipeline. Shapes can extract, process, and load data in a flow. Custom shapes can be created as required for the pipeline, enabling reusability across data processing. The following diagram illustrates the hierarchy of shapes as an abstraction of Spark dataset operations.
Figure: Shapes library
Below is a generic extractor shape used to extract data from multiple sources. Likewise, the framework allows us to create our own shapes for our use cases and add them as configurations. Shapes can be used with semantic numbering.
{
  type = "generic-extractor", id = "sales-extract",
  schema-name = "${sales_schema}", table-name = "${store_table}",
  select-expr = ["item_id", "txn_dt", "sales_amt", "unit_qty"],
  filter-cond = """ txn_dt=date'2023-07-11' """
},
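One way to picture how the `type` value of a shape could map to code is a small registry. The Python sketch below is illustrative only: `SHAPE_REGISTRY`, `register_shape`, `create_shape`, and the `GenericExtractor` class are hypothetical names, not the framework's API, and the extractor only renders a SQL string instead of reading a Spark dataset.

```python
from typing import Callable, Dict

# Hypothetical registry mapping a shape "type" string to a factory.
SHAPE_REGISTRY: Dict[str, Callable[[dict], object]] = {}


def register_shape(type_name: str):
    """Class decorator that registers a shape class under its config type."""
    def decorator(cls):
        SHAPE_REGISTRY[type_name] = cls
        return cls
    return decorator


@register_shape("generic-extractor")
class GenericExtractor:
    def __init__(self, conf: dict):
        self.id = conf["id"]
        self.table = f'{conf["schema-name"]}.{conf["table-name"]}'
        self.columns = conf.get("select-expr", ["*"])
        self.filter = conf.get("filter-cond", "").strip()

    def to_sql(self) -> str:
        """Render the extraction as a SQL string (illustration only)."""
        sql = f'select {", ".join(self.columns)} from {self.table}'
        return f"{sql} where {self.filter}" if self.filter else sql


def create_shape(conf: dict):
    """Look up the shape class by its 'type' key and instantiate it."""
    return SHAPE_REGISTRY[conf["type"]](conf)


shape = create_shape({
    "type": "generic-extractor", "id": "sales-extract",
    "schema-name": "sales_schema", "table-name": "item_sales",
    "select-expr": ["item_id", "txn_dt", "sales_amt", "unit_qty"],
    "filter-cond": " txn_dt=date'2023-07-11' ",
})
print(shape.to_sql())
```

With a registry like this, adding a custom shape would mean registering one more class, while existing configurations stay unchanged.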
1. Pipeline configuration
Figure: Configuration-driven framework
Pipeline configuration consists of the scheduler, cluster, and flow (task) configurations. The framework reads the configuration, schedules the pipeline in the scheduler, provisions a cluster, creates a DAG with task dependencies, and submits the job using the chosen runner. Below is a sample configuration.
{
  "dag_config": {
    "dag_id": "dynamic_dag_cdf_top_10",
    "schedule_interval": "@daily",
    "start_date": "2023-10-25",
    "default_args": {
      "owner": "airflow"
    },
    "cluster_config": {
      "project_id": "sample_project",
      "cluster_name": "cluster_top10",
      "region": "us-central1"
    }
  },
  "tasks": {
    "task": {
      "task_id": "task_1",
      "type": "cluster_create"
    },
    "task": {
      "task_id": "task_2",
      "type": "spark-submit",
      "job_params": {
        "spark_job": {
          "jar_file_uris": ["gs://*****/config-driven-framework-1.0.0.jar"],
          "file_uris": ["gs://****/pipeline_configs/examples/top10/top_10.conf"],
          "main_class": "com.framework.core.PipelineRunner",
          "args": ["top_10.conf"]
        }
      },
      "upstream": ["task_1"]
    },
    "task": {
      "task_id": "task_3",
      "type": "cluster_delete",
      "upstream": ["task_2"]
    }
  }
}
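The step of turning `upstream` entries into a DAG can be sketched with Python's standard library. This is a minimal illustration, not the framework's scheduler code: it assumes the tasks are held in a list (repeated `"task"` keys in one JSON object would overwrite each other in most parsers), and `execution_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Tasks mirror the sample configuration, but are held in a list (an assumption).
tasks = [
    {"task_id": "task_1", "type": "cluster_create"},
    {"task_id": "task_2", "type": "spark-submit", "upstream": ["task_1"]},
    {"task_id": "task_3", "type": "cluster_delete", "upstream": ["task_2"]},
]


def execution_order(task_list):
    """Return task ids in an order that respects every 'upstream' edge."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {t["task_id"]: set(t.get("upstream", [])) for t in task_list}
    return list(TopologicalSorter(graph).static_order())


print(execution_order(tasks))
```

A cyclic `upstream` definition would raise `graphlib.CycleError`, which is a convenient point to reject an invalid pipeline configuration before anything is provisioned.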
2. Flow configuration
Figure: Flow configuration being passed on to the framework
Flow configuration consists of a set of tasks that are run for a pipeline. Tasks are auto-wired by the framework at runtime. The output of each shape inside a task becomes available to all other shapes in the pipeline. In the example below, the “sales-extract” shape is used directly, like a table, in the generic join shape. This keeps the configuration flat and human-readable and simplifies troubleshooting.
name = "top-10-selling-items"
loadtype = "batch"
sparkconf = {
}
defaultargs = {
  txn_dt = "2023-07-11"
}
flows = [
  {
    name = "top-10-selling-items",
    shapes = [
      {
        type = "generic-extractor", id = "sales-extract",
        schema-name = "${sales_schema}", table-name = "${store_table}",
        select-expr = ["item_id", "txn_dt", "sales_amt", "unit_qty"],
        filter-cond = """ txn_dt=date'2023-07-11' """
      },
      {
        type = "generic-extractor", id = "item-extract",
        schema-name = "${item_schema}", table-name = "item",
        select-expr = ["item_id", "item_desc", "dept_desc"]
      },
      {
        type = "generic-join", id = "combine-sales-item",
        left-source = {type = "left-join-predicate", id = "sales-extract", alias = "s"},
        right-list = [{
          type = "right-join-predicate",
          id = "item-extract", alias = "i",
          join-type = {type = "leftouter"},
          join-expr = """s.item_id=i.item_id """
        }],
        select-expr = ["s.txn_dt", "s.item_id", "i.item_desc", "s.sales_amt", "unit_qty", "i.dept_desc"]
      },
      {
        type = "generic-aggeregator", id = "sales-unit-agg", source-id = "combine-sales-item",
        operation = {type = "group-by"},
        grp-cols = ["txn_dt", "item_id", "item_desc", "dept_desc"],
        agg-expr = {
          "sales_amt" = ["sum", "tot_sales"],
          "unit_qty" = ["sum", "unit_qty"]
        },
        select-expr = ["txn_dt", "item_id", "item_desc", "dept_desc", "tot_sales", "unit_qty"]
      },
      {
        type = "generic-arrange", id = "orderby-totalsales", source-id = "sales-unit-agg", operation = {type = "order-by"}, value = "tot_sales desc"
      },
      {
        type = "logger", id = "orderby-totalsales", value = "10"
      }
    ]
  }
]
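The auto-wiring idea, where each shape's output is addressable by its `id`, can be pictured as a dictionary of named outputs. The Python sketch below uses plain lists of rows instead of Spark datasets; `run_flow`, the handler names (`extract`, `arrange`, `limit`), and the `column` and `n` keys are hypothetical and only stand in for the framework's shapes.

```python
from typing import Callable, Dict, List

Rows = List[dict]
Handler = Callable[[dict, Dict[str, Rows]], Rows]


def run_flow(shapes: List[dict], handlers: Dict[str, Handler]) -> Dict[str, Rows]:
    """Run shapes in order; each output is stored under the shape's id."""
    outputs: Dict[str, Rows] = {}
    for conf in shapes:
        # Every handler sees all earlier outputs, keyed by shape id.
        outputs[conf["id"]] = handlers[conf["type"]](conf, outputs)
    return outputs


# Hypothetical in-memory data and handlers.
SALES = [{"item_id": 1, "sales_amt": 5.0}, {"item_id": 2, "sales_amt": 9.0}]

handlers = {
    "extract": lambda conf, out: list(SALES),
    "arrange": lambda conf, out: sorted(
        out[conf["source-id"]], key=lambda r: r[conf["column"]], reverse=True
    ),
    "limit": lambda conf, out: out[conf["source-id"]][: conf["n"]],
}

result = run_flow(
    [
        {"type": "extract", "id": "sales-extract"},
        {"type": "arrange", "id": "by-sales", "source-id": "sales-extract", "column": "sales_amt"},
        {"type": "limit", "id": "top-1", "source-id": "by-sales", "n": 1},
    ],
    handlers,
)
print(result["top-1"])
```

Because every intermediate output stays addressable by id, a failing step can be inspected in isolation, which is one reason a flat configuration helps with troubleshooting.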
Framework features
Below, we explore each of the features that enabled our data pipeline development, so that you can do the same.
- Inbound abstraction
- Variable substitution
- Argument precedence
- Conditional execution
- Configuration reusability (inheritance)
- Transaction support/stream processing
- Unit test automation
- Pipeline lineage
- Platform independency
- Developer productivity
- Recommendation engine
- Unified code structure
1. Inbound abstraction
Data is ingested from multiple sources. Upstream sources often migrate to new systems or incorporate compliance changes, which can change how we connect to them and require different sets of connectors. The inbound abstraction feature lets users choose the connectors required for their use cases while the connector implementations are abstracted away. If a connector change is needed, there is no need to change how connectors are added in the configuration.
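A minimal Python sketch of this idea follows. The `Connector` interface and both implementations are hypothetical stand-ins (in-memory dictionaries rather than real source systems); the point is only that pipeline code depends on the interface, so a source migration swaps the implementation and nothing else.

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class Connector(ABC):
    """Hypothetical interface every inbound connector implements."""

    @abstractmethod
    def read(self, source: str) -> List[dict]:
        ...


class InMemoryConnector(Connector):
    """Stand-in implementation that serves rows from a dictionary."""

    def __init__(self, tables: Dict[str, List[dict]]):
        self.tables = tables

    def read(self, source: str) -> List[dict]:
        return list(self.tables[source])


class UppercaseKeyConnector(InMemoryConnector):
    """A 'migrated' source whose column names arrive in upper case."""

    def read(self, source: str) -> List[dict]:
        # Normalize column names so downstream shapes see the same schema.
        return [{k.lower(): v for k, v in row.items()} for row in self.tables[source]]


def extract(connector: Connector, source: str) -> List[dict]:
    """Pipeline code depends only on the Connector interface."""
    return connector.read(source)


old = InMemoryConnector({"item": [{"item_id": 1}]})
new = UppercaseKeyConnector({"item": [{"ITEM_ID": 1}]})
print(extract(old, "item") == extract(new, "item"))
```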
2. Variable substitution
Once the pipeline is scheduled, variable values may need to be passed. We provide three levels of substitution to do so:
1. Inside its specific pipeline
2. A common environment file containing all the variables which can be leveraged between higher and lower environments
3. The ability to define them dynamically at runtime when submitting the job
name = "top-10-selling-items"
loadtype = "batch"
sparkconf = {
}
defaultargs = {
  txn_dt = "2023-07-11"
  schema = "sales_schema"
  table = "store_table"
}
flows = [
  {
    name = "top-10-selling-items",
    shapes = [
      {
        type = "generic-extractor", id = "sales-extract",
        schema-name = "${schema}", table-name = "${table}",
        select-expr = ["item_id", "txn_dt", "sales_amt", "unit_qty"],
        filter-cond = """ txn_dt=date'${txn_dt}' """
      }
    ]
  }
]
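The substitution step itself can be sketched in a few lines of Python. This is an assumption about the mechanics, not the framework's implementation: `substitute` is a hypothetical helper that walks a parsed configuration and replaces `${name}` placeholders from a dictionary of variables.

```python
import re

PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def substitute(value, variables):
    """Recursively replace ${name} placeholders in strings, lists, and dicts."""
    if isinstance(value, str):
        # Unknown names are left untouched so that a later level can fill them.
        return PLACEHOLDER.sub(
            lambda m: str(variables.get(m.group(1), m.group(0))), value
        )
    if isinstance(value, list):
        return [substitute(v, variables) for v in value]
    if isinstance(value, dict):
        return {k: substitute(v, variables) for k, v in value.items()}
    return value


shape = {
    "type": "generic-extractor",
    "schema-name": "${schema}",
    "table-name": "${table}",
    "filter-cond": "txn_dt=date'${txn_dt}'",
}
defaultargs = {"txn_dt": "2023-07-11", "schema": "sales_schema", "table": "store_table"}
resolved = substitute(shape, defaultargs)
print(resolved["filter-cond"])
```

Leaving unknown placeholders in place, rather than failing, lets the same function be applied once per substitution level.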
3. Argument precedence
There are three sources of framework arguments: key-value pairs passed by the scheduler, environment configurations, and pipeline configurations. The scheduler-passed key-value arguments take precedence over the environment configurations, and the environment configurations take precedence over the arguments in the pipeline configuration file.
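This precedence order maps naturally onto Python's `collections.ChainMap`, which looks a key up in each mapping in turn and returns the first match. The argument values below are made up for illustration.

```python
from collections import ChainMap

# Illustrative values only; lowest precedence first.
pipeline_args = {"txn_dt": "2023-07-11", "schema": "sales_schema"}  # configuration file
environment_args = {"schema": "sales_schema_qa"}                    # environment file
runtime_args = {"txn_dt": "2024-06-16"}                             # scheduler key-value pairs

# ChainMap searches its maps left to right, so the first map wins.
effective = ChainMap(runtime_args, environment_args, pipeline_args)

print(effective["txn_dt"], effective["schema"])
```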
4. Conditional execution
The framework can execute specific shapes within a flow group based on conditions, enabling conditional logic within the pipeline. The value of each condition at runtime determines which of the group’s shapes are executed; the remaining shapes are skipped.
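The source does not show how conditions are written in the configuration, so the following Python sketch invents a `condition` key purely for illustration. It shows the selection logic only: shapes without a condition always run, and conditional shapes run when the runtime argument matches.

```python
def select_shapes(shapes, args):
    """Keep shapes with no condition, or whose condition matches the runtime args."""
    selected = []
    for shape in shapes:
        # Hypothetical key: {"arg": <argument name>, "equals": <expected value>}
        cond = shape.get("condition")
        if cond is None or args.get(cond["arg"]) == cond["equals"]:
            selected.append(shape)
    return selected


shapes = [
    {"id": "sales-extract"},
    {"id": "full-load", "condition": {"arg": "mode", "equals": "hist"}},
    {"id": "delta-load", "condition": {"arg": "mode", "equals": "inc"}},
]
print([s["id"] for s in select_shapes(shapes, {"mode": "inc"})])
```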
5. Configuration reusability (inheritance)
Inheritance is provided out of the box for cases where users want to keep common logic in a separate configuration. This improves reusability and keeps configuration files small, so each file targets only the logic for its particular flow. Below is an example of one configuration including another.
includes = ["src/test/resources/base/inheritance/parent.conf"]
name = "parent"
loadtype = "batch"
flows = [
  {
    shapes = [{
      type = "generic-join", id = "join", group = "true",
      left-df = {type = "left-join-predicate", id = "A", alias = "s"},
      right-list = [{
        type = "right-join-predicate",
        id = "B", alias = "i",
        join-type = {type = "leftouter"},
        join-expr = """s.item_id=i.item_id """
      }],
    }]
  }
]
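How an `includes` list might be resolved can be sketched as a recursive merge in Python. The merge rules here are assumptions (nested objects merge key by key, and the including file wins on conflicts); the framework's actual semantics are not described in the source, and `merge` and `resolve` are hypothetical helpers operating on already-parsed dictionaries.

```python
def merge(parent: dict, child: dict) -> dict:
    """Return parent overlaid with child; nested dicts merge key by key."""
    merged = dict(parent)
    for key, value in child.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge(merged[key], value)
        else:
            merged[key] = value  # child wins for scalars and lists
    return merged


def resolve(name: str, configs: dict) -> dict:
    """Load a config by name and fold in everything listed under 'includes'."""
    conf = dict(configs[name])
    result: dict = {}
    for parent in conf.pop("includes", []):
        result = merge(result, resolve(parent, configs))
    return merge(result, conf)


configs = {
    "parent.conf": {"loadtype": "batch", "sparkconf": {"spark.sql.shuffle.partitions": "200"}},
    "child.conf": {"includes": ["parent.conf"], "name": "child", "sparkconf": {"spark.app.name": "child"}},
}
print(resolve("child.conf", configs))
```

The sketch has no protection against circular includes; a real resolver would need to track the files it has already visited.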
6. Transaction support/stream processing
Transaction support is enabled using Apache Hudi. Spark Structured Streaming is used for near-real-time scenarios, while Spark RAPIDS is employed for GPU-bound use cases.
7. Unit test automation
The framework not only automates test data but also allows developers to write concise unit test cases to deliver quality code at speed. It can validate data between actual and expected results, either by the user providing specific details or by automatically running a comprehensive list of tests.
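The comparison of actual and expected results can be illustrated with a small, order-insensitive row diff in Python. `diff_rows` is a hypothetical helper working on lists of dictionaries; in the framework, the same idea would apply to Spark datasets.

```python
from collections import Counter


def diff_rows(actual, expected):
    """Compare two row sets as multisets, ignoring row order."""
    def freeze(row):
        # Rows become hashable tuples of (column, value) pairs sorted by column.
        return tuple(sorted(row.items()))

    a, e = Counter(map(freeze, actual)), Counter(map(freeze, expected))
    return {
        "missing": [dict(r) for r in (e - a).elements()],     # expected but absent
        "unexpected": [dict(r) for r in (a - e).elements()],  # present but not expected
    }


expected = [{"item_id": 1, "tot_sales": 10.0}, {"item_id": 2, "tot_sales": 4.0}]
actual = [{"item_id": 2, "tot_sales": 4.0}, {"item_id": 1, "tot_sales": 12.0}]
print(diff_rows(actual, expected))
```

Using multisets rather than sets means duplicated rows are also detected, which matters for aggregation and join logic.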
8. Pipeline lineage
The framework supports both static and dynamic data lineage tracking. Static lineage is produced by parsing the configuration. It helps the user better understand the code logic and can be used to document and track the core logic in the pipeline. Dynamic lineage tracking is done via Spline, which is integrated with the framework and comes out of the box. Below is an example log showing static pipeline lineage.
_______________________________________________________________________________
Logical plan
for the pipeline
--------------------------------------------------------------------------------
flows = List(
List(
(Extract(List(sales_schema.item_sales)) -> sales - extract, 1),
(Extract(List(item_schema.item_table)) -> item - extract, 1),
(Join(sales - extract
with List(-> item -extract)
) ) -> combine -sales - item
, 2
),
(Aggregate(combine - sales - item, groupBy(group by List(txn_dt, item_id, item_desc,dept_desc)) -> sales - unit - agg, 3),
(Arrange(sales - unit - agg, orderBy, (tot_sales desc)) -> orderby - totalsales, 4),
(Logger(orderby - totalsales, None, 10), 5))
)
--------------------------------------------------------------------------------
SQL Equivalent
--------------------------------------------------------------------------------
scalaEquivalent = List(
val sales - extract = spark.sql(
"""select item_id,txn_dt,sales_amt,unit_qty
from sales_schema.item_sales
where txn_dt=date'2023-07-11'
""")
sales - extract.createOrReplaceTempView("sales-extract")
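Static lineage of this kind can be derived from configuration alone, without running any job. The Python sketch below walks a list of parsed shapes and emits source-to-target edges. The key names (`source-id`, `left-source`, `right-list`) are taken from the flow configuration sample; the traversal itself, and the `lineage_edges` helper, are assumptions rather than the framework's parser.

```python
def lineage_edges(shapes):
    """Derive (source id, shape id) edges from a list of shape configs."""
    edges = []
    for shape in shapes:
        sources = []
        if "source-id" in shape:
            sources.append(shape["source-id"])
        if "left-source" in shape:
            sources.append(shape["left-source"]["id"])
        sources += [right["id"] for right in shape.get("right-list", [])]
        edges += [(src, shape["id"]) for src in sources]
    return edges


shapes = [
    {"type": "generic-extractor", "id": "sales-extract"},
    {"type": "generic-extractor", "id": "item-extract"},
    {"type": "generic-join", "id": "combine-sales-item",
     "left-source": {"id": "sales-extract"}, "right-list": [{"id": "item-extract"}]},
    {"type": "generic-arrange", "id": "orderby-totalsales", "source-id": "combine-sales-item"},
]
for src, dst in lineage_edges(shapes):
    print(f"{src} -> {dst}")
```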
9. Platform independency
Keeping the configurations independent of any particular language allows developers to use them in any environment. By choosing a specific runner when submitting the job, developers can carry out tasks such as migrating platforms, applying patches, integrating with other tools, or switching from Java to Python.
10. Developer productivity
To help developers with code development and troubleshooting, we have enabled notebooks within the framework. Developers can execute and test individual shapes, view the lineage of the entire pipeline, and even access the equivalent SQL queries. Below is an example code snippet from an interactive notebook.
import com.walmart.finwb.interactive._

val cdf = CdfSession.create()
cdf.args.set("schema", "sales_schema")
cdf.args.set("table", "sales_table")
cdf.args.set("txn_dt", "2024-06-16")

// Create any shape from a given config
val extractShape = cdf.createShape("""
{
  type = "generic-extractor", id = "sales-extract",
  schema-name = "${schema}", table-name = "${table}"
}
""")

// Execute the shape
extractShape.execute().show(10)

// Get the summary
cdf.summary()
11. Recommendation engine
Pipeline libraries are included to identify preexisting configurations and any correlations among them. The engine recommends moving common configurations to a shared space or creating custom shapes. Additionally, the framework flags deprecated tags and ensures backward compatibility.
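One simple way to surface candidates for a shared configuration is to look for shape definitions that repeat across pipelines. The Python sketch below is a deliberately naive illustration, not the framework's recommendation logic: `common_shapes` is a hypothetical helper that treats two shapes as the same when everything except their `id` matches.

```python
import json
from collections import defaultdict


def common_shapes(pipelines, min_count=2):
    """Find shape definitions that appear, ignoring 'id', in several pipelines."""
    seen = defaultdict(set)
    for name, shapes in pipelines.items():
        for shape in shapes:
            body = {k: v for k, v in shape.items() if k != "id"}
            # A canonical JSON string serves as a hashable fingerprint.
            seen[json.dumps(body, sort_keys=True)].add(name)
    return {
        key: sorted(names) for key, names in seen.items() if len(names) >= min_count
    }


item_extract = {"type": "generic-extractor", "table-name": "item", "select-expr": ["item_id"]}
pipelines = {
    "top-10": [dict(item_extract, id="item-extract")],
    "forex": [dict(item_extract, id="items"), {"type": "logger", "id": "log", "value": "10"}],
}
for body, used_by in common_shapes(pipelines).items():
    print(used_by, body)
```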
12. Unified code structure
The codebase is organized into data and configuration folders. The data folder contains DDL statements, task-level configurations (with internationalization enabled), and reusable configurations for markets. The configuration folder includes app, cluster, and scheduler configurations, with app configurations acting as the main configuration for specific markets.
- main/
- data/
- data_ingestion/
- upstream
- landing_zone
- market1/
- schema
- table1
- ddl.yaml
- table2
- table3
- market2/
- schema
- table1
- common/
- default.yaml
- data_transformation/
- market1/
- schema
- table1
- task_1.conf
- task_2.conf
- table2
- table3
- market2/
- schema
- table1
- task_1.conf
- task_2.conf
- table2
- table3
- common/
- forex.conf
- item.conf
- config/
- app/
- market1/
- dev
- market1_inc_config.conf
- market1_hist_config.conf
- qa
- market1_inc_config.conf
- market1_hist_config.conf
- prod
- market1_inc_config.conf
- market1_hist_config.conf
- market2/
- common/
- default.conf
- cluster/
- market1/
- prod
- cluster_template1.json
- cluster_template2.json
- market2/
- common/
- default_template.json
- scheduler/
- market1/
- hist
- dag_config_1.json
- dag_config_2.json
- inc
- dag_config_3.json
- dag_config_4.json
- market2/
- common/
- dev
- dynamic_dag.py
- qa
- dynamic_dag.py
- prod
- dynamic_dag.py
- tests/
- scripts/
- changelog.md
- README.md
Framework Reference Diagram
Figure: Framework reference diagram
Conclusion
Generating business value and ensuring feasibility in its construction were the main priorities when designing the framework.
· Feedback from developers was gathered and incorporated through multiple iterations, resulting in a seamless adoption and a 25% increase in developer productivity.
· The unified code approach accelerated the creation of new market spinoffs, allowing them to be managed with fewer resources.
· Data processing was streamlined to ensure timely and accurate insights.
· Automated pipeline deployment ensured quicker application of platform and library upgrades, while points of intersection in the pipelines were reduced, and process automation was added, reducing total cost of operations.
This configuration-driven data pipeline framework is currently in production with ~300 pipelines deployed to date. Not only can it provide financial benefits via cost savings and efficiency increases, it can help to enable future growth and innovation.
Achieve million-dollar savings with unified code and configuration-driven data pipelines was originally published in Walmart Global Tech Blog on Medium.