Kafka CDC Integration: Real-time Data Streaming

This guide covers integrating Change Data Capture (CDC) with Apache Kafka to create real-time data streaming pipelines. We'll explore Kafka Connect, Debezium, and custom implementations for building robust CDC systems.

Overview

Kafka CDC Architecture

Source Databases
  PostgreSQL · MySQL · MongoDB
        │
        │ CDC events
        ▼
Kafka Connect Cluster
  PostgreSQL / MySQL / MongoDB Debezium connectors
        │
        │ Kafka topics
        ▼
Apache Kafka Cluster
  Change topics: users · orders · products
        │
        │ Event streams
        ▼
Downstream Consumers
  Analytics platform · Cache layer · Search engine

Key Components

  1. Kafka Connect: Framework for connecting Kafka with external systems
  2. Debezium: Platform for change data capture
  3. Schema Registry: Manages schema evolution
  4. Kafka Streams: Real-time stream processing
  5. Consumer Applications: Downstream data processing

Kafka Connect Setup

1. Installation and Configuration

Download and Setup

# Download Kafka Connect
wget https://downloads.apache.org/kafka/2.8.1/kafka_2.13-2.8.1.tgz
tar -xzf kafka_2.13-2.8.1.tgz
cd kafka_2.13-2.8.1

# Start ZooKeeper (in its own terminal, or add the -daemon flag)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka (in its own terminal, or add the -daemon flag)
bin/kafka-server-start.sh config/server.properties

# Start Kafka Connect (distributed mode)
bin/connect-distributed.sh config/connect-distributed.properties
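Before moving on, you can confirm the Connect worker's REST API is reachable. This is a minimal check; the worker listens on port 8083 by default, matching the configuration in the next section.

# Verify the Connect worker is up (returns version, commit id and Kafka cluster id)
curl http://localhost:8083/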

Connect Configuration

# config/connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
# Default converters (JSON with embedded schemas)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Alternative: Avro converters backed by Schema Registry
# (uncomment these and remove the JSON converters above to use Avro)
# key.converter=io.confluent.connect.avro.AvroConverter
# value.converter=io.confluent.connect.avro.AvroConverter
# key.converter.schema.registry.url=http://localhost:8081
# value.converter.schema.registry.url=http://localhost:8081

# REST API
rest.port=8083
rest.host.name=localhost

# Plugin paths
plugin.path=/opt/kafka/plugins

# Worker configuration (internal topics;
# use replication factor 1 on a single-broker dev cluster, 3 in production)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

2. Debezium Connector Installation

Download Debezium Connectors

# Create plugins directory
mkdir -p /opt/kafka/plugins

# Download Debezium connectors
cd /opt/kafka/plugins
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.7.Final/debezium-connector-postgres-1.9.7.Final-plugin.tar.gz
tar -xzf debezium-connector-postgres-1.9.7.Final-plugin.tar.gz

# Download MySQL connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
tar -xzf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz

# Download MongoDB connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/1.9.7.Final/debezium-connector-mongodb-1.9.7.Final-plugin.tar.gz
tar -xzf debezium-connector-mongodb-1.9.7.Final-plugin.tar.gz

Update Connect Configuration

# Ensure connect-distributed.properties points plugin.path at the parent directory;
# Kafka Connect scans each subdirectory for connector plugins
plugin.path=/opt/kafka/plugins
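After restarting the Connect workers, confirm that the Debezium connector classes were loaded from plugin.path; if a class is missing from this list, creating the connector will fail.

# List the connector plugins the worker has discovered
curl -s http://localhost:8083/connector-plugins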

PostgreSQL CDC with Kafka

1. PostgreSQL Setup

Database Configuration

-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_wal_senders = 10;
ALTER SYSTEM SET max_replication_slots = 10;

-- Restart PostgreSQL for wal_level to take effect
-- (pg_reload_conf() would not apply this change)

-- Create replication user
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'replicator_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;

-- Create publication
CREATE PUBLICATION cdc_publication FOR TABLE users, orders, products;
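Before wiring up the connector, it can help to verify the replication prerequisites. The object names below match the setup above.

-- Verify the publication and the tables it covers
SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';
SELECT * FROM pg_publication_tables WHERE pubname = 'cdc_publication';

-- After the connector has started, confirm its replication slot is active
SELECT slot_name, plugin, active FROM pg_replication_slots;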

2. Debezium PostgreSQL Connector

Connector Configuration

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "replicator_password",
    "database.dbname": "mydb",
    "database.server.name": "postgres-server",
    "table.include.list": "public.users,public.orders,public.products",
    "publication.name": "cdc_publication",
    "slot.name": "debezium_slot",
    "plugin.name": "pgoutput",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

Deploy Connector

# Deploy PostgreSQL connector
curl -X POST \
http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d @postgres-connector.json

# Check connector status
curl http://localhost:8083/connectors/postgres-connector/status
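Once the connector and its task report RUNNING, you can sanity-check the stream with the console consumer. The topic name below assumes the RegexRouter transform configured above, which routes each table's events to a topic named after the table.

# Inspect the routed change events for the users table
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic users \
  --from-beginning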

3. Custom PostgreSQL CDC Implementation

Python Implementation

import psycopg2
from psycopg2.extras import LogicalReplicationConnection
from kafka import KafkaProducer
import json
import time

class PostgreSQLKafkaCDC:
    def __init__(self, postgres_config, kafka_config):
        self.postgres_config = postgres_config
        self.kafka_config = kafka_config
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_config['bootstrap_servers'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        self.connection = None
        self.cursor = None

    def connect_postgres(self):
        """Open a logical replication connection to PostgreSQL"""
        # Replication connections are always in autocommit mode
        self.connection = psycopg2.connect(
            connection_factory=LogicalReplicationConnection,
            **self.postgres_config
        )

    def create_replication_slot(self, slot_name):
        """Create a logical replication slot if it does not already exist"""
        cursor = self.connection.cursor()
        try:
            cursor.create_replication_slot(slot_name, output_plugin='pgoutput')
            print(f"Created replication slot: {slot_name}")
        except psycopg2.Error as e:
            if "already exists" in str(e):
                print(f"Replication slot {slot_name} already exists")
            else:
                raise

    def start_replication(self, slot_name, publication_name):
        """Start logical replication on a replication cursor"""
        # Note: pgoutput emits a binary protocol; decode=True is only suitable for
        # text output plugins (e.g. wal2json, test_decoding). The parse helpers
        # below are placeholders in either case.
        self.cursor = self.connection.cursor()
        self.cursor.start_replication(
            slot_name=slot_name,
            decode=True,
            options={
                'proto_version': '1',
                'publication_names': publication_name
            }
        )
        print("Started logical replication")

    def consume_and_publish(self):
        """Consume changes from the replication stream and publish them to Kafka"""

        def consume(msg):
            try:
                # Parse WAL message
                change_event = self.parse_wal_message(msg)
                if change_event:
                    # Publish to Kafka
                    self.publish_to_kafka(change_event)
                # Acknowledge progress so PostgreSQL can recycle WAL
                msg.cursor.send_feedback(flush_lsn=msg.data_start)
            except Exception as e:
                print(f"Error processing message: {e}")

        # Reuse the replication cursor opened in start_replication()
        self.cursor.consume_stream(consume)

    def parse_wal_message(self, msg):
        """Parse a WAL message into a change event (placeholder parsing)"""
        try:
            # With decode=True the payload is already a str; otherwise it is bytes
            data = msg.payload.decode('utf-8') if isinstance(msg.payload, bytes) else msg.payload

            # Extract change information
            change_event = {
                'timestamp': time.time(),
                'lsn': msg.data_start,
                'operation': self.extract_operation(data),
                'table': self.extract_table(data),
                'schema': self.extract_schema(data),
                'data': self.extract_data(data)
            }
            return change_event

        except Exception as e:
            print(f"Error parsing WAL message: {e}")
            return None

    def publish_to_kafka(self, change_event):
        """Publish a change event to Kafka in a Debezium-like envelope"""
        topic = f"{change_event['schema']}.{change_event['table']}"

        # Add metadata
        kafka_message = {
            'schema': {
                'type': 'struct',
                'fields': [
                    {'field': 'before', 'type': 'struct'},
                    {'field': 'after', 'type': 'struct'},
                    {'field': 'source', 'type': 'struct'},
                    {'field': 'op', 'type': 'string'},
                    {'field': 'ts_ms', 'type': 'int64'}
                ]
            },
            'payload': {
                'before': change_event.get('before_data'),
                # the placeholder parser stores row data under 'data'
                'after': change_event.get('after_data') or change_event.get('data'),
                'source': {
                    'connector': 'postgresql',
                    'name': 'postgres-server',
                    'ts_ms': int(change_event['timestamp'] * 1000),
                    'snapshot': 'false',
                    'db': 'mydb',
                    'schema': change_event['schema'],
                    'table': change_event['table']
                },
                'op': change_event['operation'],
                'ts_ms': int(change_event['timestamp'] * 1000)
            }
        }

        # Send to Kafka
        self.producer.send(topic, kafka_message)
        print(f"Published change event to topic: {topic}")

    def extract_operation(self, data):
        """Extract operation type from WAL data"""
        if 'INSERT' in data:
            return 'c'  # Create
        elif 'UPDATE' in data:
            return 'u'  # Update
        elif 'DELETE' in data:
            return 'd'  # Delete
        return 'r'  # Read

    def extract_table(self, data):
        """Extract table name from WAL data (placeholder)"""
        # Implementation depends on the output plugin's message format
        return "users"

    def extract_schema(self, data):
        """Extract schema name from WAL data (placeholder)"""
        return "public"

    def extract_data(self, data):
        """Extract row data from the WAL message (placeholder)"""
        # Implementation depends on the output plugin's message format
        return {}

# Usage
postgres_config = {
    'host': 'localhost',
    'port': 5432,
    'database': 'mydb',
    'user': 'replicator',
    'password': 'replicator_password'
}

kafka_config = {
    'bootstrap_servers': ['localhost:9092']
}

cdc = PostgreSQLKafkaCDC(postgres_config, kafka_config)
cdc.connect_postgres()
cdc.create_replication_slot('custom_cdc_slot')
cdc.start_replication('custom_cdc_slot', 'cdc_publication')
cdc.consume_and_publish()
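If you retire the custom pipeline, drop its replication slot; otherwise PostgreSQL retains WAL segments for the inactive slot indefinitely. The slot name below matches the example above.

-- Remove the replication slot used by the custom pipeline
SELECT pg_drop_replication_slot('custom_cdc_slot');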

Kafka Streams for CDC Processing

1. Real-time Data Processing

Stream Processing Application

// Kafka Streams CDC Processor
import java.util.Properties;

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class CDCStreamProcessor {

    private static final String INPUT_TOPIC = "users";
    private static final String OUTPUT_TOPIC = "user-analytics";
    private static final String DLQ_TOPIC = "user-changes-dlq";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdc-stream-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Create input stream
        KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);

        // Process CDC events
        KStream<String, String> processedStream = inputStream
            .filter((key, value) -> isValidEvent(value))
            .mapValues(value -> processCDCEvent(value))
            .filter((key, value) -> value != null);

        // Handle different operations
        KStream<String, String> createStream = processedStream
            .filter((key, value) -> isCreateOperation(value))
            .mapValues(value -> handleCreateEvent(value));

        KStream<String, String> updateStream = processedStream
            .filter((key, value) -> isUpdateOperation(value))
            .mapValues(value -> handleUpdateEvent(value));

        KStream<String, String> deleteStream = processedStream
            .filter((key, value) -> isDeleteOperation(value))
            .mapValues(value -> handleDeleteEvent(value));

        // Merge streams
        KStream<String, String> mergedStream = createStream
            .merge(updateStream)
            .merge(deleteStream);

        // Send to output topic
        mergedStream.to(OUTPUT_TOPIC);

        // Handle invalid events
        inputStream
            .filter((key, value) -> !isValidEvent(value))
            .to(DLQ_TOPIC);

        // Start processing
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static boolean isValidEvent(String value) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode node = mapper.readTree(value);
            return node.has("payload") && node.get("payload").has("op");
        } catch (Exception e) {
            return false;
        }
    }

    private static String processCDCEvent(String value) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode node = mapper.readTree(value);

            // Add processing timestamp
            ObjectNode result = (ObjectNode) node;
            result.put("processed_at", System.currentTimeMillis());

            return mapper.writeValueAsString(result);

        } catch (Exception e) {
            System.err.println("Error processing CDC event: " + e.getMessage());
            return null;
        }
    }

    private static boolean isCreateOperation(String value) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode node = mapper.readTree(value);
            return "c".equals(node.get("payload").get("op").asText());
        } catch (Exception e) {
            return false;
        }
    }

    private static boolean isUpdateOperation(String value) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode node = mapper.readTree(value);
            return "u".equals(node.get("payload").get("op").asText());
        } catch (Exception e) {
            return false;
        }
    }

    private static boolean isDeleteOperation(String value) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode node = mapper.readTree(value);
            return "d".equals(node.get("payload").get("op").asText());
        } catch (Exception e) {
            return false;
        }
    }

    private static String handleCreateEvent(String value) {
        // Handle create event
        return value;
    }

    private static String handleUpdateEvent(String value) {
        // Handle update event
        return value;
    }

    private static String handleDeleteEvent(String value) {
        // Handle delete event
        return value;
    }
}

2. Python Alternative (Consumer/Producer Pipeline)

from kafka import KafkaConsumer, KafkaProducer
import json
import time
from typing import Dict, Any

class PythonCDCStreamProcessor:
    def __init__(self, kafka_config):
        self.kafka_config = kafka_config
        self.consumer = None
        self.producer = None

    def setup_consumer(self, topics):
        """Setup Kafka consumer"""
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='cdc-stream-processor',
            auto_offset_reset='latest',
            enable_auto_commit=False
        )

    def setup_producer(self):
        """Setup Kafka producer"""
        self.producer = KafkaProducer(
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )

    def process_cdc_events(self):
        """Process CDC events"""
        for message in self.consumer:
            try:
                # Validate event
                if not self.is_valid_event(message.value):
                    self.send_to_dlq(message.value, "Invalid event format")
                    continue

                # Process event
                processed_event = self.process_event(message.value)

                if processed_event:
                    # Send to output topic
                    output_topic = self.get_output_topic(message.topic)
                    self.producer.send(output_topic, processed_event)

                    # Commit offset
                    self.consumer.commit()
                else:
                    self.send_to_dlq(message.value, "Processing failed")

            except Exception as e:
                print(f"Error processing message: {e}")
                self.send_to_dlq(message.value, str(e))

    def is_valid_event(self, event: Dict[str, Any]) -> bool:
        """Validate CDC event"""
        try:
            return (
                'payload' in event and
                'op' in event['payload'] and
                event['payload']['op'] in ['c', 'u', 'd', 'r']
            )
        except Exception:
            return False

    def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Process CDC event"""
        try:
            operation = event['payload']['op']

            # Add processing metadata
            event['processed_at'] = time.time()
            event['processor'] = 'python-cdc-processor'

            # Handle different operations
            if operation == 'c':
                return self.handle_create_event(event)
            elif operation == 'u':
                return self.handle_update_event(event)
            elif operation == 'd':
                return self.handle_delete_event(event)
            else:
                return event

        except Exception as e:
            print(f"Error processing event: {e}")
            return None

    def handle_create_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Handle create event"""
        # Add create-specific processing
        event['event_type'] = 'user_created'
        return event

    def handle_update_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Handle update event"""
        # Add update-specific processing
        event['event_type'] = 'user_updated'
        return event

    def handle_delete_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Handle delete event"""
        # Add delete-specific processing
        event['event_type'] = 'user_deleted'
        return event

    def get_output_topic(self, input_topic: str) -> str:
        """Get output topic name"""
        return f"{input_topic}-processed"

    def send_to_dlq(self, event: Dict[str, Any], error: str):
        """Send failed event to dead letter queue"""
        dlq_message = {
            'original_event': event,
            'error': error,
            'timestamp': time.time()
        }

        self.producer.send('cdc-dlq', dlq_message)
        print(f"Sent to DLQ: {error}")

# Usage
kafka_config = {
    'bootstrap_servers': ['localhost:9092']
}

processor = PythonCDCStreamProcessor(kafka_config)
processor.setup_consumer(['users', 'orders', 'products'])
processor.setup_producer()
processor.process_cdc_events()

Schema Registry Integration

1. Schema Evolution

Avro Schema Definition

{
  "type": "record",
  "name": "UserChangeEvent",
  "namespace": "com.example.cdc",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "User",
          "fields": [
            { "name": "id", "type": "int" },
            { "name": "name", "type": "string" },
            { "name": "email", "type": "string" },
            {
              "name": "created_at",
              "type": { "type": "long", "logicalType": "timestamp-millis" }
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": ["null", "User"],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "fields": [
          { "name": "connector", "type": "string" },
          { "name": "name", "type": "string" },
          { "name": "ts_ms", "type": "long" },
          { "name": "db", "type": "string" },
          { "name": "schema", "type": "string" },
          { "name": "table", "type": "string" }
        ]
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": "long"
    }
  ]
}
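As an alternative to the Python client shown next, the schema can be registered directly through the Schema Registry REST API. The subject name user-changes-value follows the default <topic>-value naming convention and is an assumption in this example.

# Register the Avro schema via the Schema Registry REST API (requires jq 1.6+);
# the schema file is embedded as an escaped JSON string in the "schema" field
jq -n --rawfile s user_change_event.avsc '{schema: $s}' | \
  curl -X POST http://localhost:8081/subjects/user-changes-value/versions \
       -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
       -d @-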

Schema Registry Client

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.serialization import MessageField, SerializationContext
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

class SchemaRegistryManager:
    def __init__(self, schema_registry_url):
        self.schema_registry = SchemaRegistryClient({
            'url': schema_registry_url
        })
        self.serializers = {}
        self.deserializers = {}

    def register_schema(self, subject, schema_str):
        """Register schema in Schema Registry"""
        try:
            # register_schema expects a Schema object and returns the schema id
            schema_id = self.schema_registry.register_schema(
                subject, Schema(schema_str, schema_type='AVRO')
            )
            print(f"Registered schema for subject {subject}: id {schema_id}")
            return schema_id
        except Exception as e:
            print(f"Error registering schema: {e}")
            return None

    def get_latest_schema(self, subject):
        """Get latest schema version for a subject"""
        try:
            return self.schema_registry.get_latest_version(subject)
        except Exception as e:
            print(f"Error getting latest schema: {e}")
            return None

    def create_serializer(self, subject):
        """Create Avro serializer for a subject"""
        if subject not in self.serializers:
            registered = self.get_latest_schema(subject)
            if registered:
                self.serializers[subject] = AvroSerializer(
                    self.schema_registry,
                    registered.schema.schema_str
                )
        return self.serializers.get(subject)

    def create_deserializer(self, subject):
        """Create Avro deserializer for a subject"""
        if subject not in self.deserializers:
            registered = self.get_latest_schema(subject)
            if registered:
                self.deserializers[subject] = AvroDeserializer(
                    self.schema_registry,
                    registered.schema.schema_str
                )
        return self.deserializers.get(subject)

    def serialize_message(self, topic, data):
        """Serialize a message value using Avro (value subject = '<topic>-value')"""
        subject = f"{topic}-value"
        serializer = self.create_serializer(subject)
        if serializer:
            return serializer(data, SerializationContext(topic, MessageField.VALUE))
        return None

    def deserialize_message(self, topic, data):
        """Deserialize a message value using Avro (value subject = '<topic>-value')"""
        subject = f"{topic}-value"
        deserializer = self.create_deserializer(subject)
        if deserializer:
            return deserializer(data, SerializationContext(topic, MessageField.VALUE))
        return None

# Usage
schema_manager = SchemaRegistryManager('http://localhost:8081')

# Register schema
with open('user_change_event.avsc', 'r') as f:
    schema_str = f.read()

schema_manager.register_schema('user-changes-value', schema_str)

# Serialize message
user_data = {
    'before': None,
    'after': {
        'id': 1,
        'name': 'John Doe',
        'email': 'john@example.com',
        'created_at': 1640995200000
    },
    'source': {
        'connector': 'postgresql',
        'name': 'postgres-server',
        'ts_ms': 1640995200000,
        'db': 'mydb',
        'schema': 'public',
        'table': 'users'
    },
    'op': 'c',
    'ts_ms': 1640995200000
}

serialized_data = schema_manager.serialize_message('user-changes', user_data)
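The serialized bytes can then be published with the confluent-kafka Producer. This is a minimal sketch; the topic name user-changes is an assumption carried over from the example above.

# Minimal sketch: publish the Avro-serialized value with confluent-kafka's Producer
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('user-changes', value=serialized_data)
producer.flush()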

Monitoring and Management

1. Kafka Connect Monitoring

import requests
import json
import time
from typing import Dict, List

class KafkaConnectMonitor:
    def __init__(self, connect_url):
        self.connect_url = connect_url

    def get_connectors(self) -> List[str]:
        """Get list of connectors"""
        response = requests.get(f"{self.connect_url}/connectors")
        return response.json()

    def get_connector_status(self, connector_name: str) -> Dict:
        """Get connector status"""
        response = requests.get(f"{self.connect_url}/connectors/{connector_name}/status")
        return response.json()

    def get_connector_config(self, connector_name: str) -> Dict:
        """Get connector configuration"""
        response = requests.get(f"{self.connect_url}/connectors/{connector_name}/config")
        return response.json()

    def restart_connector(self, connector_name: str) -> bool:
        """Restart connector"""
        response = requests.post(f"{self.connect_url}/connectors/{connector_name}/restart")
        return response.status_code == 204

    def pause_connector(self, connector_name: str) -> bool:
        """Pause connector"""
        response = requests.put(f"{self.connect_url}/connectors/{connector_name}/pause")
        return response.status_code == 202

    def resume_connector(self, connector_name: str) -> bool:
        """Resume connector"""
        response = requests.put(f"{self.connect_url}/connectors/{connector_name}/resume")
        return response.status_code == 202

    def monitor_connectors(self):
        """Monitor all connectors"""
        while True:
            try:
                connectors = self.get_connectors()

                for connector in connectors:
                    status = self.get_connector_status(connector)
                    connector_state = status['connector']['state']

                    print(f"Connector: {connector}, State: {connector_state}")

                    # Check tasks
                    for task in status['tasks']:
                        task_id = task['id']
                        task_state = task['state']
                        print(f"  Task {task_id}: {task_state}")

                        if task_state == 'FAILED':
                            print(f"  Error: {task.get('trace', 'No trace available')}")

                time.sleep(30)  # Check every 30 seconds

            except Exception as e:
                print(f"Error monitoring connectors: {e}")
                time.sleep(60)

# Usage
monitor = KafkaConnectMonitor("http://localhost:8083")
monitor.monitor_connectors()
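Building on the monitor above, here is a small sketch of an auto-heal pass that restarts any connector reporting a FAILED task. Whether automatic restarts are appropriate depends on the failure mode; a repeatedly failing connector usually needs investigation instead.

# Sketch: restart connectors that report failed tasks (single pass)
monitor = KafkaConnectMonitor("http://localhost:8083")
for name in monitor.get_connectors():
    status = monitor.get_connector_status(name)
    if any(task['state'] == 'FAILED' for task in status['tasks']):
        print(f"Restarting {name} due to failed task(s)")
        monitor.restart_connector(name)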

2. Consumer Lag Monitoring

from kafka import KafkaConsumer, KafkaAdminClient, TopicPartition
import time
from typing import Dict, List

class ConsumerLagMonitor:
    def __init__(self, kafka_bootstrap_servers):
        self.kafka_bootstrap_servers = kafka_bootstrap_servers
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=kafka_bootstrap_servers
        )

    def get_consumer_lag(self, topic: str, consumer_group: str) -> Dict:
        """Get consumer lag per partition for a topic and consumer group"""
        # Do not subscribe to the topic: we only read offsets here, so we must
        # not trigger a rebalance in the real consumer group
        consumer = KafkaConsumer(
            bootstrap_servers=self.kafka_bootstrap_servers,
            group_id=consumer_group,
            enable_auto_commit=False
        )

        try:
            partitions = consumer.partitions_for_topic(topic) or set()
            topic_partitions = [TopicPartition(topic, p) for p in partitions]

            # Log-end offsets (high water marks) per partition
            end_offsets = consumer.end_offsets(topic_partitions)

            lag_info = {}
            for tp in topic_partitions:
                high_water = end_offsets[tp]

                # Committed offset for the consumer group
                committed = consumer.committed(tp)

                # Calculate lag
                lag = high_water - committed if committed is not None else high_water
                lag_info[tp.partition] = {
                    'high_water': high_water,
                    'committed': committed,
                    'lag': lag
                }

            return lag_info
        finally:
            consumer.close()

    def get_topic_metadata(self, topic: str) -> Dict:
        """Get topic metadata"""
        # describe_topics returns a list of per-topic metadata dicts
        metadata = self.admin_client.describe_topics([topic])
        return metadata[0] if metadata else {}

    def monitor_lag(self, topics: List[str], consumer_groups: List[str]):
        """Monitor lag for multiple topics and consumer groups"""
        while True:
            for topic in topics:
                for consumer_group in consumer_groups:
                    try:
                        lag_info = self.get_consumer_lag(topic, consumer_group)

                        total_lag = sum(info['lag'] for info in lag_info.values())

                        print(f"Topic: {topic}, Consumer Group: {consumer_group}")
                        print(f"Total Lag: {total_lag}")

                        for partition, info in lag_info.items():
                            print(f"  Partition {partition}: {info['lag']} messages behind")

                        # Alert if lag is too high
                        if total_lag > 10000:
                            print(f"WARNING: High lag detected for {topic} in {consumer_group}")

                    except Exception as e:
                        print(f"Error monitoring lag for {topic}/{consumer_group}: {e}")

            time.sleep(60)  # Check every minute

# Usage
lag_monitor = ConsumerLagMonitor(['localhost:9092'])
lag_monitor.monitor_lag(
    topics=['users', 'orders', 'products'],
    consumer_groups=['analytics-consumer', 'cache-consumer']
)

Best Practices

1. Performance Optimization

# CDC performance best practices

# 1. Batch processing
def batch_process_events(events, batch_size=1000):
    """Process CDC events in batches"""
    # process_batch() and commit_offsets() are placeholders for your own
    # batch handler and offset-commit logic
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]

        # Process batch
        process_batch(batch)

        # Commit offsets up to the last event in the batch
        commit_offsets(batch[-1].offset)

# 2. Parallel processing
import concurrent.futures

def parallel_process_events(events, max_workers=10):
    """Process events in parallel"""
    # process_event() is a placeholder for your per-event handler
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []

        for event in events:
            future = executor.submit(process_event, event)
            futures.append(future)

        # Wait for all to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
            except Exception as e:
                print(f"Error processing event: {e}")

# 3. Connection pooling
import psycopg2.pool

class DatabaseConnectionPool:
    def __init__(self, connection_params, min_conn=5, max_conn=20):
        self.pool = psycopg2.pool.ThreadedConnectionPool(
            min_conn, max_conn, **connection_params
        )

    def get_connection(self):
        return self.pool.getconn()

    def return_connection(self, conn):
        self.pool.putconn(conn)
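A brief usage sketch for the pool above; the connection parameters are illustrative and mirror the earlier PostgreSQL example.

# Illustrative usage of DatabaseConnectionPool
pool = DatabaseConnectionPool({
    'host': 'localhost',
    'dbname': 'mydb',
    'user': 'replicator',
    'password': 'replicator_password'
})

conn = pool.get_connection()
try:
    with conn.cursor() as cur:
        cur.execute("SELECT count(*) FROM users")
        print(cur.fetchone())
finally:
    pool.return_connection(conn)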

2. Error Handling and Recovery

# Robust error handling for CDC

import time

class CDCEventProcessor:
    def __init__(self):
        self.retry_count = 3
        self.retry_delay = 1

    def process_event_with_retry(self, event):
        """Process event with a retry mechanism and exponential backoff"""
        # TemporaryError and PermanentError are application-defined exception
        # types (see the sketch below); process_event() and producer are
        # placeholders for your handler and Kafka producer
        for attempt in range(self.retry_count):
            try:
                return self.process_event(event)
            except TemporaryError as e:
                if attempt < self.retry_count - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))
                    continue
                else:
                    # Send to dead letter queue
                    self.send_to_dlq(event, e)
                    raise
            except PermanentError as e:
                # Don't retry permanent errors
                self.send_to_dlq(event, e)
                raise

    def send_to_dlq(self, event, error):
        """Send failed event to dead letter queue"""
        dlq_message = {
            'original_event': event,
            'error': str(error),
            'timestamp': time.time(),
            'retry_count': self.retry_count
        }

        # Send to DLQ topic
        producer.send('cdc-dlq', dlq_message)
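The retry logic above distinguishes transient from permanent failures but leaves the exception types undefined; here is a minimal sketch of that taxonomy, with the actual classification left to the application.

# Minimal sketch of the error taxonomy assumed above
class TemporaryError(Exception):
    """Transient failure (e.g. broker temporarily unavailable); safe to retry."""

class PermanentError(Exception):
    """Non-retryable failure (e.g. malformed event, constraint violation)."""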

3. Data Validation

# CDC data validation

from confluent_kafka.schema_registry import SchemaRegistryClient

class ValidationError(Exception):
    """Raised when a CDC event fails validation"""

class CDCDataValidator:
    def __init__(self):
        self.schema_registry = SchemaRegistryClient({
            'url': 'http://localhost:8081'
        })

    def validate_event(self, event):
        """Validate CDC event"""

        # Schema validation
        if not self.validate_schema(event):
            raise ValidationError("Schema validation failed")

        # Data validation
        if not self.validate_data(event):
            raise ValidationError("Data validation failed")

        # Business rule validation
        if not self.validate_business_rules(event):
            raise ValidationError("Business rule validation failed")

        return True

    def validate_schema(self, event):
        """Validate event schema"""
        # Implement schema validation (e.g. against the registered Avro schema)
        return True

    def validate_data(self, event):
        """Validate event data"""
        # Implement data validation (see the sketch below)
        return True

    def validate_business_rules(self, event):
        """Validate business rules"""
        # Implement business rule validation
        return True
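As an illustration of what validate_data() might check, the sketch below verifies required fields in a Debezium-style envelope; the field names are assumptions, not part of the original example.

# Illustrative validate_data() check for a Debezium-style user event
REQUIRED_USER_FIELDS = {'id', 'email'}

def has_required_user_fields(event) -> bool:
    after = (event.get('payload') or {}).get('after') or {}
    return REQUIRED_USER_FIELDS.issubset(after.keys())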

Conclusion

Kafka CDC integration provides a robust foundation for real-time data streaming with:

Key Benefits

  • Real-time Processing: Immediate data change propagation
  • Scalability: Handle high-volume change streams
  • Reliability: Fault-tolerant message delivery
  • Schema Evolution: Support for schema changes over time

Best Use Cases

  • Real-time Analytics: Immediate insights from data changes
  • Data Synchronization: Keep multiple systems in sync
  • Event-driven Architecture: Build reactive systems
  • Microservices Integration: Decouple services with events

When to Choose Kafka CDC

  • Need real-time data processing
  • Building event-driven architectures
  • Multiple consumers for the same data
  • Require high throughput and low latency

Kafka CDC integration transforms traditional batch-oriented data processing into real-time, event-driven architectures that can respond immediately to data changes.
