
Database CDC Implementation: PostgreSQL, MySQL, and More

This guide covers implementing Change Data Capture (CDC) for various database systems, focusing on practical implementation using tools like Debezium, Kafka Connect, and native database features.

Overview

Database CDC Methods

Different databases support CDC through various mechanisms:

Database     Primary Method           Tool Support                    Real-time
PostgreSQL   WAL (Write-Ahead Log)    Debezium, pg_chameleon          Yes
MySQL        Binary Log (binlog)      Debezium, Maxwell               Yes
Oracle       Redo Log                 Debezium, Oracle GoldenGate     Yes
SQL Server   Transaction Log          Debezium, Attunity              Yes
MongoDB      Oplog                    Debezium, Mongo Connector       Yes

PostgreSQL CDC Implementation

1. PostgreSQL Configuration

Enable WAL and Logical Replication

# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_worker_processes = 10

# Restart PostgreSQL after changing these settings

Create Replication User

-- Create replication user
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'replicator_password';

-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;

-- For future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replicator;

Create Publication

-- Create publication for specific tables
CREATE PUBLICATION cdc_publication FOR TABLE users, orders, products;

-- Or create publication for all tables
CREATE PUBLICATION cdc_publication FOR ALL TABLES;
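
To confirm the setup from an application, you can query the publication and replication-slot catalogs; below is a minimal sketch using psycopg2 (host, credentials, and database name are assumptions):

import psycopg2

# Sanity-check logical replication configuration (sketch; adjust credentials)
conn = psycopg2.connect(host="localhost", port=5432, dbname="mydb",
                        user="replicator", password="replicator_password")
with conn, conn.cursor() as cur:
    # wal_level must be 'logical' for logical decoding to work
    cur.execute("SHOW wal_level;")
    print("wal_level:", cur.fetchone()[0])

    # Publications defined in this database
    cur.execute("SELECT pubname, puballtables FROM pg_publication;")
    for pubname, puballtables in cur.fetchall():
        print(f"publication {pubname} (all tables: {puballtables})")

    # Replication slots and whether a consumer is currently attached
    cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
    for slot_name, plugin, active in cur.fetchall():
        print(f"slot {slot_name} plugin={plugin} active={active}")
conn.close()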

2. Debezium PostgreSQL Connector

Connector Configuration

{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "replicator",
"database.password": "replicator_password",
"database.dbname": "mydb",
"database.server.name": "postgres-server",
"table.include.list": "public.users,public.orders,public.products",
"publication.name": "cdc_publication",
"slot.name": "debezium_slot",
"plugin.name": "pgoutput",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.postgres-server",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
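
A connector configuration like this is registered by POSTing it to the Kafka Connect REST API. Below is a minimal sketch using requests; the Connect URL and the file name postgres-connector.json are assumptions:

import json
import requests

# Register the connector (sketch; assumes Kafka Connect listens on localhost:8083
# and the JSON above is saved as postgres-connector.json)
with open("postgres-connector.json") as f:
    connector_config = json.load(f)

response = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector_config),
)
response.raise_for_status()
print("Created connector:", response.json()["name"])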

Advanced Configuration

Note: the // annotations in this and the following connector examples are for readability only; remove them before submitting the configuration, since Kafka Connect expects plain JSON.

{
"name": "postgres-connector-advanced",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "replicator",
"database.password": "replicator_password",
"database.dbname": "mydb",
"database.server.name": "postgres-server",

// Table filtering
"table.include.list": "public.users,public.orders",
"table.exclude.list": "public.audit_logs",
"column.include.list": "public.users.id,public.users.name,public.users.email",

// Snapshot configuration
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": 1000,

// Performance tuning
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 1000,

// Data transformation
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
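
With ExtractNewRecordState and the RegexRouter applied, each table ends up on its own topic containing flattened row images rather than full Debezium envelopes. A minimal consumer sketch with kafka-python follows; the topic name, group id, and JSON serialization are assumptions:

import json
from kafka import KafkaConsumer

# Consume flattened change events from the "users" topic (sketch)
consumer = KafkaConsumer(
    "users",
    bootstrap_servers=["localhost:9092"],
    group_id="cdc-demo-consumer",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for message in consumer:
    row = message.value
    if row is None:
        # Tombstone record (kept because drop.tombstones is false)
        print("tombstone for key:", message.key)
        continue
    print("row image:", row)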

3. Custom PostgreSQL CDC Implementation

Using psycopg2 Logical Replication

The example below consumes a logical replication slot directly with psycopg2. It uses the test_decoding output plugin, which emits human-readable text payloads that are easy to parse; production deployments typically use pgoutput (with a publication, as Debezium does) or wal2json instead.

import time

import psycopg2
from psycopg2.extras import LogicalReplicationConnection

class PostgreSQLCDC:
    def __init__(self, connection_params):
        self.connection_params = connection_params
        self.connection = None
        self.cursor = None
        self.replication_slot = "custom_cdc_slot"

    def connect(self):
        """Open a logical replication connection to PostgreSQL"""
        self.connection = psycopg2.connect(
            connection_factory=LogicalReplicationConnection,
            **self.connection_params
        )
        self.cursor = self.connection.cursor()

    def create_replication_slot(self):
        """Create the logical replication slot if it does not already exist"""
        try:
            self.cursor.create_replication_slot(
                self.replication_slot, output_plugin='test_decoding'
            )
            print(f"Created replication slot: {self.replication_slot}")
        except psycopg2.Error as e:
            if "already exists" in str(e):
                print(f"Replication slot {self.replication_slot} already exists")
            else:
                raise

    def start_replication(self):
        """Start streaming from the replication slot"""
        self.cursor.start_replication(
            slot_name=self.replication_slot,
            decode=True
        )
        print("Started logical replication")

    def consume_changes(self, callback):
        """Consume change events and acknowledge them back to the server"""

        def consume(msg):
            try:
                change_event = self.parse_wal_message(msg)
                if change_event:
                    callback(change_event)
                # Acknowledge the message so the server can recycle WAL
                msg.cursor.send_feedback(flush_lsn=msg.data_start)
            except Exception as e:
                print(f"Error processing message: {e}")

        self.cursor.consume_stream(consume)

    def parse_wal_message(self, msg):
        """Parse a test_decoding payload into a change event"""
        try:
            # With decode=True the payload is already a str
            data = msg.payload

            # Skip transaction boundary messages (BEGIN/COMMIT)
            if data.startswith('BEGIN') or data.startswith('COMMIT'):
                return None

            change_event = {
                'timestamp': time.time(),
                'lsn': msg.data_start,
                'operation': self.extract_operation(data),
                'table': self.extract_table(data),
                'data': self.extract_data(data)
            }
            return change_event

        except Exception as e:
            print(f"Error parsing WAL message: {e}")
            return None

    def extract_operation(self, data):
        """Extract the operation type from a test_decoding payload"""
        if 'INSERT' in data:
            return 'INSERT'
        elif 'UPDATE' in data:
            return 'UPDATE'
        elif 'DELETE' in data:
            return 'DELETE'
        return 'UNKNOWN'

    def extract_table(self, data):
        """Extract the table name from a test_decoding payload.

        Payloads look like:
        table public.users: INSERT: id[integer]:1 name[text]:'alice'
        """
        if data.startswith('table '):
            return data.split(':', 1)[0][len('table '):].strip()
        return None

    def extract_data(self, data):
        """Return the raw column portion of a test_decoding payload"""
        parts = data.split(':', 2)
        return parts[2].strip() if len(parts) > 2 else None

# Usage example
def handle_change_event(event):
    """Handle change event"""
    print(f"Change detected: {event['operation']} on {event['table']}")
    print(f"Data: {event['data']}")

# Initialize CDC
cdc = PostgreSQLCDC({
    'host': 'localhost',
    'port': 5432,
    'database': 'mydb',
    'user': 'replicator',
    'password': 'replicator_password'
})

cdc.connect()
cdc.create_replication_slot()
cdc.start_replication()
cdc.consume_changes(handle_change_event)

MySQL CDC Implementation

1. MySQL Configuration

Enable Binary Logging

# my.cnf
[mysqld]
# Enable binary logging
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL

# Enable GTID (Global Transaction Identifier)
gtid-mode=ON
enforce-gtid-consistency=ON

# Replication settings
server-id=1
max_binlog_size=100M
expire_logs_days=7

Create Replication User

-- Create replication user
CREATE USER 'replicator'@'%' IDENTIFIED BY 'replicator_password';

-- Grant replication privileges
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%';

-- Grant SELECT privileges for CDC
GRANT SELECT ON mydb.* TO 'replicator'@'%';

-- Flush privileges
FLUSH PRIVILEGES;
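
Before wiring up a connector, it is worth confirming that binary logging is configured as expected; below is a minimal check using PyMySQL (host and credentials are assumptions):

import pymysql

# Verify binlog-related settings required for row-based CDC (sketch)
conn = pymysql.connect(host="localhost", port=3306,
                       user="replicator", password="replicator_password")
with conn.cursor() as cur:
    for variable in ("log_bin", "binlog_format", "binlog_row_image", "gtid_mode"):
        cur.execute("SHOW VARIABLES LIKE %s", (variable,))
        row = cur.fetchone()
        if row:
            print(f"{row[0]} = {row[1]}")
conn.close()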

2. Debezium MySQL Connector

Basic Configuration

{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "replicator",
"database.password": "replicator_password",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.orders,mydb.products",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql-server",
"include.schema.changes": "true",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

Advanced Configuration

{
"name": "mysql-connector-advanced",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "replicator",
"database.password": "replicator_password",
"database.server.id": "184054",
"database.server.name": "mysql-server",

// Database and table filtering
"database.include.list": "mydb,analytics_db",
"table.include.list": "mydb.users,mydb.orders",
"table.exclude.list": "mydb.temp_table",

// Column filtering
"column.include.list": "mydb.users.id,mydb.users.name,mydb.users.email",

// Snapshot configuration
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": 1000,

// Performance tuning
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 1000,
"max.queue.size.in.bytes": 0,

// GTID support
"gtid.source.includes": "mysql-server",

// Schema evolution
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql-server",
"include.schema.changes": "true",

// Data transformation
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

3. Custom MySQL CDC Implementation

Using python-mysql-replication

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

class MySQLCDC:
    def __init__(self, mysql_settings):
        self.mysql_settings = mysql_settings
        self.stream = None

    def start_streaming(self, callback):
        """Start streaming MySQL binlog events"""

        # Create binlog stream reader
        self.stream = BinLogStreamReader(
            connection_settings=self.mysql_settings,
            server_id=100,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            only_tables=['users', 'orders', 'products'],
            blocking=True  # keep waiting for new events instead of stopping at the end of the binlog
        )

        print("Started MySQL CDC streaming")

        for binlogevent in self.stream:
            try:
                # A single binlog event can contain multiple rows;
                # emit one change event per row.
                for row in binlogevent.rows:
                    change_event = self.process_binlog_event(binlogevent, row)
                    if change_event:
                        callback(change_event)
            except Exception as e:
                print(f"Error processing binlog event: {e}")

    def process_binlog_event(self, event, row):
        """Convert one binlog row into a change event"""

        change_event = {
            'timestamp': event.timestamp,
            'log_pos': event.packet.log_pos,
            'table': event.table,
            'schema': event.schema,
        }

        if isinstance(event, WriteRowsEvent):
            change_event['operation'] = 'INSERT'
            change_event['data'] = row['values']

        elif isinstance(event, UpdateRowsEvent):
            change_event['operation'] = 'UPDATE'
            change_event['before'] = row['before_values']
            change_event['after'] = row['after_values']

        elif isinstance(event, DeleteRowsEvent):
            change_event['operation'] = 'DELETE'
            change_event['data'] = row['values']

        return change_event

    def stop_streaming(self):
        """Stop streaming"""
        if self.stream:
            self.stream.close()

# Usage example
def handle_mysql_change(event):
    """Handle MySQL change event"""
    print(f"MySQL Change: {event['operation']} on {event['schema']}.{event['table']}")
    if event['operation'] == 'UPDATE':
        print(f"Before: {event['before']}")
        print(f"After: {event['after']}")
    else:
        print(f"Data: {event['data']}")

# Initialize MySQL CDC
mysql_cdc = MySQLCDC({
    'host': 'localhost',
    'port': 3306,
    'user': 'replicator',
    'passwd': 'replicator_password',
    'db': 'mydb'
})

mysql_cdc.start_streaming(handle_mysql_change)
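
The stream above always starts from the current binlog position, so a restart can skip or replay events. A common pattern is to persist the binlog file and position after each processed event and resume from there; the sketch below assumes simple file-based persistence:

import json
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent

POSITION_FILE = "binlog_position.json"

def load_position():
    """Load the last saved binlog position, if any."""
    try:
        with open(POSITION_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        return {"log_file": None, "log_pos": None}

def save_position(log_file, log_pos):
    """Persist the current binlog position."""
    with open(POSITION_FILE, "w") as f:
        json.dump({"log_file": log_file, "log_pos": log_pos}, f)

position = load_position()
stream = BinLogStreamReader(
    connection_settings={"host": "localhost", "port": 3306,
                         "user": "replicator", "passwd": "replicator_password"},
    server_id=101,
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    resume_stream=True,
    log_file=position["log_file"],
    log_pos=position["log_pos"],
)

for event in stream:
    # ... process the event here ...
    save_position(stream.log_file, stream.log_pos)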

Oracle CDC Implementation

1. Oracle Configuration

Enable LogMiner

-- Enable supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Enable supplemental logging for specific tables
ALTER TABLE users ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Create LogMiner user
CREATE USER logminer_user IDENTIFIED BY logminer_password;
GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE TO logminer_user;
GRANT FLASHBACK ANY TABLE TO logminer_user;
GRANT SELECT ANY TABLE TO logminer_user;
GRANT LOCK ANY TABLE TO logminer_user;
GRANT CREATE TABLE TO logminer_user;
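
To get a feel for what LogMiner exposes (Debezium automates all of this), the sketch below starts a LogMiner session and reads redo records with the python-oracledb driver. The DSN, SCN handling, and table names are assumptions, and the user also needs LogMiner privileges (for example LOGMINING and EXECUTE_CATALOG_ROLE) beyond the grants shown above:

import oracledb

conn = oracledb.connect(user="logminer_user", password="logminer_password",
                        dsn="localhost:1521/XE")
cur = conn.cursor()

# Mine from the current SCN onward using the online dictionary
cur.execute("SELECT current_scn FROM V$DATABASE")
start_scn = cur.fetchone()[0]

# CONTINUOUS_MINE was removed in Oracle 19c; on newer versions redo logs
# must be added explicitly with DBMS_LOGMNR.ADD_LOGFILE instead.
cur.execute("""
    BEGIN
        DBMS_LOGMNR.START_LOGMNR(
            STARTSCN => :scn,
            OPTIONS  => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG
                      + DBMS_LOGMNR.CONTINUOUS_MINE);
    END;""", scn=start_scn)

# Read redo records for one table
cur.execute("""
    SELECT scn, operation, seg_owner, table_name, sql_redo
    FROM V$LOGMNR_CONTENTS
    WHERE seg_owner = 'HR' AND table_name = 'USERS'
""")
for scn, operation, owner, table, sql_redo in cur:
    print(scn, operation, f"{owner}.{table}", sql_redo)

cur.execute("BEGIN DBMS_LOGMNR.END_LOGMNR; END;")
conn.close()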

2. Debezium Oracle Connector

Configuration

{
"name": "oracle-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"database.hostname": "localhost",
"database.port": "1521",
"database.user": "logminer_user",
"database.password": "logminer_password",
"database.dbname": "XE",
"database.server.name": "oracle-server",
"table.include.list": "HR.USERS,HR.ORDERS",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.oracle-server",
"log.mining.strategy": "online_catalog",
"log.mining.continuous.mine": "true",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

SQL Server CDC Implementation

1. SQL Server Configuration

Enable CDC

-- Enable CDC on database
USE MyDatabase;
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on specific tables
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'users',
@role_name = 'cdc_admin';

EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'orders',
@role_name = 'cdc_admin';
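
Besides Debezium, the change tables that sp_cdc_enable_table generates can be polled directly. Below is a minimal sketch using pyodbc; the driver name, connection string, and the default capture-instance name dbo_users are assumptions:

import pyodbc

# Poll the CDC change table for dbo.users between the minimum and current LSN (sketch)
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};SERVER=localhost;"
    "DATABASE=MyDatabase;UID=cdc_user;PWD=cdc_password;TrustServerCertificate=yes"
)
cursor = conn.cursor()

cursor.execute("""
    DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_users');
    DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();
    SELECT *
    FROM cdc.fn_cdc_get_all_changes_dbo_users(@from_lsn, @to_lsn, N'all');
""")

# __$operation column: 1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image)
for row in cursor.fetchall():
    print(row)

conn.close()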

Create CDC User

-- Create CDC user
CREATE LOGIN cdc_user WITH PASSWORD = 'cdc_password';
USE MyDatabase;
CREATE USER cdc_user FOR LOGIN cdc_user;

-- Grant CDC permissions
EXEC sp_addrolemember 'db_datareader', 'cdc_user';
EXEC sp_addrolemember 'cdc_admin', 'cdc_user';

2. Debezium SQL Server Connector

Configuration

{
"name": "sqlserver-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "localhost",
"database.port": "1433",
"database.user": "cdc_user",
"database.password": "cdc_password",
"database.dbname": "MyDatabase",
"database.server.name": "sqlserver-server",
"table.include.list": "dbo.users,dbo.orders",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.sqlserver-server",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

MongoDB CDC Implementation

1. MongoDB Configuration

Enable Oplog

// MongoDB replica set configuration
rs.initiate({
_id: "rs0",
members: [{ _id: 0, host: "localhost:27017" }],
});

// Check oplog status (the oplog lives in the local database)
db.getSiblingDB("local").oplog.rs.find().limit(1);
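
With a replica set in place, applications can also consume changes through MongoDB change streams, which are built on top of the oplog. A minimal sketch with PyMongo follows; the connection URI, database, and collection names are assumptions:

from pymongo import MongoClient

# Watch a collection via change streams (sketch; requires a replica set)
client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")
collection = client["mydb"]["users"]

# full_document="updateLookup" returns the full post-image for update events
with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        print(change["operationType"], change["documentKey"])
        if "fullDocument" in change:
            print(change["fullDocument"])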

2. Debezium MongoDB Connector

Configuration

{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "rs0/localhost:27017",
"mongodb.name": "mongodb-server",
"mongodb.user": "debezium",
"mongodb.password": "debezium_password",
"collection.include.list": "mydb.users,mydb.orders",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mongodb-server",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

CDC Monitoring and Management

1. Connector Health Monitoring

import requests
import time

class CDCConnectorMonitor:
    def __init__(self, kafka_connect_url):
        self.kafka_connect_url = kafka_connect_url

    def get_connector_status(self, connector_name):
        """Get connector status"""
        response = requests.get(
            f"{self.kafka_connect_url}/connectors/{connector_name}/status"
        )
        return response.json()

    def get_connector_tasks(self, connector_name):
        """Get connector tasks"""
        response = requests.get(
            f"{self.kafka_connect_url}/connectors/{connector_name}/tasks"
        )
        return response.json()

    def restart_connector(self, connector_name):
        """Restart connector"""
        response = requests.post(
            f"{self.kafka_connect_url}/connectors/{connector_name}/restart"
        )
        return response.status_code == 204

    def pause_connector(self, connector_name):
        """Pause connector"""
        response = requests.put(
            f"{self.kafka_connect_url}/connectors/{connector_name}/pause"
        )
        return response.status_code == 202

    def resume_connector(self, connector_name):
        """Resume connector"""
        response = requests.put(
            f"{self.kafka_connect_url}/connectors/{connector_name}/resume"
        )
        return response.status_code == 202

    def monitor_connector_health(self, connector_name):
        """Monitor connector health"""
        while True:
            try:
                status = self.get_connector_status(connector_name)
                connector_state = status['connector']['state']

                print(f"Connector {connector_name} state: {connector_state}")

                # Check tasks
                for task in status['tasks']:
                    task_id = task['id']
                    task_state = task['state']
                    print(f"Task {task_id} state: {task_state}")

                    if task_state == 'FAILED':
                        print(f"Task {task_id} failed: {task.get('trace', 'No trace available')}")

                time.sleep(30)  # Check every 30 seconds

            except Exception as e:
                print(f"Error monitoring connector: {e}")
                time.sleep(60)

# Usage
monitor = CDCConnectorMonitor("http://localhost:8083")
monitor.monitor_connector_health("postgres-connector")

2. Lag Monitoring

import time

from kafka import KafkaConsumer, TopicPartition

class CDCLagMonitor:
    def __init__(self, kafka_bootstrap_servers):
        self.kafka_bootstrap_servers = kafka_bootstrap_servers

    def get_consumer_lag(self, topic, consumer_group):
        """Get consumer lag for topic"""
        # Do not subscribe to the topic: joining the group would trigger a
        # rebalance of the real consumers. We only read offset metadata.
        consumer = KafkaConsumer(
            bootstrap_servers=self.kafka_bootstrap_servers,
            group_id=consumer_group,
            enable_auto_commit=False
        )

        try:
            # Get partition information
            partitions = consumer.partitions_for_topic(topic) or set()
            lag_info = {}

            for partition in partitions:
                tp = TopicPartition(topic, partition)

                # Get log end offset (high water mark)
                high_water = consumer.end_offsets([tp])[tp]

                # Get committed offset for the consumer group
                committed = consumer.committed(tp)

                # Calculate lag
                lag = high_water - committed if committed is not None else high_water
                lag_info[partition] = {
                    'high_water': high_water,
                    'committed': committed,
                    'lag': lag
                }

            return lag_info
        finally:
            consumer.close()

    def monitor_lag(self, topics, consumer_groups):
        """Monitor lag for multiple topics and consumer groups"""
        while True:
            for topic in topics:
                for consumer_group in consumer_groups:
                    try:
                        lag_info = self.get_consumer_lag(topic, consumer_group)

                        total_lag = sum(info['lag'] for info in lag_info.values())

                        print(f"Topic: {topic}, Consumer Group: {consumer_group}")
                        print(f"Total Lag: {total_lag}")

                        for partition, info in lag_info.items():
                            print(f"  Partition {partition}: {info['lag']} messages behind")

                        # Alert if lag is too high
                        if total_lag > 10000:
                            print(f"WARNING: High lag detected for {topic} in {consumer_group}")

                    except Exception as e:
                        print(f"Error monitoring lag for {topic}/{consumer_group}: {e}")

            time.sleep(60)  # Check every minute

# Usage
lag_monitor = CDCLagMonitor(['localhost:9092'])
lag_monitor.monitor_lag(
    topics=['users', 'orders', 'products'],
    consumer_groups=['analytics-consumer', 'cache-consumer']
)

Best Practices

1. Performance Optimization

# CDC performance best practices

# 1. Batch processing
def batch_process_events(events, batch_size=1000):
    """Process CDC events in batches.

    process_batch() and commit_offsets() are placeholders for your own
    sink logic and offset management.
    """
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]

        # Process batch
        process_batch(batch)

        # Commit offsets up to the last event in the batch
        commit_offsets(batch[-1].offset)

# 2. Parallel processing
import concurrent.futures

def parallel_process_events(events, max_workers=10):
    """Process events in parallel. process_event() is a placeholder for your own logic."""

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []

        for event in events:
            future = executor.submit(process_event, event)
            futures.append(future)

        # Wait for all to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
            except Exception as e:
                print(f"Error processing event: {e}")

# 3. Connection pooling
import psycopg2.pool

class DatabaseConnectionPool:
    def __init__(self, connection_params, min_conn=5, max_conn=20):
        self.pool = psycopg2.pool.ThreadedConnectionPool(
            min_conn, max_conn, **connection_params
        )

    def get_connection(self):
        return self.pool.getconn()

    def return_connection(self, conn):
        self.pool.putconn(conn)

2. Error Handling and Recovery

# Robust error handling for CDC

import time

# Application-defined exception classes: raise TemporaryError for retryable
# failures (e.g. network timeouts) and PermanentError for unrecoverable ones.
class TemporaryError(Exception):
    pass

class PermanentError(Exception):
    pass

class CDCEventProcessor:
    def __init__(self):
        self.retry_count = 3
        self.retry_delay = 1

    def process_event(self, event):
        """Actual event processing logic goes here (placeholder)."""
        raise NotImplementedError

    def process_event_with_retry(self, event):
        """Process event with retry mechanism"""

        for attempt in range(self.retry_count):
            try:
                return self.process_event(event)
            except TemporaryError as e:
                if attempt < self.retry_count - 1:
                    # Exponential backoff between retries
                    time.sleep(self.retry_delay * (2 ** attempt))
                    continue
                else:
                    # Retries exhausted: send to dead letter queue
                    self.send_to_dlq(event, e)
                    raise
            except PermanentError as e:
                # Don't retry permanent errors
                self.send_to_dlq(event, e)
                raise

    def send_to_dlq(self, event, error):
        """Send failed event to dead letter queue"""
        dlq_message = {
            'original_event': event,
            'error': str(error),
            'timestamp': time.time(),
            'retry_count': self.retry_count
        }

        # Send to DLQ topic (producer is an already-configured Kafka producer)
        producer.send('cdc-dlq', dlq_message)

3. Data Validation

# CDC data validation

# SchemaRegistryClient comes from confluent-kafka; ValidationError is application-defined.
from confluent_kafka.schema_registry import SchemaRegistryClient

class ValidationError(Exception):
    pass

class CDCDataValidator:
    def __init__(self):
        self.schema_registry = SchemaRegistryClient({
            'url': 'http://localhost:8081'
        })

    def validate_event(self, event):
        """Validate CDC event"""

        # Schema validation
        if not self.validate_schema(event):
            raise ValidationError("Schema validation failed")

        # Data validation
        if not self.validate_data(event):
            raise ValidationError("Data validation failed")

        # Business rule validation
        if not self.validate_business_rules(event):
            raise ValidationError("Business rule validation failed")

        return True

    def validate_schema(self, event):
        """Validate the event against its registered schema"""
        # Implement schema validation
        return True

    def validate_data(self, event):
        """Validate event data (types, nulls, ranges)"""
        # Implement data validation
        return True

    def validate_business_rules(self, event):
        """Validate business rules"""
        # Implement business rule validation
        return True

Conclusion

Database CDC implementation provides real-time data synchronization across different database systems. Key considerations include:

Implementation Factors

  • Database Type: Choose appropriate CDC method for your database
  • Performance Impact: Minimize impact on source database
  • Data Consistency: Ensure reliable change capture
  • Monitoring: Implement comprehensive monitoring and alerting

Best Practices

  • Use proven tools like Debezium for production environments
  • Implement proper error handling and retry mechanisms
  • Monitor connector health and consumer lag
  • Validate data integrity and schema evolution
  • Optimize for performance with batching and parallel processing

Database CDC enables real-time data integration and is essential for modern data architectures requiring immediate data synchronization across systems.
