Database CDC Implementation: PostgreSQL, MySQL, and More
This guide covers implementing Change Data Capture (CDC) for various database systems, with practical examples based on Debezium, Kafka Connect, and native database features.
Overview
Database CDC Methods
Different databases support CDC through various mechanisms:
| Database | Primary Method | Tool Support | Real-time |
|---|---|---|---|
| PostgreSQL | WAL (Write-Ahead Log) | Debezium, pg_chameleon | ✅ |
| MySQL | Binary Log (binlog) | Debezium, Maxwell | ✅ |
| Oracle | Redo Log | Debezium, Oracle GoldenGate | ✅ |
| SQL Server | Transaction Log | Debezium, Attunity | ✅ |
| MongoDB | Oplog | Debezium, Mongo Connector | ✅ |
PostgreSQL CDC Implementation
1. PostgreSQL Configuration
Enable WAL and Logical Replication
# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_worker_processes = 10
# Restart PostgreSQL after changing these settings
Create Replication User
-- Create replication user
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'replicator_password';
-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;
-- For future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replicator;
Create Publication
-- Create publication for specific tables
CREATE PUBLICATION cdc_publication FOR TABLE users, orders, products;
-- Or create publication for all tables
CREATE PUBLICATION cdc_publication FOR ALL TABLES;
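If you want to confirm these prerequisites from application code, the following sketch (assuming psycopg2 is installed and the `replicator` credentials created above) queries the relevant settings and catalogs:

```python
# Minimal sanity check for the CDC prerequisites above (assumes psycopg2 and the
# replicator user / cdc_publication created in the previous steps).
import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="mydb",
                        user="replicator", password="replicator_password")
with conn.cursor() as cur:
    # wal_level must be 'logical' for logical decoding to work
    cur.execute("SHOW wal_level")
    print("wal_level:", cur.fetchone()[0])

    # Publications created for CDC
    cur.execute("SELECT pubname, puballtables FROM pg_publication")
    print("publications:", cur.fetchall())

    # Existing replication slots and whether they are active
    cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots")
    print("replication slots:", cur.fetchall())
conn.close()
```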
2. Debezium PostgreSQL Connector
Connector Configuration
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "replicator",
"database.password": "replicator_password",
"database.dbname": "mydb",
"database.server.name": "postgres-server",
"table.include.list": "public.users,public.orders,public.products",
"publication.name": "cdc_publication",
"slot.name": "debezium_slot",
"plugin.name": "pgoutput",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.postgres-server",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
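Once Kafka Connect is running, the connector JSON above can be registered through the Connect REST API. A minimal sketch, assuming Connect listens on localhost:8083 and the configuration is saved as postgres-connector.json (the file name is illustrative):

```python
# Register the connector above with the Kafka Connect REST API
# (assumes Kafka Connect is reachable at localhost:8083 and `requests` is installed).
import json
import requests

KAFKA_CONNECT_URL = "http://localhost:8083"

with open("postgres-connector.json") as f:   # the JSON shown above, saved to a file
    connector = json.load(f)

resp = requests.post(
    f"{KAFKA_CONNECT_URL}/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
# 201 = created, 409 = a connector with this name already exists
print(resp.status_code, resp.json() if resp.content else "")
```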
Advanced Configuration
{
"name": "postgres-connector-advanced",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "replicator",
"database.password": "replicator_password",
"database.dbname": "mydb",
"database.server.name": "postgres-server",
// Table filtering
"table.include.list": "public.users,public.orders",
"table.exclude.list": "public.audit_logs",
"column.include.list": "public.users.id,public.users.name,public.users.email",
// Snapshot configuration
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": 1000,
// Performance tuning
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 1000,
// Data transformation
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
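Downstream consumers then read the routed topics. A minimal kafka-python sketch, assuming the Connect JSON converter writes plain JSON values (schemas.enable=false); with ExtractNewRecordState each record value is simply the new row state:

```python
# Consume the flattened change events produced by the connector above
# (assumes kafka-python is installed and plain JSON record values).
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "users", "orders",
    bootstrap_servers=["localhost:9092"],
    group_id="cdc-demo-consumer",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for record in consumer:
    if record.value is None:
        # Tombstone record (delete) kept because drop.tombstones=false
        print(f"{record.topic}: delete tombstone for key {record.key}")
        continue
    print(f"{record.topic}@{record.offset}: {record.value}")
```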
3. Custom PostgreSQL CDC Implementation
Using psycopg2 Logical Replication (test_decoding plugin)
import psycopg2
import psycopg2.extras
import json
import time
class PostgreSQLCDC:
    def __init__(self, connection_params):
        self.connection_params = connection_params
        self.connection = None
        self.cursor = None
        self.replication_slot = "custom_cdc_slot"
    def connect(self):
        """Open a logical replication connection to PostgreSQL"""
        self.connection = psycopg2.connect(
            connection_factory=psycopg2.extras.LogicalReplicationConnection,
            **self.connection_params
        )
        self.cursor = self.connection.cursor()
    def create_replication_slot(self):
        """Create the logical replication slot if it does not already exist"""
        try:
            self.cursor.create_replication_slot(
                self.replication_slot, output_plugin='test_decoding'
            )
            print(f"Created replication slot: {self.replication_slot}")
        except psycopg2.errors.DuplicateObject:
            print(f"Replication slot {self.replication_slot} already exists")
    def start_replication(self):
        """Start streaming changes from the replication slot"""
        # test_decoding emits human-readable text, so the stream can be decoded as UTF-8
        self.cursor.start_replication(
            slot_name=self.replication_slot,
            decode=True,
            options={'skip-empty-xacts': 'true'}
        )
        print("Started logical replication")
    def consume_changes(self, callback):
        """Consume change events from the replication stream"""
        def consume(msg):
            try:
                change_event = self.parse_wal_message(msg)
                if change_event:
                    callback(change_event)
                # Acknowledge the LSN so the slot does not retain WAL indefinitely
                msg.cursor.send_feedback(flush_lsn=msg.data_start)
            except Exception as e:
                print(f"Error processing message: {e}")

        # consume_stream must run on the same cursor that called start_replication
        self.cursor.consume_stream(consume)
    def parse_wal_message(self, msg):
        """Parse a test_decoding message into a change event"""
        try:
            # With decode=True the payload is already a str
            data = msg.payload
            # Skip transaction markers (BEGIN .../COMMIT ...) emitted by test_decoding
            if data.startswith(('BEGIN', 'COMMIT')):
                return None
            change_event = {
                'timestamp': time.time(),
                'lsn': msg.data_start,
                'operation': self.extract_operation(data),
                'table': self.extract_table(data),
                'data': self.extract_data(data)
            }
            return change_event
        except Exception as e:
            print(f"Error parsing WAL message: {e}")
            return None
def extract_operation(self, data):
"""Extract operation type from WAL data"""
if 'INSERT' in data:
return 'INSERT'
elif 'UPDATE' in data:
return 'UPDATE'
elif 'DELETE' in data:
return 'DELETE'
return 'UNKNOWN'
    def extract_table(self, data):
        """Extract table name from a test_decoding line"""
        # Lines look like: table public.users: INSERT: id[integer]:1 name[text]:'Alice'
        if data.startswith('table '):
            return data.split(':', 1)[0][len('table '):]
        return None
    def extract_data(self, data):
        """Extract the column payload from a test_decoding line"""
        # Everything after the second colon is the column data
        parts = data.split(':', 2)
        return parts[2].strip() if len(parts) == 3 else None
# Usage example
def handle_change_event(event):
"""Handle change event"""
print(f"Change detected: {event['operation']} on {event['table']}")
print(f"Data: {event['data']}")
# Initialize CDC
cdc = PostgreSQLCDC({
'host': 'localhost',
'port': 5432,
'database': 'mydb',
'user': 'replicator',
'password': 'replicator_password'
})
cdc.connect()
cdc.create_replication_slot()
cdc.start_replication()
cdc.consume_changes(handle_change_event)
MySQL CDC Implementation
1. MySQL Configuration
Enable Binary Logging
# my.cnf
[mysqld]
# Enable binary logging
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
# Enable GTID (Global Transaction Identifier)
gtid-mode=ON
enforce-gtid-consistency=ON
# Replication settings
server-id=1
max_binlog_size=100M
expire_logs_days=7
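After restarting MySQL, it is worth verifying that the settings took effect. A small sketch using PyMySQL (the client library is an assumption; any MySQL driver works the same way):

```python
# Quick check that the binlog settings above are active
# (assumes the PyMySQL client library is installed).
import pymysql

conn = pymysql.connect(host="localhost", port=3306,
                       user="replicator", password="replicator_password")
with conn.cursor() as cur:
    for variable in ("log_bin", "binlog_format", "binlog_row_image", "gtid_mode"):
        cur.execute("SHOW VARIABLES LIKE %s", (variable,))
        print(cur.fetchone())
conn.close()
```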
Create Replication User
-- Create replication user
CREATE USER 'replicator'@'%' IDENTIFIED BY 'replicator_password';
-- Grant replication privileges
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%';
-- Grant SELECT privileges for CDC
GRANT SELECT ON mydb.* TO 'replicator'@'%';
-- Flush privileges
FLUSH PRIVILEGES;
2. Debezium MySQL Connector
Basic Configuration
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "replicator",
"database.password": "replicator_password",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.orders,mydb.products",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql-server",
"include.schema.changes": "true",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
Advanced Configuration
{
"name": "mysql-connector-advanced",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "replicator",
"database.password": "replicator_password",
"database.server.id": "184054",
"database.server.name": "mysql-server",
// Database and table filtering
"database.include.list": "mydb,analytics_db",
"table.include.list": "mydb.users,mydb.orders",
"table.exclude.list": "mydb.temp_table",
// Column filtering
"column.include.list": "mydb.users.id,mydb.users.name,mydb.users.email",
// Snapshot configuration
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": 1000,
// Performance tuning
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 1000,
"max.queue.size.in.bytes": 0,
// GTID support
"gtid.source.includes": "mysql-server",
// Schema evolution
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql-server",
"include.schema.changes": "true",
// Data transformation
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
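Before creating a connector, the configuration can be checked against Kafka Connect's validation endpoint, which reports per-field errors without starting anything. A sketch assuming Connect at localhost:8083:

```python
# Validate a connector configuration using Kafka Connect's
# PUT /connector-plugins/{class}/config/validate endpoint (Connect URL assumed).
import requests

config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "replicator",
    "database.password": "replicator_password",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "mydb",
}

resp = requests.put(
    "http://localhost:8083/connector-plugins/"
    "io.debezium.connector.mysql.MySqlConnector/config/validate",
    json=config,
)
result = resp.json()
print("error_count:", result.get("error_count"))
for field in result.get("configs", []):
    errors = field["value"].get("errors", [])
    if errors:
        print(field["value"]["name"], "->", errors)
```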
3. Custom MySQL CDC Implementation
Using python-mysql-replication
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
import json
class MySQLCDC:
def __init__(self, mysql_settings):
self.mysql_settings = mysql_settings
self.stream = None
def start_streaming(self, callback):
"""Start streaming MySQL binlog events"""
        # Create binlog stream reader; blocking=True keeps the loop waiting for new
        # events, resume_stream=True continues from the current binlog position
        self.stream = BinLogStreamReader(
            connection_settings=self.mysql_settings,
            server_id=100,  # must be unique among replicas of this MySQL server
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            only_tables=['users', 'orders', 'products'],
            resume_stream=True,
            blocking=True
        )
print("Started MySQL CDC streaming")
for binlogevent in self.stream:
try:
# Process the event
change_event = self.process_binlog_event(binlogevent)
if change_event:
callback(change_event)
except Exception as e:
print(f"Error processing binlog event: {e}")
def process_binlog_event(self, event):
"""Process binlog event into change event"""
change_event = {
'timestamp': event.timestamp,
'log_pos': event.packet.log_pos,
'table': event.table,
'schema': event.schema,
}
        # NOTE: a single binlog event can carry multiple rows; only the first is shown here
        if isinstance(event, WriteRowsEvent):
            change_event['operation'] = 'INSERT'
            change_event['data'] = event.rows[0]['values']
elif isinstance(event, UpdateRowsEvent):
change_event['operation'] = 'UPDATE'
change_event['before'] = event.rows[0]['before_values']
change_event['after'] = event.rows[0]['after_values']
elif isinstance(event, DeleteRowsEvent):
change_event['operation'] = 'DELETE'
change_event['data'] = event.rows[0]['values']
return change_event
def stop_streaming(self):
"""Stop streaming"""
if self.stream:
self.stream.close()
# Usage example
def handle_mysql_change(event):
"""Handle MySQL change event"""
print(f"MySQL Change: {event['operation']} on {event['schema']}.{event['table']}")
if event['operation'] == 'UPDATE':
print(f"Before: {event['before']}")
print(f"After: {event['after']}")
else:
print(f"Data: {event['data']}")
# Initialize MySQL CDC
mysql_cdc = MySQLCDC({
'host': 'localhost',
'port': 3306,
'user': 'replicator',
'passwd': 'replicator_password',
'db': 'mydb'
})
mysql_cdc.start_streaming(handle_mysql_change)
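Because the binlog position is not stored anywhere in the example above, a restart would either re-read or skip events. One possible checkpointing approach is to persist `log_file`/`log_pos` and hand them back to `BinLogStreamReader`; the checkpoint file name below is illustrative:

```python
# Sketch of checkpointing so streaming can resume after a restart: persist the
# binlog file/position and pass them back to BinLogStreamReader.
import json
import os
from pymysqlreplication import BinLogStreamReader

CHECKPOINT_FILE = "mysql_cdc_checkpoint.json"  # illustrative location

def load_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return json.load(f)
    return {}

def save_checkpoint(log_file, log_pos):
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump({"log_file": log_file, "log_pos": log_pos}, f)

checkpoint = load_checkpoint()
stream = BinLogStreamReader(
    connection_settings={'host': 'localhost', 'port': 3306,
                         'user': 'replicator', 'passwd': 'replicator_password'},
    server_id=101,
    resume_stream=True,
    log_file=checkpoint.get("log_file"),
    log_pos=checkpoint.get("log_pos"),
    blocking=True,
)
for event in stream:
    # ... process the event, then record how far we got
    save_checkpoint(stream.log_file, stream.log_pos)
```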
Oracle CDC Implementation
1. Oracle Configuration
Enable LogMiner
-- Enable supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Enable supplemental logging for specific tables
ALTER TABLE users ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Create LogMiner user
CREATE USER logminer_user IDENTIFIED BY logminer_password;
GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE TO logminer_user;
GRANT FLASHBACK ANY TABLE TO logminer_user;
GRANT SELECT ANY TABLE TO logminer_user;
GRANT LOCK ANY TABLE TO logminer_user;
GRANT CREATE TABLE TO logminer_user;
2. Debezium Oracle Connector
Configuration
{
"name": "oracle-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"database.hostname": "localhost",
"database.port": "1521",
"database.user": "logminer_user",
"database.password": "logminer_password",
"database.dbname": "XE",
"database.server.name": "oracle-server",
"table.include.list": "HR.USERS,HR.ORDERS",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.oracle-server",
"log.mining.strategy": "online_catalog",
"log.mining.continuous.mine": "true",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
SQL Server CDC Implementation
1. SQL Server Configuration
Enable CDC
-- Enable CDC on database
USE MyDatabase;
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on specific tables
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'users',
@role_name = 'cdc_admin';
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'orders',
@role_name = 'cdc_admin';
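SQL Server CDC is polling-based: `sys.sp_cdc_enable_table` creates change tables that are queried through generated functions. A hedged pyodbc sketch, assuming the default capture instance name `dbo_users` and an ODBC Driver 18 connection string:

```python
# Poll the change table that sp_cdc_enable_table created for dbo.users
# (assumes pyodbc and the default capture instance name 'dbo_users').
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};SERVER=localhost;"
    "DATABASE=MyDatabase;UID=cdc_user;PWD=cdc_password;TrustServerCertificate=yes"
)
cursor = conn.cursor()

# Valid LSN range for the capture instance
cursor.execute("SELECT sys.fn_cdc_get_min_lsn('dbo_users'), sys.fn_cdc_get_max_lsn()")
from_lsn, to_lsn = cursor.fetchone()

# All changes in that range; __$operation: 1=delete, 2=insert, 3/4=update (before/after)
cursor.execute(
    "SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_users(?, ?, N'all')",
    from_lsn, to_lsn,
)
for row in cursor.fetchall():
    print(row)
```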
Create CDC User
-- Create CDC user
CREATE LOGIN cdc_user WITH PASSWORD = 'cdc_password';
USE MyDatabase;
CREATE USER cdc_user FOR LOGIN cdc_user;
-- Grant CDC permissions
EXEC sp_addrolemember 'db_datareader', 'cdc_user';
EXEC sp_addrolemember 'cdc_admin', 'cdc_user';
2. Debezium SQL Server Connector
Configuration
{
"name": "sqlserver-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "localhost",
"database.port": "1433",
"database.user": "cdc_user",
"database.password": "cdc_password",
"database.dbname": "MyDatabase",
"database.server.name": "sqlserver-server",
"table.include.list": "dbo.users,dbo.orders",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.sqlserver-server",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
MongoDB CDC Implementation
1. MongoDB Configuration
Enable Oplog
// MongoDB replica set configuration
rs.initiate({
_id: "rs0",
members: [{ _id: 0, host: "localhost:27017" }],
});
// Check oplog status (the oplog lives in the "local" database)
use local
db.oplog.rs.find().limit(1);
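As an alternative to tailing the oplog directly, MongoDB change streams provide a supported cursor over the same replica-set machinery. A pymongo sketch, assuming pymongo is installed and the `rs0` replica set configured above:

```python
# Stream insert/update/delete events for mydb.users via a change stream
# (change streams require a replica set, as configured above).
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")
collection = client["mydb"]["users"]

with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        print(change["operationType"], change.get("documentKey"))
        if change.get("fullDocument"):
            print("  document:", change["fullDocument"])
```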
2. Debezium MongoDB Connector
Configuration
{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "rs0/localhost:27017",
"mongodb.name": "mongodb-server",
"mongodb.user": "debezium",
"mongodb.password": "debezium_password",
"collection.include.list": "mydb.users,mydb.orders",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mongodb-server",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
CDC Monitoring and Management
1. Connector Health Monitoring
import requests
import json
import time
class CDCConnectorMonitor:
def __init__(self, kafka_connect_url):
self.kafka_connect_url = kafka_connect_url
def get_connector_status(self, connector_name):
"""Get connector status"""
response = requests.get(
f"{self.kafka_connect_url}/connectors/{connector_name}/status"
)
return response.json()
def get_connector_tasks(self, connector_name):
"""Get connector tasks"""
response = requests.get(
f"{self.kafka_connect_url}/connectors/{connector_name}/tasks"
)
return response.json()
def restart_connector(self, connector_name):
"""Restart connector"""
response = requests.post(
f"{self.kafka_connect_url}/connectors/{connector_name}/restart"
)
return response.status_code == 204
def pause_connector(self, connector_name):
"""Pause connector"""
response = requests.put(
f"{self.kafka_connect_url}/connectors/{connector_name}/pause"
)
return response.status_code == 202
def resume_connector(self, connector_name):
"""Resume connector"""
response = requests.put(
f"{self.kafka_connect_url}/connectors/{connector_name}/resume"
)
return response.status_code == 202
def monitor_connector_health(self, connector_name):
"""Monitor connector health"""
while True:
try:
status = self.get_connector_status(connector_name)
connector_state = status['connector']['state']
print(f"Connector {connector_name} state: {connector_state}")
# Check tasks
for task in status['tasks']:
task_id = task['id']
task_state = task['state']
print(f"Task {task_id} state: {task_state}")
if task_state == 'FAILED':
print(f"Task {task_id} failed: {task.get('trace', 'No trace available')}")
time.sleep(30) # Check every 30 seconds
except Exception as e:
print(f"Error monitoring connector: {e}")
time.sleep(60)
# Usage
monitor = CDCConnectorMonitor("http://localhost:8083")
monitor.monitor_connector_health("postgres-connector")
2. Lag Monitoring
from kafka import KafkaConsumer, TopicPartition
import time
class CDCLagMonitor:
    def __init__(self, kafka_bootstrap_servers):
        self.kafka_bootstrap_servers = kafka_bootstrap_servers
    def get_consumer_lag(self, topic, consumer_group):
        """Get consumer lag per partition for a topic / consumer group"""
        consumer = KafkaConsumer(
            bootstrap_servers=self.kafka_bootstrap_servers,
            group_id=consumer_group,
            enable_auto_commit=False
        )
        try:
            # Build TopicPartition objects for every partition of the topic
            partitions = consumer.partitions_for_topic(topic) or set()
            topic_partitions = [TopicPartition(topic, p) for p in partitions]
            # Log-end offsets (high water marks) for each partition
            end_offsets = consumer.end_offsets(topic_partitions)
            lag_info = {}
            for tp in topic_partitions:
                high_water = end_offsets[tp]
                committed = consumer.committed(tp)
                lag = high_water - committed if committed is not None else high_water
                lag_info[tp.partition] = {
                    'high_water': high_water,
                    'committed': committed,
                    'lag': lag
                }
            return lag_info
        finally:
            consumer.close()
def monitor_lag(self, topics, consumer_groups):
"""Monitor lag for multiple topics and consumer groups"""
while True:
for topic in topics:
for consumer_group in consumer_groups:
try:
lag_info = self.get_consumer_lag(topic, consumer_group)
total_lag = sum(info['lag'] for info in lag_info.values())
print(f"Topic: {topic}, Consumer Group: {consumer_group}")
print(f"Total Lag: {total_lag}")
for partition, info in lag_info.items():
print(f" Partition {partition}: {info['lag']} messages behind")
# Alert if lag is too high
if total_lag > 10000:
print(f"WARNING: High lag detected for {topic} in {consumer_group}")
except Exception as e:
print(f"Error monitoring lag for {topic}/{consumer_group}: {e}")
time.sleep(60) # Check every minute
# Usage
lag_monitor = CDCLagMonitor(['localhost:9092'])
lag_monitor.monitor_lag(
topics=['users', 'orders', 'products'],
consumer_groups=['analytics-consumer', 'cache-consumer']
)
Best Practices
1. Performance Optimization
# CDC performance best practices
# 1. Batch processing
def batch_process_events(events, batch_size=1000):
"""Process CDC events in batches"""
for i in range(0, len(events), batch_size):
batch = events[i:i + batch_size]
# Process batch
process_batch(batch)
# Commit offsets
commit_offsets(batch[-1].offset)
# 2. Parallel processing
import concurrent.futures
def parallel_process_events(events, max_workers=10):
"""Process events in parallel"""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for event in events:
future = executor.submit(process_event, event)
futures.append(future)
# Wait for all to complete
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
except Exception as e:
print(f"Error processing event: {e}")
# 3. Connection pooling
import psycopg2.pool
class DatabaseConnectionPool:
def __init__(self, connection_params, min_conn=5, max_conn=20):
self.pool = psycopg2.pool.ThreadedConnectionPool(
min_conn, max_conn, **connection_params
)
def get_connection(self):
return self.pool.getconn()
def return_connection(self, conn):
self.pool.putconn(conn)
2. Error Handling and Recovery
# Robust error handling for CDC
import time
class CDCEventProcessor:
    def __init__(self):
        self.retry_count = 3
        self.retry_delay = 1  # seconds; doubled on each retry (exponential backoff)
def process_event_with_retry(self, event):
"""Process event with retry mechanism"""
for attempt in range(self.retry_count):
try:
return self.process_event(event)
except TemporaryError as e:
if attempt < self.retry_count - 1:
time.sleep(self.retry_delay * (2 ** attempt))
continue
else:
# Send to dead letter queue
self.send_to_dlq(event, e)
raise
except PermanentError as e:
# Don't retry permanent errors
self.send_to_dlq(event, e)
raise
def send_to_dlq(self, event, error):
"""Send failed event to dead letter queue"""
dlq_message = {
'original_event': event,
'error': str(error),
'timestamp': time.time(),
'retry_count': self.retry_count
}
# Send to DLQ topic
producer.send('cdc-dlq', dlq_message)
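`producer` in `send_to_dlq` is not defined above; one possible implementation is a kafka-python producer that JSON-serializes the DLQ payload:

```python
# A possible `producer` for send_to_dlq: JSON-serializes messages to the 'cdc-dlq' topic
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda value: json.dumps(value, default=str).encode('utf-8'),
)
```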
3. Data Validation
# CDC data validation
from confluent_kafka.schema_registry import SchemaRegistryClient
class ValidationError(Exception):
    """Raised when a CDC event fails validation"""
class CDCDataValidator:
    def __init__(self):
        self.schema_registry = SchemaRegistryClient({
            'url': 'http://localhost:8081'
        })
def validate_event(self, event):
"""Validate CDC event"""
# Schema validation
if not self.validate_schema(event):
raise ValidationError("Schema validation failed")
# Data validation
if not self.validate_data(event):
raise ValidationError("Data validation failed")
# Business rule validation
if not self.validate_business_rules(event):
raise ValidationError("Business rule validation failed")
return True
def validate_schema(self, event):
"""Validate event schema"""
# Implement schema validation
return True
def validate_data(self, event):
"""Validate event data"""
# Implement data validation
return True
def validate_business_rules(self, event):
"""Validate business rules"""
# Implement business rule validation
return True
Conclusion
Database CDC implementation provides real-time data synchronization across different database systems. Key considerations include:
Implementation Factors
- Database Type: Choose appropriate CDC method for your database
- Performance Impact: Minimize impact on source database
- Data Consistency: Ensure reliable change capture
- Monitoring: Implement comprehensive monitoring and alerting
Best Practices
- Use proven tools like Debezium for production environments
- Implement proper error handling and retry mechanisms
- Monitor connector health and consumer lag
- Validate data integrity and schema evolution
- Optimize for performance with batching and parallel processing
Database CDC enables real-time data integration and is essential for modern data architectures requiring immediate data synchronization across systems.
Resources
Related Documentation
- Change Data Capture Overview
- Kafka CDC Integration
- CDC Tools Comparison (coming soon)