Build a Real-Time Data Lake with Amazon MSK Connect & Iceberg
By Braincuber Team
Published on February 4, 2026
At ShopStream Inc., the analytics team was running batch ETL jobs every 6 hours to sync order data from their MySQL database to their data lake. By the time marketing could analyze customer behavior, the data was already stale. Flash sales ended before they could adjust recommendations. Inventory discrepancies piled up because warehouse data lagged behind reality.
Their solution? Amazon MSK Connect with the Iceberg Kafka Connect sink: a fully managed streaming pipeline that captures every database change in real time and writes directly to Apache Iceberg tables. No more batch jobs. No more stale data. This guide walks you through building the same architecture, from capturing MySQL CDC events to querying fresh data in Amazon Athena within minutes of a transaction.
What You'll Build:
- Real-time CDC pipeline from MySQL to Apache Iceberg
- Single-table and multi-table synchronization
- Schema evolution support for changing source tables
- Exactly-once delivery semantics
Architecture Overview
The solution uses three AWS services working together:
- Amazon RDS MySQL: the source database, with binary logging enabled for CDC capture
- Amazon MSK Connect: managed Kafka Connect running the Debezium source and Iceberg sink connectors
- Apache Iceberg on S3: the target data lake, with ACID transactions and schema evolution
Data Flow
```text
# Real-Time CDC Pipeline Architecture

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Amazon RDS    │     │   Amazon MSK    │     │    Amazon S3    │
│      MySQL      │────▶│     Cluster     │────▶│    (Iceberg)    │
│  (Binary Log)   │     │     (Kafka)     │     │                 │
└─────────────────┘     └────────┬────────┘     └────────┬────────┘
                                 │                       │
                    ┌────────────┴────────────┐          │
                    │       MSK Connect       │          │
                    │ ┌─────────┐ ┌────────┐  │          │
                    │ │Debezium │ │Iceberg │  │          │
                    │ │ Source  │ │  Sink  │  │          │
                    │ └─────────┘ └────────┘  │          │
                    └─────────────────────────┘          │
                                                         │
                    ┌────────────────────────────────────┘
                    ▼
           ┌─────────────────┐     ┌─────────────────┐
           │    AWS Glue     │     │  Amazon Athena  │
           │  Data Catalog   │────▶│     (Query)     │
           └─────────────────┘     └─────────────────┘
```
Prerequisites
Before starting, ensure you have these components in place:
| Component | Requirements | Notes |
|---|---|---|
| Amazon RDS MySQL | Binary logging enabled | Parameter: binlog_format = ROW |
| Amazon MSK Cluster | Provisioned or Serverless | Same VPC as RDS |
| Amazon S3 Bucket | For plugins and data | Same region as MSK |
| Build Environment | Java 11+, Gradle | EC2 instance or local machine |
Step 1: Build Iceberg Kafka Connect Plugin
The Iceberg Kafka Connect plugin needs to be built from source. This creates a ZIP archive ready for MSK Connect.
```bash
# Clone the Apache Iceberg repository
git clone https://github.com/apache/iceberg.git
cd iceberg/

# Build the Kafka Connect plugin (skip tests for a faster build)
./gradlew -x test -x integrationTest clean build

# The ZIP archive is saved here:
# ./kafka-connect/kafka-connect-runtime/build/distributions/

# Upload to S3 for MSK Connect
aws s3 cp ./kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-*.zip \
  s3://your-bucket/plugins/iceberg-kafka-connect.zip
```
Tip: The build takes 15-20 minutes. Use an EC2 instance with at least 20GB disk space for faster builds.
Step 2: Create Custom Plugins in MSK Connect
You need two custom plugins: Debezium MySQL Connector (source) and Iceberg Kafka Connect (sink).
- Download Debezium MySQL Connector: Get the latest version from the Debezium releases page
- Upload both plugins to S3: Place the Debezium and Iceberg connector ZIPs in your plugins bucket
- Create Debezium Plugin: In MSK Console → MSK Connect → Custom Plugins → Create
- Create Iceberg Plugin: Repeat for the Iceberg connector
Step 3: Configure Worker Settings
Create a worker configuration that enables automatic topic creation and proper data conversion:
```properties
# Worker Configuration for MSK Connect

# Data converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Enable automatic topic creation
topic.creation.enable=true

# Default topic settings for Debezium CDC
topic.creation.default.replication.factor=3
topic.creation.default.partitions=1
topic.creation.default.cleanup.policy=delete
```
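Worker settings are plain `key=value` properties with `#` comments. A quick way to catch typos before uploading is to parse and validate the file; here is a minimal sketch (the `parse_properties` helper and the `REQUIRED` key set are illustrative, not part of MSK Connect):

```python
# Parse Kafka Connect worker properties (key=value lines, '#' comments)
# and check that the keys this guide relies on are present.
REQUIRED = {
    "key.converter",
    "value.converter",
    "topic.creation.enable",
}

def parse_properties(text: str) -> dict:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

worker_conf = """
# Data converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
topic.creation.enable=true
"""

props = parse_properties(worker_conf)
missing = REQUIRED - props.keys()
assert not missing, f"missing worker settings: {missing}"
```

The same check works for the connector configurations in the next steps, since they use the identical properties format.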
Step 4: Create Debezium Source Connector
The Debezium connector captures changes from MySQL binary logs and publishes them to Kafka topics.
```properties
# Debezium MySQL Source Connector Configuration
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1

# MySQL connection settings
database.hostname=your-rds-endpoint.region.rds.amazonaws.com
database.port=3306
database.user=debezium_user
database.password=your_secure_password
database.server.id=100001

# Tables to capture (supports regex)
database.include.list=ecommerce
table.include.list=ecommerce.orders,ecommerce.customers,ecommerce.products

# Topic settings (topic.prefix replaces database.server.name in Debezium 2.x+)
topic.prefix=shopstream
include.schema.changes=false

# Schema history topic (required by the MySQL connector)
schema.history.internal.kafka.bootstrap.servers=your-msk-bootstrap-servers:9092
schema.history.internal.kafka.topic=schema-history.shopstream

# Topic creation defaults
topic.creation.default.partitions=3
topic.creation.default.replication.factor=3

# Snapshot mode (initial = full snapshot, then CDC)
snapshot.mode=initial
```
Security: Create a dedicated MySQL user with only the privileges Debezium needs (SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT). Never use your admin credentials for CDC.
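Debezium publishes each captured table to its own Kafka topic, named `<topic.prefix>.<database>.<table>`. That naming rule is why the sink configurations below subscribe to topics like `shopstream.ecommerce.orders`. A small sketch of the mapping (the helper function is illustrative):

```python
# Debezium topic naming: <topic.prefix>.<database>.<table>
def debezium_topic(topic_prefix: str, database: str, table: str) -> str:
    return f"{topic_prefix}.{database}.{table}"

tables = ["orders", "customers", "products"]
topics = [debezium_topic("shopstream", "ecommerce", t) for t in tables]
# e.g. "shopstream.ecommerce.orders"
```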
Step 5: Create Iceberg Sink Connector (Single Table)
For simpler use cases, configure a connector that syncs one Kafka topic to one Iceberg table:
```properties
# Iceberg Sink Connector - Single Table Mode
connector.class=org.apache.iceberg.connect.IcebergSinkConnector
tasks.max=1

# Source topic(s)
topics=shopstream.ecommerce.orders

# Target Iceberg table
iceberg.tables=ecommerce.orders

# Catalog configuration (AWS Glue)
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.warehouse=s3://your-bucket/iceberg-warehouse/
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.client.region=us-east-1

# Schema evolution settings
iceberg.tables.auto-create-enabled=true
iceberg.tables.evolve-schema-enabled=true

# Commit interval (ms) - how often to commit to Iceberg
iceberg.control.commit.interval-ms=120000

# Debezium transform (extracts payload from CDC events)
transforms=debezium
transforms.debezium.type=org.apache.iceberg.connect.transforms.DebeziumTransform

# Data converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Control topic for offset tracking
iceberg.control.topic=iceberg-control-orders
```
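The `DebeziumTransform` configured above unwraps Debezium's change-event envelope (`before`/`after`/`op`/`source`) into a flat row plus `_cdc` metadata before the sink writes it to Iceberg. A rough Python sketch of that unwrapping, simplified for illustration (the real transform carries additional metadata fields and handles tombstones):

```python
def unwrap_debezium(event: dict) -> dict:
    """Flatten a Debezium envelope into a row with _cdc metadata.
    For deletes ('d') the row state lives in 'before'; otherwise in 'after'."""
    op = event["op"]
    row = dict(event["before"] if op == "d" else event["after"])
    src = event["source"]
    row["_cdc"] = {
        "op": op,                                   # c=create, u=update, d=delete
        "source": f"{src['db']}.{src['table']}",    # used later for routing
    }
    return row

event = {
    "op": "u",
    "before": {"order_id": "ORD-1", "order_status": "PENDING"},
    "after": {"order_id": "ORD-1", "order_status": "SHIPPED"},
    "source": {"db": "ecommerce", "table": "orders"},
}
row = unwrap_debezium(event)
# row carries the post-update state plus _cdc metadata
```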
Step 6: Create Iceberg Sink Connector (Multi-Table)
For enterprise scenarios, route multiple source tables through a single connector with dynamic table routing:
```properties
# Iceberg Sink Connector - Multi-Table Mode
connector.class=org.apache.iceberg.connect.IcebergSinkConnector
tasks.max=2

# Multiple source topics
topics=shopstream.ecommerce.orders,shopstream.ecommerce.customers,shopstream.ecommerce.products

# Catalog configuration
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.warehouse=s3://your-bucket/iceberg-warehouse/
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.client.region=us-east-1

# Auto table creation and schema evolution
iceberg.tables.auto-create-enabled=true
iceberg.tables.evolve-schema-enabled=true

# CRITICAL: Dynamic routing settings
iceberg.tables.route-field=_cdc.source
iceberg.tables.dynamic-enabled=true

# Commit settings
iceberg.control.commit.interval-ms=120000

# Debezium transform
transforms=debezium
transforms.debezium.type=org.apache.iceberg.connect.transforms.DebeziumTransform

# Data converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Control topic
iceberg.control.topic=iceberg-control-multi
```
Key Multi-Table Parameters:
- `iceberg.tables.route-field`: specifies which CDC metadata field determines the target table (use `_cdc.source` for Debezium)
- `iceberg.tables.dynamic-enabled`: must be `true` when target tables are not specified explicitly
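With dynamic routing enabled, the sink reads the route field from each record and uses its value as the fully qualified target table name. A hedged sketch of that dispatch logic (the `route_table` helper is illustrative, not the connector's actual code):

```python
def route_table(record: dict, route_field: str = "_cdc.source") -> str:
    """Resolve a dotted route-field path against a record,
    mirroring how iceberg.tables.route-field selects the target table."""
    value = record
    for part in route_field.split("."):
        value = value[part]  # walk nested fields, e.g. record["_cdc"]["source"]
    return value

record = {"order_id": "ORD-2", "_cdc": {"op": "c", "source": "ecommerce.orders"}}
# route_table(record) resolves to the "ecommerce.orders" table
```

Because the table name comes from the record itself, one connector can fan records out to any number of Iceberg tables without listing them in the configuration.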
Step 7: Query Data in Amazon Athena
Once the connectors are running, Iceberg tables are automatically registered in AWS Glue Data Catalog and queryable via Athena:
```sql
-- Query the latest orders from the Iceberg table
SELECT
    order_id,
    customer_id,
    order_total,
    order_status,
    created_at,
    "_cdc".op AS cdc_operation  -- CDC metadata: c=create, u=update, d=delete
FROM ecommerce.orders
WHERE created_at >= current_date - interval '1' day
ORDER BY created_at DESC
LIMIT 100;

-- Track order status changes using time travel
SELECT * FROM ecommerce.orders
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 10:00:00 UTC'
WHERE order_id = 'ORD-12345';

-- View table history and snapshots
SELECT * FROM "ecommerce"."orders$snapshots"
ORDER BY committed_at DESC
LIMIT 10;
```
Iceberg Table Compaction
Real-time ingestion creates many small files. Regular compaction improves query performance:
```python
# AWS Glue compaction job (Python)
from datetime import datetime, timedelta, timezone

from awsglue.context import GlueContext
from pyspark.context import SparkContext

# NOTE: the job must be started with Iceberg enabled and the Glue catalog
# registered in Spark, e.g.:
#   --datalake-formats iceberg
#   --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
#   --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
#   --conf spark.sql.catalog.glue_catalog.warehouse=s3://your-bucket/iceberg-warehouse/

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Compact small files into ~128 MB data files
spark.sql("""
    CALL glue_catalog.system.rewrite_data_files(
        table => 'ecommerce.orders',
        options => map('target-file-size-bytes', '134217728')
    )
""")

# Remove old snapshots (keep the last 5 days)
cutoff = (datetime.now(timezone.utc) - timedelta(days=5)).strftime('%Y-%m-%d %H:%M:%S')
spark.sql(f"""
    CALL glue_catalog.system.expire_snapshots(
        table => 'ecommerce.orders',
        older_than => TIMESTAMP '{cutoff}'
    )
""")
```
Monitoring and Troubleshooting
Healthy Connector
- Status: RUNNING
- Task count matches configuration
- Lag metrics stable/decreasing
- No error messages in CloudWatch
Common Issues
- Connection refused: Check security groups
- Schema not found: Pre-create Glue database
- Permission denied: Update IAM role
- Topic not created: Enable auto-creation
Clean Up Resources
When you're done testing, delete resources in this order to avoid dependency errors:
1. Delete MSK Connect connectors (Iceberg sink first, then Debezium source)
2. Delete custom plugins from MSK Connect
3. Delete worker configurations
4. Delete the MSK cluster (if created for this demo)
5. Delete S3 objects and the bucket
6. Delete IAM roles created for the connectors
7. Delete the Glue database and tables (optional; keep them for analysis)
Frequently Asked Questions
How does the pipeline handle schema changes in the source tables?
With `iceberg.tables.evolve-schema-enabled=true`, the connector automatically adds new columns to Iceberg tables when they appear in the source. Column drops and type changes require manual intervention. Iceberg's column mapping feature handles field renaming.

How are deleted rows handled?
Deleted rows arrive as CDC events with `_cdc.op = 'd'`. The default append mode preserves all events, including deletes. For real deletion in Iceberg, implement downstream processing that filters `_cdc.op != 'd'` in views, or use Iceberg's merge-on-read for equality deletes with additional configuration.
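The soft-delete filtering described above, where append mode keeps every event but consumers expose only live rows, can be sketched as (the `live_rows` helper and `order_id` key are illustrative assumptions):

```python
def live_rows(rows):
    """Keep the latest event per key, then drop keys whose latest
    event is a delete. Assumes rows are in commit order (oldest first)."""
    latest = {}
    for row in rows:
        latest[row["order_id"]] = row  # later events overwrite earlier ones
    return [r for r in latest.values() if r["_cdc"]["op"] != "d"]

rows = [
    {"order_id": "A", "_cdc": {"op": "c"}},
    {"order_id": "B", "_cdc": {"op": "c"}},
    {"order_id": "A", "_cdc": {"op": "d"}},  # A deleted after creation
]
# live_rows(rows) leaves only order B
```

In practice you would express the same logic as an Athena view over the Iceberg table rather than in application code.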
Conclusion
Building a real-time data lake with Amazon MSK Connect and Iceberg Kafka Connect eliminates the batch ETL bottleneck. Your analytics teams get fresh data within minutes of transactions, schema changes propagate automatically, and you maintain exactly-once semantics throughout the pipeline.
This architecture scales from a single table proof-of-concept to enterprise-wide CDC synchronization. Start with one connector for your most critical table, validate the data quality, then expand using multi-table routing. The combination of Debezium's robust CDC capture, MSK Connect's managed infrastructure, and Iceberg's ACID guarantees makes this a production-ready solution for any organization needing real-time analytics capabilities.
Need Help Building Your Real-Time Data Lake?
Our AWS certified data engineers specialize in streaming architectures. We can help you design CDC pipelines, optimize Iceberg tables for your query patterns, and implement production-grade monitoring across your data lake.
