
Change Data Capture (CDC): Real-time Data Synchronization

Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database, then delivers those changes in real time to downstream systems. CDC enables real-time data integration, synchronization, and streaming analytics across heterogeneous systems.

Overview

What is Change Data Capture?

Change Data Capture is a technique that:

  • Captures Changes: Identifies INSERT, UPDATE, DELETE operations in source databases
  • Real-time Processing: Delivers changes as they occur, not in batch
  • Event Streaming: Converts database changes into event streams
  • Downstream Integration: Feeds changes to analytics systems, data warehouses, and applications
  • Data Consistency: Maintains data consistency across distributed systems

Key Benefits

  • Real-time Data: Immediate availability of data changes
  • Reduced Latency: Eliminates batch processing delays
  • System Decoupling: Loose coupling between source and target systems
  • Scalability: Handles high-volume change streams efficiently
  • Data Lineage: Complete audit trail of data changes

CDC Architecture Patterns

┌─────────────────────────────────────────────────────────┐
│ Source Database │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Table A │ │ Table B │ │ Table C │ │
│ │ │ │ │ │ │ │
│ │ INSERT │ │ UPDATE │ │ DELETE │ │
│ │ UPDATE │ │ INSERT │ │ INSERT │ │
│ │ DELETE │ │ DELETE │ │ UPDATE │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────┬───────────────────────────────────┘
│ Change Events
┌─────────────────────▼───────────────────────────────────┐
│ CDC Capture Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Log-based │ │ Trigger- │ │ Query- │ │
│ │ CDC │ │ based CDC │ │ based CDC │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────┬───────────────────────────────────┘
│ Change Stream
┌─────────────────────▼───────────────────────────────────┐
│ Message Broker (Kafka) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Topic A │ │ Topic B │ │ Topic C │ │
│ │ Changes │ │ Changes │ │ Changes │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────┬───────────────────────────────────┘
│ Event Streams
┌─────────────────────▼───────────────────────────────────┐
│ Downstream Systems │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Analytics │ │ Cache │ │ Search │ │
│ │ Platform │ │ Layer │ │ Engine │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘

CDC Implementation Methods

1. Log-based CDC

Log-based CDC reads database transaction logs to capture changes:

How it Works

Database Transaction Log → CDC Connector → Change Events → Kafka

Advantages

  • Minimal Performance Impact: Reads the transaction log instead of querying application tables, so overhead on the source is negligible
  • Complete Change History: Captures every committed change in commit order, including changes made outside the application
  • Real-time: Near real-time change capture
  • Reliable: Uses the database's own transaction log as the source of truth

Disadvantages

  • Database-specific: Each database has different log formats
  • Complex Setup: Requires understanding of database internals
  • Log Retention: Depends on database log retention policies

Supported Databases

  • PostgreSQL: WAL (Write-Ahead Log)
  • MySQL: Binary Log (binlog)
  • Oracle: Redo Log
  • SQL Server: Transaction Log
  • MongoDB: Oplog
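
In practice, log-based CDC is usually deployed through a connector framework such as Debezium running on Kafka Connect rather than by parsing logs directly. The sketch below registers a hypothetical Debezium PostgreSQL connector through the Kafka Connect REST API; the hosts, credentials, table list, and connector name are placeholders, and the exact configuration keys vary by connector version.

# Sketch: register a Debezium PostgreSQL connector via the Kafka Connect REST API.
# All connection details below are illustrative placeholders.
import requests

connector = {
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",               # logical decoding plugin reading the WAL
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "cdc_user",
        "database.password": "cdc_password",
        "database.dbname": "mydb",
        "database.server.name": "postgres-connector",  # logical name, prefixes change topics
        "table.include.list": "public.users"           # stream changes for this table only
    }
}

# Kafka Connect's REST API commonly listens on port 8083
response = requests.post("http://localhost:8083/connectors", json=connector, timeout=10)
response.raise_for_status()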

2. Trigger-based CDC

Trigger-based CDC uses database triggers to capture changes:

How it Works

Table Change → Database Trigger → CDC Table → Change Events → Kafka

Advantages

  • Database Agnostic: Works with any database supporting triggers
  • Flexible: Can capture custom change information
  • Immediate: Changes are captured in the same transaction as the original write
  • Selective: Can filter specific tables or columns

Disadvantages

  • Performance Impact: Adds overhead to every transaction
  • Complexity: Requires trigger management
  • Reliability: Changes are lost if a trigger fails or is disabled
  • Maintenance: Triggers need to be maintained
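
For concreteness, a trigger-based capture path on PostgreSQL might look like the sketch below (applied here with psycopg2, using PostgreSQL 11+ trigger syntax): an AFTER trigger copies every changed row into a users_changes audit table, which a separate process can then read and publish to Kafka. The table, trigger, and connection details are illustrative assumptions, not a prescribed schema.

# Sketch: trigger-based CDC on PostgreSQL. Names and credentials are placeholders.
import psycopg2

CDC_DDL = """
CREATE TABLE IF NOT EXISTS users_changes (
    change_id   BIGSERIAL PRIMARY KEY,
    operation   TEXT        NOT NULL,               -- 'INSERT' | 'UPDATE' | 'DELETE'
    changed_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    row_data    JSONB       NOT NULL                -- full row image as JSON
);

CREATE OR REPLACE FUNCTION capture_users_change() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO users_changes (operation, row_data) VALUES (TG_OP, to_jsonb(OLD));
    ELSE
        INSERT INTO users_changes (operation, row_data) VALUES (TG_OP, to_jsonb(NEW));
    END IF;
    RETURN NULL;  -- AFTER trigger: return value is ignored
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS users_cdc_trigger ON users;
CREATE TRIGGER users_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION capture_users_change();
"""

with psycopg2.connect("dbname=mydb user=cdc_user password=cdc_password") as conn:
    with conn.cursor() as cur:
        cur.execute(CDC_DDL)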

3. Query-based CDC

Query-based CDC periodically queries for changes:

How it Works

Scheduled Query → Compare Timestamps → Change Events → Kafka

Advantages

  • Simple Implementation: Easy to understand and implement
  • Database Agnostic: Works with any SQL database
  • Flexible: Can implement custom change detection logic
  • Low Risk: No triggers, log access, or schema changes required on the source database

Disadvantages

  • Not Real-time: Batch-based processing
  • Performance Impact: Periodic polling queries add load to the source database
  • Missed Changes: Intermediate updates and hard deletes between polls can be missed
  • Timestamp Dependency: Relies on reliable timestamp columns
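
A query-based poller can be as small as the sketch below, which assumes a users table with a reliable updated_at column and publishes any row newer than the last checkpoint to Kafka. Table, column, topic, and interval names are illustrative.

# Sketch: query-based CDC by polling an updated_at column. Names are placeholders.
import json
import time

import psycopg2
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8')
)

def poll_user_changes(last_seen, poll_interval=30):
    """Repeatedly query for rows changed since the last checkpoint."""
    conn = psycopg2.connect("dbname=mydb user=cdc_user password=cdc_password")

    while True:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, name, email, updated_at FROM users "
                "WHERE updated_at > %s ORDER BY updated_at",
                (last_seen,)
            )
            for row_id, name, email, updated_at in cur.fetchall():
                producer.send('user-changes', {
                    'id': row_id, 'name': name, 'email': email, 'updated_at': updated_at
                })
                last_seen = max(last_seen, updated_at)  # advance the checkpoint

        producer.flush()
        time.sleep(poll_interval)  # hard deletes and intermediate updates are missed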

CDC Data Formats

1. Change Event Structure

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "before",
        "type": "struct",
        "fields": [
          { "field": "id", "type": "int32" },
          { "field": "name", "type": "string" },
          { "field": "email", "type": "string" }
        ]
      },
      {
        "field": "after",
        "type": "struct",
        "fields": [
          { "field": "id", "type": "int32" },
          { "field": "name", "type": "string" },
          { "field": "email", "type": "string" }
        ]
      },
      { "field": "source", "type": "struct" },
      { "field": "op", "type": "string" },
      { "field": "ts_ms", "type": "int64" }
    ]
  },
  "payload": {
    "before": {
      "id": 1,
      "name": "John Doe",
      "email": "john@example.com"
    },
    "after": {
      "id": 1,
      "name": "John Smith",
      "email": "john.smith@example.com"
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "postgresql",
      "name": "postgres-connector",
      "ts_ms": 1640995200000,
      "snapshot": "false",
      "db": "mydb",
      "sequence": "[\"1234567890\",\"1234567891\"]",
      "schema": "public",
      "table": "users",
      "txId": 12345,
      "lsid": null,
      "xmin": null
    },
    "op": "u",
    "ts_ms": 1640995200000
  }
}

2. Operation Types

Operation   Code   Description
Create      c      INSERT operation
Read        r      Initial snapshot read
Update      u      UPDATE operation
Delete      d      DELETE operation
Truncate    t      TRUNCATE operation
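
When consuming these events, a small helper can unpack the envelope into the operation code and the effective row image: 'after' for creates, reads, and updates, 'before' for deletes. The sketch below assumes the Debezium-style structure shown above.

# Sketch: unpack a Debezium-style change event envelope.
def unpack_change_event(change_event):
    payload = change_event['payload']
    op = payload['op']                        # 'c', 'r', 'u', 'd', or 't'
    row = payload['before'] if op == 'd' else payload['after']   # 't' carries no row image
    table = payload['source']['table']
    return op, row, table, payload['ts_ms']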

3. Schema Evolution

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "before",
        "type": "struct",
        "fields": [
          { "field": "id", "type": "int32" },
          { "field": "name", "type": "string" },
          { "field": "email", "type": "string" },
          { "field": "phone", "type": "string", "default": null }
        ]
      }
    ]
  }
}

CDC Use Cases

1. Real-time Analytics

# Real-time analytics with CDC
from kafka import KafkaConsumer
import json

def process_user_changes():
    """Process user changes for real-time analytics"""

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    for message in consumer:
        change_event = message.value
        operation = change_event['payload']['op']

        if operation == 'c':  # Create
            handle_user_created(change_event['payload']['after'])
        elif operation == 'u':  # Update
            handle_user_updated(
                change_event['payload']['before'],
                change_event['payload']['after']
            )
        elif operation == 'd':  # Delete
            handle_user_deleted(change_event['payload']['before'])

def handle_user_created(user_data):
    """Handle new user creation"""
    # Update user count metrics
    # Send welcome email
    # Initialize user profile
    pass

def handle_user_updated(before, after):
    """Handle user updates"""
    # Track profile changes
    # Update search index
    # Notify relevant systems
    pass

def handle_user_deleted(user_data):
    """Handle user deletion"""
    # Remove user from downstream aggregates
    pass

2. Data Warehouse Synchronization

# Data warehouse sync with CDC
def sync_to_data_warehouse():
    """Sync changes to data warehouse"""

    consumer = KafkaConsumer(
        'order-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    for message in consumer:
        change_event = message.value
        operation = change_event['payload']['op']

        if operation in ['c', 'u']:  # Create or Update
            # Upsert to data warehouse
            upsert_to_warehouse(change_event['payload']['after'])
        elif operation == 'd':  # Delete
            # Soft delete in data warehouse
            soft_delete_in_warehouse(change_event['payload']['before'])

def upsert_to_warehouse(data):
    """Upsert data to warehouse"""
    # Implement warehouse upsert logic
    pass

3. Cache Invalidation

# Cache invalidation with CDC
def invalidate_cache():
    """Invalidate cache based on data changes"""

    consumer = KafkaConsumer(
        'product-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    for message in consumer:
        change_event = message.value
        operation = change_event['payload']['op']

        # For deletes the 'after' image is null, so fall back to 'before'
        record = change_event['payload']['after'] or change_event['payload']['before']
        product_id = record['id']

        # Invalidate related cache entries
        cache_keys = [
            f"product:{product_id}",
            f"product_list:{product_id}",
            f"recommendations:{product_id}"
        ]

        for key in cache_keys:
            redis_client.delete(key)

        # Warm cache with new data if needed
        if operation in ['c', 'u']:
            warm_cache(product_id)

4. Search Index Updates

# Search index updates with CDC
def sync_search_index():
    """Update the search index with data changes"""

    consumer = KafkaConsumer(
        'document-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    for message in consumer:
        change_event = message.value
        operation = change_event['payload']['op']

        if operation == 'c':  # Create
            # Add to search index
            add_to_search_index(change_event['payload']['after'])
        elif operation == 'u':  # Update
            # Update search index
            update_search_index(change_event['payload']['after'])
        elif operation == 'd':  # Delete
            # Remove from search index
            remove_from_search_index(change_event['payload']['before'])

CDC Challenges and Solutions

1. Data Consistency

Challenge

Ensuring data consistency across distributed systems when processing CDC events.

Solutions

# Idempotent processing
def process_change_event(event_id, change_event):
    """Process change event with idempotency"""

    # Check if already processed
    if is_event_processed(event_id):
        return

    try:
        # Process the event
        process_event(change_event)

        # Mark as processed
        mark_event_processed(event_id)

    except Exception as e:
        # Handle processing errors
        log_error(event_id, e)
        raise

# Event ordering
def process_ordered_events():
    """Process events in order using partition keys"""

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        key_deserializer=lambda x: x.decode('utf-8'),
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    # Process messages in order per partition
    for message in consumer:
        user_id = message.key
        change_event = message.value

        # Process events for this user in order
        process_user_event(user_id, change_event)

2. Schema Evolution

Challenge

Handling schema changes in source databases without breaking downstream systems.

Solutions

# Schema registry integration
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

def handle_schema_evolution():
    """Handle schema evolution gracefully"""

    schema_registry = SchemaRegistryClient({
        'url': 'http://localhost:8081'
    })

    # Get latest schema
    latest_schema = schema_registry.get_latest_version('user-changes-value')

    # Deserialize with schema evolution
    avro_deserializer = AvroDeserializer(
        schema_registry,
        latest_schema.schema.schema_str
    )

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda v: avro_deserializer(
            v, SerializationContext('user-changes', MessageField.VALUE)
        )
    )

    for message in consumer:
        # Handle different schema versions
        handle_change_event(message.value)

def handle_change_event(data):
    """Handle change event with backward compatibility"""

    # Check for new fields
    if 'phone' in data:
        # Handle new phone field
        process_phone_field(data['phone'])

    # Handle removed fields gracefully
    if 'old_field' not in data:
        # Use default value
        data['old_field'] = get_default_value()

3. Performance Optimization

Challenge

Processing high-volume CDC streams efficiently.

Solutions

# Batch processing
def batch_process_events():
    """Process events in batches for better performance"""

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        max_poll_records=1000  # Process up to 1000 records at once
    )

    batch = []
    batch_size = 100

    for message in consumer:
        batch.append(message.value)

        if len(batch) >= batch_size:
            # Process batch
            process_batch(batch)
            batch = []

    # Process remaining events
    if batch:
        process_batch(batch)

# Parallel processing
import concurrent.futures

def parallel_process_events():
    """Process events in parallel"""

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        for message in consumer:
            # Submit for parallel processing
            executor.submit(process_event, message.value)

4. Error Handling and Recovery

Challenge

Handling failures and ensuring reliable CDC processing.

Solutions

# Dead letter queue
import time
from kafka import KafkaConsumer, KafkaProducer

def process_with_dlq():
    """Process events with dead letter queue for failed events"""

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        enable_auto_commit=False
    )

    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )

    for message in consumer:
        try:
            # Process the event
            process_event(message.value)

            # Commit offset on success
            consumer.commit()

        except Exception as e:
            # Send to dead letter queue
            dlq_message = {
                'original_message': message.value,
                'error': str(e),
                'timestamp': time.time()
            }

            producer.send('user-changes-dlq', dlq_message)

            # Commit offset to avoid reprocessing
            consumer.commit()

# Retry mechanism
from functools import wraps

def retry(max_attempts=3, delay=1):
    """Retry decorator for failed operations"""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise e
                    time.sleep(delay * (2 ** attempt))  # Exponential backoff
            return None
        return wrapper
    return decorator

@retry(max_attempts=3, delay=1)
def process_event_with_retry(event):
    """Process event with retry mechanism"""
    # Event processing logic
    pass

Monitoring and Observability

1. Metrics Collection

# CDC metrics
from prometheus_client import Counter, Histogram, Gauge

# Metrics
cdc_events_processed = Counter('cdc_events_processed_total', 'Total CDC events processed', ['operation', 'table'])
cdc_processing_duration = Histogram('cdc_processing_duration_seconds', 'CDC processing duration')
cdc_lag = Gauge('cdc_consumer_lag', 'CDC consumer lag', ['topic', 'partition'])

def process_with_metrics():
    """Process CDC events with metrics"""

    consumer = KafkaConsumer(
        'user-changes',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    for message in consumer:
        start_time = time.time()

        try:
            # Process the event
            process_event(message.value)

            # Record metrics
            operation = message.value['payload']['op']
            table = message.value['payload']['source']['table']
            cdc_events_processed.labels(operation=operation, table=table).inc()

        finally:
            # Record processing duration
            duration = time.time() - start_time
            cdc_processing_duration.observe(duration)

2. Health Checks

# CDC health checks
def check_cdc_health():
    """Check CDC system health"""

    health_status = {
        'status': 'healthy',
        'checks': {}
    }

    # Check Kafka connectivity
    try:
        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            consumer_timeout_ms=1000
        )
        health_status['checks']['kafka'] = 'healthy'
    except Exception as e:
        health_status['checks']['kafka'] = f'unhealthy: {e}'
        health_status['status'] = 'unhealthy'

    # Check database connectivity
    try:
        # Check database connection
        health_status['checks']['database'] = 'healthy'
    except Exception as e:
        health_status['checks']['database'] = f'unhealthy: {e}'
        health_status['status'] = 'unhealthy'

    # Check consumer lag
    try:
        lag = get_consumer_lag()
        if lag > 10000:  # High lag threshold
            health_status['checks']['consumer_lag'] = f'high: {lag}'
            health_status['status'] = 'degraded'
        else:
            health_status['checks']['consumer_lag'] = f'normal: {lag}'
    except Exception as e:
        health_status['checks']['consumer_lag'] = f'error: {e}'

    return health_status
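
The get_consumer_lag() helper referenced above is not defined in this guide. One way to compute it with kafka-python is sketched below: compare each partition's latest offset against the CDC consumer group's committed offset. The topic and group_id values are assumptions.

# Hypothetical helper: total lag of a consumer group on a topic (kafka-python).
from kafka import KafkaConsumer, TopicPartition

def get_consumer_lag(topic='user-changes', group_id='cdc-processor'):
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        group_id=group_id,
        enable_auto_commit=False
    )
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    end_offsets = consumer.end_offsets(partitions)   # latest offset per partition

    total_lag = 0
    for tp in partitions:
        committed = consumer.committed(tp) or 0      # None if the group has not committed yet
        total_lag += max(end_offsets[tp] - committed, 0)

    consumer.close()
    return total_lag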

Best Practices

1. Design Principles

# CDC best practices implementation

# 1. Use idempotent operations
def idempotent_upsert(table, data, key_fields):
    """Idempotent upsert operation"""

    # Check if record exists
    existing = get_record(table, key_fields)

    if existing:
        # Update existing record
        update_record(table, data, key_fields)
    else:
        # Insert new record
        insert_record(table, data)

# 2. Handle schema evolution
def handle_schema_evolution(data, schema_version):
    """Handle schema evolution gracefully"""

    if schema_version == '1.0':
        # Handle v1.0 schema
        return process_v1_schema(data)
    elif schema_version == '2.0':
        # Handle v2.0 schema with backward compatibility
        return process_v2_schema(data)
    else:
        raise ValueError(f"Unsupported schema version: {schema_version}")

# 3. Implement proper error handling
def robust_event_processing(event):
    """Robust event processing with error handling"""

    try:
        # Validate event
        validate_event(event)

        # Process event
        result = process_event(event)

        # Log success
        logger.info(f"Successfully processed event: {event['id']}")

        return result

    except ValidationError as e:
        # Log validation error and skip
        logger.warning(f"Validation error for event {event['id']}: {e}")
        return None

    except ProcessingError as e:
        # Log processing error and retry
        logger.error(f"Processing error for event {event['id']}: {e}")
        raise

    except Exception as e:
        # Log unexpected error
        logger.error(f"Unexpected error for event {event['id']}: {e}")
        raise

2. Performance Optimization

# Performance optimization techniques

# 1. Batch processing
def batch_process_events(events, batch_size=100):
    """Process events in batches"""

    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]

        # Process batch
        process_batch(batch)

        # Commit progress
        commit_batch_progress(i + len(batch))

# 2. Parallel processing
import asyncio

async def async_process_events(events):
    """Process events asynchronously"""

    tasks = []
    for event in events:
        task = asyncio.create_task(process_event_async(event))
        tasks.append(task)

    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle results
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error(f"Error processing event {i}: {result}")

# 3. Caching frequently accessed data
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_user_info(user_id):
    """Cache user information"""
    return fetch_user_from_database(user_id)

3. Security Considerations

# Security best practices

# 1. Data encryption
def encrypt_sensitive_data(data):
    """Encrypt sensitive data in CDC events"""

    sensitive_fields = ['ssn', 'credit_card', 'password']

    for field in sensitive_fields:
        if field in data:
            data[field] = encrypt_field(data[field])

    return data

# 2. Access control
def check_access_permissions(user_id, table_name):
    """Check if user has access to table changes"""

    permissions = get_user_permissions(user_id)

    if table_name not in permissions['allowed_tables']:
        raise PermissionError(f"Access denied to table: {table_name}")

    return True

# 3. Audit logging
def audit_cdc_event(event, user_id):
    """Audit CDC event processing"""

    audit_log = {
        'timestamp': time.time(),
        'user_id': user_id,
        'event_id': event['id'],
        'operation': event['operation'],
        'table': event['table'],
        'action': 'processed'
    }

    write_audit_log(audit_log)

Conclusion

Change Data Capture (CDC) is a powerful pattern for real-time data integration that enables:

Key Benefits

  • Real-time Data Flow: Immediate availability of data changes
  • System Decoupling: Loose coupling between source and target systems
  • Scalable Architecture: Handles high-volume change streams
  • Data Consistency: Maintains consistency across distributed systems

Best Use Cases

  • Real-time Analytics: Immediate insights from data changes
  • Data Synchronization: Keep multiple systems in sync
  • Event-driven Architecture: Build reactive systems
  • Data Warehousing: Real-time ETL processes

When to Choose CDC

  • Need real-time data processing
  • Multiple systems require data synchronization
  • Building event-driven architectures
  • Implementing real-time analytics

CDC transforms traditional batch-oriented data processing into real-time, event-driven architectures that can respond immediately to data changes.

Resources

Learning Resources

  • Kafka Integration Guide
  • Database Replication (coming soon)
  • Stream Processing (coming soon)