CDC Cache - Change Data Capture¶
CDC (Change Data Capture) is a pattern used to track and process only the changes (inserts, updates, deletes) in your data, rather than reprocessing the entire dataset every time.
The Cache class in Flypipe supports optional CDC functionality through unified read() and write() methods with optional CDC parameters. This enables incremental processing where only new or changed data flows through your pipeline.
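To make the pattern concrete before diving into the Flypipe API, here is a minimal framework-free sketch of CDC-style filtering in plain Python (the `incremental_rows` helper and in-memory `last_processed` dict are illustrative, not part of Flypipe): keep a last-processed timestamp per source and hand only newer rows to the next step.

```python
from datetime import datetime

# Hypothetical in-memory CDC bookkeeping: last processed timestamp per source.
last_processed = {}

def incremental_rows(source, rows):
    """Return only rows updated since this source was last processed."""
    cutoff = last_processed.get(source)
    new_rows = [r for r in rows if cutoff is None or r["updated_at"] > cutoff]
    if rows:
        # Advance the watermark to the newest row seen
        last_processed[source] = max(r["updated_at"] for r in rows)
    return new_rows

rows = [
    {"id": 1, "updated_at": datetime(2024, 1, 1)},
    {"id": 2, "updated_at": datetime(2024, 1, 2)},
]
print(len(incremental_rows("customers", rows)))  # → 2 (no history yet, all rows are new)
rows.append({"id": 3, "updated_at": datetime(2024, 1, 3)})
print(len(incremental_rows("customers", rows)))  # → 1 (only the row newer than the watermark)
```

Flypipe's CDC cache applies the same idea, but persists the watermark per pipeline edge in a Delta table instead of an in-memory dict.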
Key Concepts¶
- Unified Cache Interface: CDC is built into the base Cache class through optional parameters
  - create_cdc_table(): Creates the CDC metadata table (optional, for caches with CDC support)
  - read() with CDC: Accepts optional from_node and to_node parameters for filtering
  - write() with CDC: Accepts optional upstream_nodes, to_node, and datetime_started_transformation parameters
  - cdc_datetime_updated: Timestamp column tracking when each row was last processed
- Incremental Processing: Process only new/changed rows and append to existing results
- Static Nodes: Mark nodes with .static() to skip CDC filtering and always load the complete cached data
- Parallel Execution: The Runner executes independent nodes in parallel using a ThreadPoolExecutor
When to Use CDC Cache¶
- Large datasets: When reprocessing all data is expensive
- Incremental loads: When source data arrives in batches over time
- Data warehousing: Building fact tables with regular updates
- Real-time pipelines: Processing streaming data in micro-batches
Implementing CDC Cache¶
To implement CDC cache, you need to:
- Create a CDCManager class to track CDC metadata (timestamps, source/destination nodes)
- Create a cache class that extends Cache and implements:
  - Standard cache methods: read(), write(), exists()
  - Optionally override create_cdc_table() for CDC support
  - The read() method should handle optional from_node and to_node parameters for CDC filtering
  - The write() method should handle optional upstream_nodes, to_node, and datetime_started_transformation parameters for CDC metadata
  - The CDCManager is created internally within the cache class
  - Specify merge_keys for Delta Lake MERGE INTO operations
- Use CacheMode.MERGE to trigger upsert operations on cached data
- The Runner handles parallel execution and calls create_cdc_table() before executing
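The steps above can be sketched as a tiny in-memory stand-in before the full Spark implementation. This `SimpleCDCCache` is a plain-Python illustration of the interface only (it is not the real flypipe Cache base class): an upsert keyed on merge keys, a per-edge CDC log, and an `is_static` escape hatch.

```python
from datetime import datetime

class SimpleCDCCache:
    """In-memory sketch of the cache interface described above (illustrative only)."""

    def __init__(self, merge_keys):
        self.merge_keys = merge_keys
        self.rows = {}     # cached rows keyed by merge-key tuple
        self.cdc_log = {}  # (source, destination) -> last processed timestamp

    def exists(self):
        return bool(self.rows)

    def read(self, from_node=None, to_node=None, is_static=False):
        data = list(self.rows.values())
        if from_node and to_node and not is_static:
            cutoff = self.cdc_log.get((from_node, to_node))
            if cutoff is not None:
                # CDC filtering: keep only rows newer than the edge watermark
                data = [r for r in data if r["cdc_datetime_updated"] > cutoff]
        return data

    def write(self, rows, upstream_nodes=None, to_node=None,
              datetime_started_transformation=None):
        for r in rows:
            # Upsert on merge keys (the MERGE INTO equivalent)
            key = tuple(r[k] for k in self.merge_keys)
            self.rows[key] = r
        if upstream_nodes and to_node and datetime_started_transformation:
            for up in upstream_nodes:
                self.cdc_log[(up, to_node)] = datetime_started_transformation

cache = SimpleCDCCache(merge_keys=["id"])
cache.write(
    [{"id": 1, "cdc_datetime_updated": datetime(2024, 1, 1)}],
    upstream_nodes=["t1"], to_node="t2",
    datetime_started_transformation=datetime(2024, 1, 2),
)
print(cache.read(from_node="t1", to_node="t2"))  # filtered by the edge watermark
print(cache.read(is_static=True))                # static read: full dataset
```

The Spark implementation below follows the same shape, with Delta tables replacing the dicts.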
Step 1: CDC Manager with Spark Table¶
The CDCManager tracks when each edge (source → destination) in the pipeline was last processed.
from pyspark.sql.types import StructType, StructField, StringType, TimestampType


class CDCManager:
    """Manager that tracks CDC metadata using a Spark table"""

    def __init__(self, table="cdc_metadata", schema="default", catalog=None):
        self.table = table
        self.schema = schema
        self.catalog = catalog

    @property
    def full_table_name(self):
        """Returns the fully qualified table name"""
        if self.catalog:
            return f"{self.catalog}.{self.schema}.{self.table}"
        return f"{self.schema}.{self.table}"

    def create_table(self, spark):
        """
        Create the CDC metadata table if it doesn't exist.

        This is called by the Runner before parallel execution to avoid
        concurrent creation conflicts.
        """
        if spark.catalog.tableExists(self.full_table_name):
            return
        cdc_schema = StructType([
            StructField("source", StringType(), False),
            StructField("destination", StringType(), False),
            StructField("cdc_datetime_updated", TimestampType(), False),
        ])
        # Create the database if it doesn't exist
        if not spark.catalog.databaseExists(self.schema):
            spark.sql(f"CREATE DATABASE IF NOT EXISTS {self.schema}")
        try:
            # Create an empty table
            empty_df = spark.createDataFrame([], cdc_schema)
            empty_df.write.format("delta").mode("append").partitionBy(
                "source", "destination"
            ).saveAsTable(self.full_table_name)
            print(f"Created CDC metadata table: {self.full_table_name}")
        except Exception:
            # If another process created it concurrently, ignore the error
            if not spark.catalog.tableExists(self.full_table_name):
                raise

    def write(self, spark, upstream_node, to_node, timestamp):
        """
        Write a CDC timestamp entry for the edge upstream_node -> to_node.

        Parameters
        ----------
        spark : SparkSession
            Spark session
        upstream_node : Node
            The upstream node that provided data
        to_node : Node
            Destination/target node being processed
        timestamp : datetime
            Timestamp when processing occurred
        """
        # Create an entry for this edge
        entry = (upstream_node.__name__, to_node.__name__, timestamp)
        cdc_schema = StructType([
            StructField("source", StringType(), False),
            StructField("destination", StringType(), False),
            StructField("cdc_datetime_updated", TimestampType(), False),
        ])
        new_entry_df = spark.createDataFrame([entry], cdc_schema)
        # Append to the CDC metadata table
        new_entry_df.write.format("delta").mode("append").partitionBy(
            "source", "destination"
        ).saveAsTable(self.full_table_name, mergeSchema=True)

    def filter(self, spark, from_node, to_node, df):
        """
        Filter a dataframe down to rows with cdc_datetime_updated after the
        last processed timestamp for the from_node -> to_node edge.

        Parameters
        ----------
        spark : SparkSession
            Spark session
        from_node : Node
            Source node
        to_node : Node
            Destination node
        df : DataFrame
            DataFrame to filter

        Returns
        -------
        DataFrame
            Filtered dataframe containing only new rows
        """
        if not from_node or not to_node:
            return df
        # Get the last processed timestamp for this edge
        edge_query = f"""
            SELECT MAX(cdc_datetime_updated) as last_timestamp
            FROM {self.full_table_name}
            WHERE source = '{from_node.__name__}'
            AND destination = '{to_node.__name__}'
        """
        last_timestamp = spark.sql(edge_query).collect()[0]["last_timestamp"]
        if last_timestamp is None:
            # No previous run, return all data
            print(f"  → No CDC history found for {from_node.__name__} → {to_node.__name__}, processing all data")
            return df
        # Keep only rows updated after the last timestamp
        if "cdc_datetime_updated" in df.columns:
            filtered_df = df.filter(df["cdc_datetime_updated"] > last_timestamp)
            row_count = filtered_df.count()
            print(f"  → CDC filter: {row_count} new rows since {last_timestamp}")
            return filtered_df
        # If there is no cdc_datetime_updated column, return all data
        return df
Step 2: CDC Cache Implementation¶
This cache implementation uses Delta Lake tables for storage and integrates with the CDC manager for timestamp tracking. It uses Delta Lake's MERGE INTO for upsert operations based on merge keys.
from flypipe.cache import Cache


class IncrementalCDCCache(Cache):
    """Cache that supports incremental CDC processing with Spark tables"""

    def __init__(self, table, merge_keys, schema="default", catalog=None, cdc_table="cdc_metadata"):
        super().__init__()
        self.table = table
        self.merge_keys = merge_keys
        self.schema = schema
        self.catalog = catalog
        self.cdc_table = cdc_table
        self.cdc_manager = CDCManager(table=cdc_table, schema=schema, catalog=catalog)

    @property
    def full_table_name(self):
        """Returns the fully qualified table name"""
        if self.catalog:
            return f"{self.catalog}.{self.schema}.{self.table}"
        return f"{self.schema}.{self.table}"

    def read(self, spark, from_node=None, to_node=None, is_static=False):
        """
        Read cached data from the Delta table with optional CDC filtering.

        Parameters
        ----------
        from_node : Node, optional
            Source node for CDC filtering
        to_node : Node, optional
            Destination node for CDC filtering
        is_static : bool, optional
            If True, skip CDC filtering and load the complete cached data (default: False)
        """
        df = spark.table(self.full_table_name)
        # Apply CDC filtering only if both nodes are provided and is_static is False
        if from_node is not None and to_node is not None and not is_static:
            df = self.cdc_manager.filter(spark, from_node, to_node, df)
        elif is_static:
            row_count = df.count()
            print(f"  → Static node {from_node.__name__}, loaded {row_count} rows (no CDC filtering)")
        return df

    def write(self, spark, df, upstream_nodes=None, to_node=None, datetime_started_transformation=None):
        """
        Write the cache - merge into the existing Delta table using MERGE INTO,
        with optional CDC metadata.

        Parameters
        ----------
        df : DataFrame
            DataFrame to cache
        upstream_nodes : List[Node], optional
            List of upstream cached nodes for CDC tracking
        to_node : Node, optional
            Destination node for CDC tracking
        datetime_started_transformation : datetime, optional
            Timestamp when the transformation started, for CDC tracking
        """
        # Handle the regular cache write
        if df.isEmpty():
            return
        if not self.exists(spark):
            # First write - create the table
            print(f"  → Creating table {self.full_table_name}")
            df.write.format("delta").mode("overwrite").saveAsTable(self.full_table_name)
        else:
            # Subsequent writes - merge into the existing table
            print(f"  → Merging into table {self.full_table_name}")
            df.createOrReplaceTempView("updates")
            # Build the merge condition from the merge keys
            merge_condition = " AND ".join(f"target.{key} = source.{key}" for key in self.merge_keys)
            merge_query = f"""
                MERGE INTO {self.full_table_name} AS target
                USING updates AS source
                ON {merge_condition}
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """
            spark.sql(merge_query)
        # Handle the CDC metadata write
        if upstream_nodes and to_node and datetime_started_transformation:
            for upstream_node in upstream_nodes:
                self.cdc_manager.write(spark, upstream_node, to_node, datetime_started_transformation)

    def exists(self, spark):
        return spark.catalog.tableExists(self.full_table_name)

    def create_cdc_table(self, spark):
        """
        Ensure the CDC metadata table exists (for thread-safe parallel execution).

        Called by the Runner before parallel execution to avoid concurrent
        creation conflicts.
        """
        self.cdc_manager.create_table(spark)
Step 3: Define the Pipeline with CDC Cache¶
We'll create a simple pipeline:
t1 (input passthrough, no cache)
|
v
t2 (transformation with CDC cache)
|
v
t3 (final node with CDC cache)
Each transformation adds a cdc_datetime_updated timestamp to track when rows were processed.
from flypipe import node
from pyspark.sql.functions import current_timestamp


@node(type="pyspark")
def t1():
    """Source node - passthrough for input data"""
    return spark.createDataFrame(
        data=[(1, "Alice"), (2, "Bob")],
        schema=["id", "name"]
    )


@node(
    type="pyspark",
    dependencies=[t1],
    cache=IncrementalCDCCache(
        table="customer_data_processed",
        merge_keys=["id"],
        schema="default",
        cdc_table="cdc_metadata"
    )
)
def t2(t1):
    """
    Transformation node with CDC cache.
    Adds processing logic and timestamp.
    """
    return t1.selectExpr(
        "id",
        "name",
        "CONCAT(name, '_processed') as processed_name"
    ).withColumn("cdc_datetime_updated", current_timestamp())


@node(
    type="pyspark",
    dependencies=[t2],
    cache=IncrementalCDCCache(
        table="customer_data_final",
        merge_keys=["id"],
        schema="default",
        cdc_table="cdc_metadata"
    )
)
def t3(t2):
    """
    Final node with CDC cache.
    Adds business logic and maintains timestamp.
    """
    return t2.selectExpr(
        "id",
        "name",
        "processed_name",
        "id * 100 as customer_score",
        "cdc_datetime_updated"
    )
Setup - Clean Environment¶
Before running our example, let's clean up any existing tables.
# Clean up any existing tables
spark.sql("DROP TABLE IF EXISTS default.customer_data_processed")
spark.sql("DROP TABLE IF EXISTS default.customer_data_final")
spark.sql("DROP TABLE IF EXISTS default.cdc_metadata")
print("✓ Environment cleaned")
First Run - Initial Data Load¶
Process the initial batch of customer data (customers 1 and 2).
We use CacheMode.MERGE to enable incremental appending to the cache tables. The max_workers=2 parameter enables parallel execution, allowing independent nodes to run concurrently.
from flypipe.cache import CacheMode
# First batch of customer data
initial_input = spark.createDataFrame(
    data=[(1, "Alice"), (2, "Bob")],
    schema=["id", "name"]
)

print("=" * 60)
print("FIRST RUN - Processing initial data (customers 1, 2)")
print("=" * 60)

result1 = t3.run(
    spark,
    inputs={t1: initial_input},
    cache={t2: CacheMode.MERGE, t3: CacheMode.MERGE},
    max_workers=2  # Enable parallel execution with 2 workers
)

print("\n📊 Result of first run:")
result1.show()

print("\n📦 Cached data in final table:")
spark.table("default.customer_data_final").show()
Second Run - Incremental Load (CDC in Action!)¶
Now we'll process a new batch with only new customers (3 and 4).
The CDC cache will:
- Filter the data based on timestamps
- Process only the new rows
- Append results to the existing cache tables
# Second batch - only NEW customers
new_input = spark.createDataFrame(
    data=[(3, "Charlie"), (4, "Diana")],  # Only new rows
    schema=["id", "name"]
)

print("=" * 60)
print("SECOND RUN - Processing new data (customers 3, 4)")
print("=" * 60)

result2 = t3.run(
    spark,
    inputs={t1: new_input},
    cache={t2: CacheMode.MERGE, t3: CacheMode.MERGE},
    max_workers=2  # Enable parallel execution with 2 workers
)

print("\n📊 Result of second run (only new rows processed):")
result2.show()

print("\n📦 Full cached data in final table (all historical data):")
spark.table("default.customer_data_final").orderBy("id").show()
Viewing CDC Metadata¶
The CDC manager tracks when each edge (source → destination) in the pipeline was last processed.
The CDC metadata table has the following columns:
- source: The upstream node name
- destination: The downstream/target node name
- cdc_datetime_updated: Timestamp when this edge was processed
print("📋 CDC Metadata Table:")
print("Tracks when each pipeline edge was last processed (source → destination)\n")
spark.table("default.cdc_metadata").orderBy("source", "destination", "cdc_datetime_updated").show(truncate=False)
How It Works¶
The Runner¶
Flypipe uses a Runner class to execute the transformation graph:
- Creates Execution Plan: Organizes nodes into levels based on dependencies
- Parallel Execution: Nodes at the same level can run in parallel (controlled by max_workers)
- CDC Table Creation: Calls create_cdc_table() before executing each level to avoid concurrent creation conflicts
- Memoization: Stores intermediate results in run_context.node_results to avoid re-computation
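The level-based scheduling described above can be sketched in plain Python. The `execution_levels` helper below is a hypothetical illustration, not the actual Runner: a node's level is one past its deepest dependency, and nodes within a level are independent, so each level can be submitted to a ThreadPoolExecutor.

```python
from concurrent.futures import ThreadPoolExecutor

def execution_levels(dependencies):
    """Group nodes into levels: a node's level is one past its deepest dependency."""
    levels = {}
    def level_of(node):
        if node not in levels:
            deps = dependencies.get(node, [])
            levels[node] = 0 if not deps else 1 + max(level_of(d) for d in deps)
        return levels[node]
    for node in dependencies:
        level_of(node)
    plan = {}
    for node, lvl in levels.items():
        plan.setdefault(lvl, []).append(node)
    return [plan[i] for i in sorted(plan)]

# The t1 -> t2 -> t3 pipeline from this page
deps = {"t1": [], "t2": ["t1"], "t3": ["t2"]}
plan = execution_levels(deps)
print(plan)  # → [['t1'], ['t2'], ['t3']]

# Nodes within a level run in parallel; levels run in order
results = {}
with ThreadPoolExecutor(max_workers=2) as pool:
    for level in plan:
        for node, value in zip(level, pool.map(lambda n: f"{n}-done", level)):
            results[node] = value
print(results)
```

For the linear t1 → t2 → t3 pipeline each level holds a single node, so parallelism only pays off once the graph has independent branches.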
First Run (Initial Load)¶
- Input: Customers 1 and 2
- Processing:
  - Runner creates the execution plan: Level 0: [t1], Level 1: [t2], Level 2: [t3]
  - All nodes execute their transformations
  - With max_workers=2, independent nodes could run in parallel
- Caching: Results written to Delta tables with timestamps (CREATE TABLE)
- CDC Metadata: Records when edges were processed (source → destination)
  - Example: t1 → t2, t2 → t3 with timestamps
  - Each upstream node writes its own CDC metadata entry
Second Run (Incremental Load)¶
- Input: Customers 3 and 4 (NEW data only)
- CDC Filtering:
  - read() checks timestamps from the CDC metadata for the specific source → destination edge
  - Returns only rows with cdc_datetime_updated > last processed time for that edge
  - In this case, all rows are new (no filtering needed)
- Processing:
  - Runner re-creates the execution plan for the new data
  - Only the new customers are transformed
  - Parallel execution when possible (based on max_workers)
- Caching: Results merged into the existing tables using Delta Lake's MERGE INTO
  - Uses merge_keys (e.g., id) to match existing records
  - WHEN MATCHED: Updates existing records
  - WHEN NOT MATCHED: Inserts new records
- Final State: Cache contains all 4 customers (1, 2, 3, 4)
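The MERGE INTO upsert used in the caching step can be illustrated with a small dict-based sketch in plain Python (the `merge_into` helper is illustrative only, not Delta Lake):

```python
def merge_into(target, updates, merge_keys):
    """Upsert: WHEN MATCHED update, WHEN NOT MATCHED insert, keyed on merge_keys."""
    index = {tuple(row[k] for k in merge_keys): i for i, row in enumerate(target)}
    for row in updates:
        key = tuple(row[k] for k in merge_keys)
        if key in index:
            target[index[key]].update(row)  # WHEN MATCHED THEN UPDATE
        else:
            index[key] = len(target)
            target.append(dict(row))        # WHEN NOT MATCHED THEN INSERT
    return target

cache = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
merge_into(cache, [{"id": 2, "name": "Bobby"}, {"id": 3, "name": "Charlie"}], merge_keys=["id"])
print(sorted(r["id"] for r in cache))  # → [1, 2, 3]: id 2 updated, id 3 inserted
```

This is why re-running the pipeline with overlapping input is safe: matched keys are overwritten rather than duplicated.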
Benefits¶
✅ Performance: Only process new/changed data
✅ Storage: Maintain complete historical data in cache
✅ Scalability: Handle large datasets incrementally
✅ Auditability: Track when each edge was processed
✅ Upsert Logic: Automatically handles inserts and updates with MERGE INTO
✅ Parallel Execution: Independent nodes execute concurrently for better performance
✅ Thread-Safe: CDC table creation is handled before parallel execution
Advanced: Querying CDC Metadata¶
You can query the CDC metadata to understand your pipeline's processing history.
# Find the last processing time for each edge (source -> destination)
query = """
SELECT
source,
destination,
MAX(cdc_datetime_updated) as last_processed,
COUNT(*) as run_count
FROM default.cdc_metadata
GROUP BY source, destination
ORDER BY source, destination
"""
print("📊 CDC Processing Summary (grouped by source and destination):")
spark.sql(query).show(truncate=False)
Static Nodes - Skipping CDC Filtering¶
Sometimes you have nodes that contain reference data or lookup tables that don't change frequently, or you want to always load the complete cached dataset regardless of CDC timestamps. For these cases, you can mark a node as static using the .static() method.
What is a Static Node?¶
A static node is a cached node that skips CDC filtering when reading from cache. When a node is marked as static:
- ✅ The entire cached dataset is loaded (no timestamp filtering)
- ✅ CDC metadata is still tracked for the node
- ✅ The node can still use CacheMode.MERGE to update its cache
- ✅ Downstream nodes receive the complete dataset, not just incremental changes
When to Use Static Nodes¶
Use .static() for:
- Reference/lookup tables: Product catalogs, country codes, currency exchange rates
- Dimension tables: Small dimension tables that are always needed in full
- Configuration data: Application settings, business rules
- Small datasets: When the cost of loading all data is negligible
- Expensive computations: Complex transformations you want to compute once and reuse in full
Example: Product Catalog with Static Lookup¶
@node(type="pyspark", cache=ProductCache(...))
def product_catalog():
    """Load product reference data (updated weekly)"""
    return spark.read.parquet("s3://data/products/")


@node(type="pyspark", cache=OrderCache(...))
def orders():
    """Load orders (updated daily)"""
    return spark.read.parquet("s3://data/orders/")


@node(type="pyspark", dependencies=[orders, product_catalog.static()])
def enriched_orders(orders, product_catalog):
    """Join orders with the COMPLETE product catalog (no CDC filtering)"""
    return orders.join(product_catalog, "product_id", "left")


# First run: Process 100 orders, cache the product catalog
enriched_orders.run(
    spark,
    cache={
        orders: CacheMode.MERGE,
        product_catalog: CacheMode.MERGE
    },
    max_workers=2
)

# Second run: Process 50 new orders
# - orders: Only the 50 new orders (CDC filtered)
# - product_catalog: ALL products loaded (static, no CDC filtering)
enriched_orders.run(
    spark,
    cache={
        orders: CacheMode.MERGE,
        product_catalog: CacheMode.MERGE
    },
    max_workers=2
)
How It Works¶
Without .static() (default CDC behavior):

@node(dependencies=[product_catalog])  # CDC filtering applied
def enriched_orders(product_catalog):
    # product_catalog contains only NEW/CHANGED products since the last run
    pass

With .static() (skip CDC filtering):

@node(dependencies=[product_catalog.static()])  # NO CDC filtering
def enriched_orders(product_catalog):
    # product_catalog contains ALL products (complete cache)
    pass
Technical Details¶
When implementing a cache with CDC support, you must accept and check the is_static parameter before applying CDC filtering.
Important: Only call cdc_manager.filter() if is_static is False:
def read(self, spark, from_node=None, to_node=None, is_static=False):
    """Read cached data with optional CDC filtering"""
    df = spark.table(self.full_table_name)
    # Apply CDC filtering ONLY if nodes are provided AND is_static is False
    if from_node is not None and to_node is not None and not is_static:
        df = self.cdc_manager.filter(spark, from_node, to_node, df)
    elif is_static:
        row_count = df.count()
        print(f"  → Static node {from_node.__name__}, loaded {row_count} rows (no CDC filtering)")
    return df
Key points:
- Accept the is_static parameter in your read() method signature
- Check is_static before filtering
- If is_static is True, skip CDC filtering entirely
- The complete cached dataset is returned for static nodes
- Optional: Log when loading static nodes for debugging
Best Practices¶
✅ DO use .static() for tables needed in full
✅ DO combine static nodes with CDC-filtered fact tables
❌ DON'T use .static() when you need incremental processing
Production Use Cases¶
Example 1: Daily Batch Processing¶
# Day 1: Process initial load
initial_customers = spark.read.parquet("s3://data/customers/2024-01-01/")
pipeline.run(spark, inputs={source_node: initial_customers}, cache={...CacheMode.MERGE})
# Day 2: Process only new/changed customers
new_customers = spark.read.parquet("s3://data/customers/2024-01-02/")
pipeline.run(spark, inputs={source_node: new_customers}, cache={...CacheMode.MERGE})
# CDC automatically filters and processes only new rows
Example 2: Streaming Data Processing¶
# Micro-batch 1 (10:00 AM)
batch1 = spark.read.format("kafka")...
pipeline.run(spark, inputs={source: batch1}, cache={...CacheMode.MERGE})
# Micro-batch 2 (10:05 AM) - only new events
batch2 = spark.read.format("kafka")...
pipeline.run(spark, inputs={source: batch2}, cache={...CacheMode.MERGE})
Example 3: Data Warehouse Updates¶
# Weekly dimension table update
# CDC ensures only new/changed dimension records are processed
# and merged into the existing warehouse table
Summary¶
CDC Cache in Flypipe enables efficient incremental data processing by:
- Tracking timestamps - The CDC metadata table records when each pipeline edge was processed (source → destination)
- Filtering data - read() returns only rows updated after the last processing timestamp (unless the node is static)
- Static nodes - Use .static() to skip CDC filtering and load complete cached datasets for reference data
- Appending results - CacheMode.MERGE uses Delta Lake's MERGE INTO for upsert operations
- Maintaining history - The full dataset is preserved in the cache while only deltas are processed
- Parallel execution - The Runner executes independent nodes concurrently (controlled by max_workers)
Key Components¶
Runner: Executes the transformation graph with parallel execution support

- Creates an execution plan with levels based on dependencies
- Calls create_cdc_table() before each level to ensure thread-safe CDC table creation
- Executes nodes in parallel within each level (when max_workers > 1)
- Memoizes results in run_context.node_results

CDCManager: Manages CDC metadata in a Spark Delta table (created internally by the cache)

- create_table(): Creates the CDC metadata table (called by the Runner before parallel execution)
- write(spark, upstream_node, to_node, timestamp): Records when edges were processed
- filter(spark, from_node, to_node, df): Filters a DataFrame based on CDC timestamps (skipped for static nodes)

IncrementalCDCCache: Extends Cache with CDC-specific methods

- Accepts table, merge_keys, schema, catalog, and cdc_table constructor parameters
- create_cdc_table(spark): Ensures the CDC metadata table exists (for thread-safe parallel execution)
- read(spark, from_node, to_node, is_static): Reads cached data with optional CDC filtering (skipped when is_static is True)
- write(spark, df, upstream_nodes, to_node, datetime_started_transformation): Writes data and CDC metadata
- Uses Delta Lake's MERGE INTO for upsert operations based on merge_keys
- Creates the CDCManager internally in the constructor

Static Nodes: Use the .static() method on node dependencies to skip CDC filtering

- Loads the complete cached dataset instead of filtering by timestamp
- Ideal for reference tables, dimensions, and configuration data

cdc_datetime_updated: Timestamp column added to track row processing time

CacheMode.MERGE: Cache mode that triggers MERGE INTO for incremental updates

max_workers: Parameter controlling parallel execution (defaults to os.cpu_count() - 1)
This pattern is ideal for large-scale data pipelines where reprocessing all data would be inefficient or expensive.