CDC Pipeline with Debezium, Kafka and Parseable
Step-by-Step Guide to Building CDC Pipelines with MySQL and Debezium
Change data capture (CDC) is a process that tracks and captures changes (inserts, updates and deletes) in a database in real time, so those changes can be recorded or streamed to downstream systems.
Use Cases of CDC
Data Synchronization: CDC keeps data warehouses and lakes synchronized, preventing discrepancies across systems.
Real-Time Analytics: Facilitates real-time analytics, allowing businesses to make informed decisions based on up-to-date information.
Data Migration: Moves data in real time, enabling zero-downtime database migrations.
Auditing and Compliance: By maintaining a log of all database changes, you can meet compliance requirements and perform thorough audits when necessary.
Fraud Detection: Enables fraud detection by monitoring changes and anomalies in data.
Components in a CDC Pipeline
Database: the source where changes originate, such as MySQL, PostgreSQL, MongoDB or MariaDB
CDC Tool: tool to track and capture changes from the databases and stream the tracked changes to downstream systems
Streaming Platform: platform to stream the tracked changes between the CDC tool and the target system
Data Sync: target system to store the tracked changes
Query and Analytics: system to use for query and analytics
Debezium: The CDC Tool
Change Tracking: tracks and captures changes from databases
Real-Time Synchronization: enables real-time data synchronization, ensuring data consistency across different systems
Stream using Kafka Connect: streams the tracked changes to Kafka for real-time updates
Multiple Database Support: supports multiple databases such as MySQL and PostgreSQL, enhancing versatility
Kafka: The Streaming Platform
Event Publishing and Subscription: enables seamless publishing and subscribing to the event streams, enhancing real-time data flow
Durable Event Storage: ensures reliable storage of events, allowing for data retention as long as necessary
Real-time and Retrospective Processing: processes stream of events both in real-time and retrospectively, offering flexible data handling
Scalability for Large Environments: supports scalability, making it ideal for large-scale data environments and high-volume event handling
Use Case: E-commerce Ordering System
Components Created for the Pipeline
MySQL Tables: store the customers and orders data
Audit Log Tables: store updates and deletes from the customers and orders tables
Triggers in MySQL: log changes after updates and deletes in the customers and orders tables
Kafka Connectors: move data from the database into Kafka and from Kafka into Parseable
Set up all Components
Run the following commands to set up the Docker Compose environment:
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/docker-compose.yaml
sudo docker compose -f docker-compose.yaml up -d
sudo docker ps
Download the script that creates the tables and triggers in the MySQL database:
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/mysql-scripts.sql
cat mysql-scripts.sql
Copy the contents of the file mysql-scripts.sql; it needs to be executed in the mysql container to create the required tables and triggers.
Open a new terminal and execute the commands below one by one:
sudo docker exec -it mysql bash
mysql -u root -p'debezium'
Paste the contents of the script file. This creates the following tables in the cdcdemo database:
customers
orders
customers-audit-log
orders-audit-log
It also creates the following triggers in the same database:
after_customers_update
after_customers_delete
after_orders_update
after_orders_delete
Along with the tables and triggers, the script also grants the relevant access to the mysqluser user.
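For reference, an after-update trigger in that script might look roughly like the sketch below. This is a minimal illustration only: the audit table layout and column names (customer_id, email, changed_at) are assumptions, and the downloaded mysql-scripts.sql remains the authoritative definition.
-- illustrative only: logs the old and new email whenever a customer row is updated
CREATE TRIGGER after_customers_update
AFTER UPDATE ON customers
FOR EACH ROW
  INSERT INTO customers_audit_log (customer_id, old_email, new_email, changed_at)
  VALUES (OLD.customer_id, OLD.email, NEW.email, NOW());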
You can verify this by running the commands below one by one:
use cdcdemo;
show tables;
show triggers;
SHOW GRANTS FOR 'mysqluser';
exit;  -- return to bash
exit   # exit from the container
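Debezium's MySQL connector reads the server's binary log, so before wiring up the connectors it is also worth confirming at the mysql prompt that binary logging is enabled and row-based (the MySQL setup used for Debezium demos typically has both on already):
SHOW VARIABLES LIKE 'log_bin';   -- should report ON
SELECT @@binlog_format;          -- Debezium expects ROW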
Now we need to create the source and sink connectors in the kafka-connect service, which is running on localhost:8083. Run the commands below to create the Debezium MySQL connector and the Parseable sink connectors in Kafka Connect.
The Debezium MySQL connector captures changes from all the tables in the MySQL database and publishes the events to separate topics in Kafka.
The Parseable sink connectors consume the events from each topic and send them to separate streams in Parseable.
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/debezium-connector-mysql.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-customers.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-orders.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-customers-audit-log.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-orders-audit-log.json \
https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-schema-changes.json
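For orientation, debezium-connector-mysql.json typically carries a payload along these lines (shown with Debezium 2.x property names and illustrative placeholder values; the downloaded file is the source of truth):
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "<password>",
    "database.server.id": "184054",
    "topic.prefix": "cdcdemo",
    "database.include.list": "cdcdemo",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.cdcdemo"
  }
}
The Parseable sink connector files follow the same name/config shape, each pointing one Kafka topic at one Parseable stream.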
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @debezium-connector-mysql.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-customers.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-orders.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-customers-audit-log.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-orders-audit-log.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @parseable-connector-schema-changes.json
Run the command below to verify that the connector is set up successfully:
curl -s http://localhost:8083/connectors/mysql-connector/status | jq .
You can verify the same for all the sink connectors as well.
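You can also list every registered connector in a single call and confirm that the source and all five sink connectors appear:
curl -s http://localhost:8083/connectors | jq .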
Generate data in the MySQL tables
You can run the commands below, which use our data load script to insert and update data in the customers and orders tables in the MySQL database. You can also use your own script, depending on your use case.
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/insert-update-data.sh
sudo docker cp insert-update-data.sh mysql:insert-update-data.sh
sudo docker exec -it mysql bash
chmod +x insert-update-data.sh
./insert-update-data.sh
In a different terminal, you can run the commands below, which use our delete-data script to delete random rows from the orders and customers tables in the MySQL database.
sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/delete-data.sh
sudo docker cp delete-data.sh mysql:delete-data.sh
sudo docker exec -it mysql bash
chmod +x delete-data.sh
./delete-data.sh
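If you prefer to trigger a few events by hand instead of using the scripts, any ordinary DML against the demo tables will do. The column names below are assumptions for illustration; adjust them to the actual schema created by mysql-scripts.sql. Run these at the mysql prompt inside the container:
INSERT INTO customers (name, email) VALUES ('Jane Doe', 'jane@example.com');
UPDATE customers SET email = 'jane.doe@example.com' WHERE name = 'Jane Doe';
DELETE FROM customers WHERE name = 'Jane Doe';
The UPDATE and DELETE also fire the audit triggers, so the corresponding audit-log streams receive events as well.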
Parseable
Parseable is a powerful cloud-native log analytics platform built for high-speed log ingestion, query and analysis.
Developer-Centric Design: simple setup; leverages the cloud's elasticity to scale with log volume growth
SQL-Based Query Engine: built in console offering powerful SQL based querying and analytics capabilities for streamlined and efficient data analysis
Native S3 Support: native S3 interface, providing cost-effective and scalable data storage solution
Open Data Format: utilizes parquet format for deep analysis, ensuring flexibility and eliminating vendor lock-in
OpenTelemetry Support: first-class OpenTelemetry support, facilitating seamless data collection and monitoring
Pluggable Integrations: integrates with all popular log agents, Grafana, Kafka and more
With the above setup completed, the Parseable console can be accessed at http://localhost:8000 with username admin and password admin.
Log Explore and Query
Once logged in, you will see 5 streams on the landing page. You can navigate to each stream and see the log events for every insert, update and delete in the MySQL tables. You can apply filters, search for particular log events to inspect the changes, save queries and do a lot more to improve your debugging experience with the events.
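The same data is reachable over Parseable's HTTP query API, which is convenient for scripting checks against the pipeline. A hedged example, assuming one of your streams is named customers (substitute the stream names you actually see on the landing page, and adjust the time window):
curl -s -u admin:admin \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8000/api/v1/query \
  -d '{"query": "SELECT * FROM customers LIMIT 10", "startTime": "2024-01-01T00:00:00Z", "endTime": "2024-12-31T23:59:59Z"}'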
Dashboards for Analytics
Parseable has built-in dashboard features that let you visualise the log data and perform better analytics. For this use case, you can use Parseable dashboards to:
perform real-time analytics (see the example panel query after this list)
audit all record-level changes in the tables, along with the schema changes happening in the database
detect fraud
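As one sketch of what a real-time analytics panel could run, the queries below count change events in the orders stream. The stream name and the op column are assumptions: Debezium tags each event with an operation code (c = create, u = update, d = delete), but the exact column name in Parseable depends on how the events are flattened on ingestion.
SELECT COUNT(*) AS total_change_events FROM orders;
SELECT op AS operation, COUNT(*) AS events FROM orders GROUP BY op;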
You can import the dashboards we created for this demonstration for reference. First, download them by running the command below:
sudo wget 'https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/Audit Logs Dashboard.json' \
'https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/Ordering Dashboard.json'
To import them, navigate to the Dashboards tab in the left sidebar, where you will find an Import Dashboard option. Browse to the path where you downloaded the dashboard JSON files and import the dashboards.
Conclusion
In this tutorial, you built a CDC pipeline that captures changes from MySQL, streams them through Kafka, and ingests them into Parseable. This setup provides real-time data synchronization and opens up a world of possibilities for data analysis and system integration.