CDC Pipeline with Debezium, Kafka and Parseable

Step-by-Step Guide to Building CDC Pipelines with MySQL and Debezium

Change data capture (CDC) is the process of tracking and capturing changes (inserts, updates and deletes) in a database in real time, so that these changes can be recorded or streamed to downstream systems.

[Image: CDC pipeline overview]

Use Cases of CDC

  • Data Synchronization: CDC keeps data warehouses and lakes synchronized, preventing discrepancies across systems.

  • Real-Time Analytics: Facilitates real-time analytics, allowing businesses to make informed decisions based on up-to-date information.

  • Data Migration: Moves data in real time, enabling zero-downtime database migrations.

  • Auditing and Compliance: By maintaining a log of all database changes, you can meet compliance requirements and perform thorough audits when necessary.

  • Fraud Detection: Enables fraud detection by monitoring changes and anomalies in data.

Components in a CDC Pipeline

  • Database: where changes originate, such as MySQL, PostgreSQL, MongoDB or MariaDB

  • CDC Tool: tool to track and capture changes from the databases and stream the tracked changes to downstream systems

  • Streaming Platform: platform to stream the tracked changes between the CDC tool and the target system

  • Data Sink: target system to store the tracked changes

  • Query and Analytics: system to use for query and analytics

Debezium: The CDC Tool

  • Change Tracking: tracks and captures changes from databases

  • Real-Time Synchronization: enables real-time data synchronization, ensuring data consistency across different systems

  • Stream using Kafka Connect: streams the tracked changes to Kafka for real-time updates

  • Multiple Database Support: supports multiple databases such as MySQL and PostgreSQL, enhancing versatility

Kafka: The Streaming Platform

  • Event Publishing and Subscription: enables seamless publishing and subscribing to the event streams, enhancing real-time data flow

  • Durable Event Storage: ensures reliable storage of events, allowing for data retention as long as necessary

  • Real-time and Retrospective Processing: processes stream of events both in real-time and retrospectively, offering flexible data handling

  • Scalability for Large Environments: supports scalability, making it ideal for large-scale data environments and high-volume event handling

Use Case: E-commerce Ordering System

[Image: E-commerce ordering system use case]

Components Created for the Pipeline

  • MySQL Tables: store the data for the customers and orders tables

  • Audit Log Tables: store the changes and deletes captured from the customers and orders tables

  • Triggers in MySQL: log changes after updates and deletes on the customers and orders tables

  • Kafka Connectors: move data between the database, Kafka and Parseable

Set up all Components

Run the following commands to set up the Docker Compose environment:

sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/docker-compose.yaml
sudo docker compose -f docker-compose.yaml up -d
sudo docker ps

Download the SQL script that creates the tables and triggers in the MySQL database:

sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/mysql-scripts.sql
cat mysql-scripts.sql

Copy the contents of the file mysql-scripts.sql; you will execute them inside the mysql container to create the required tables and triggers.

Open a new terminal and run the following commands one by one:

sudo docker exec -it mysql bash
mysql -u root -p'debezium'

Paste the contents of the script file. This creates the following tables in the cdcdemo database:

  • customers

  • orders

  • customers-audit-log

  • orders-audit-log

The script also creates the following triggers in the same database:

  • after_customers_update

  • after_customers_delete

  • after_orders_update

  • after_orders_delete
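
As a reference, here is a minimal sketch of what one of these triggers might look like. The actual definitions, including the audit table columns, live in mysql-scripts.sql, so treat the column names below as placeholders.

-- Minimal sketch of an update-audit trigger; the real definition and the
-- audit table columns are in mysql-scripts.sql.
DELIMITER //
CREATE TRIGGER after_customers_update
AFTER UPDATE ON customers
FOR EACH ROW
BEGIN
  INSERT INTO `customers-audit-log` (customer_id, old_email, new_email, changed_at)
  VALUES (OLD.id, OLD.email, NEW.email, NOW());
END//
DELIMITER ;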

Along with the tables and triggers, the script also grants relevant access to the mysqluser user.
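
The Debezium MySQL connector needs replication-related privileges to read the binlog, so the grant in the script is typically along the lines of the statement below; the exact statement used in this demo is defined in mysql-scripts.sql.

-- Privileges the Debezium MySQL connector typically requires; the exact
-- grant for this demo is defined in mysql-scripts.sql.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'mysqluser';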

You can verify the tables, triggers and grants by running the following commands one by one:

use cdcdemo;
show tables;
show triggers;
SHOW GRANTS FOR 'mysqluser';
exit;  -- return to bash
exit   # exit the container

Now we need to create the source and sink connectors in the kafka-connect service, which is running on localhost:8083. Run the commands below to create the Debezium MySQL connector and the Parseable sink connectors in kafka-connect.

The Debezium MySQL connector captures changes from all the tables in the MySQL database and publishes the events to separate topics in Kafka.

The Parseable sink connectors consume the events from each topic and send them to separate streams in Parseable.

sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/debezium-connector-mysql.json \
  https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-customers.json \
  https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-orders.json \
  https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-customers-audit-log.json \
  https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-orders-audit-log.json \
  https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/parseable-connector-schema-changes.json

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @debezium-connector-mysql.json

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @parseable-connector-customers.json

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @parseable-connector-orders.json

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @parseable-connector-customers-audit-log.json

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @parseable-connector-orders-audit-log.json

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @parseable-connector-schema-changes.json

Run the command below to verify that the connector was set up successfully:

curl -s http://localhost:8083/connectors/mysql-connector/status | jq .

You can verify the same for all the sink connectors as well.

Generate data in the MySQL tables

Run the commands below to use our data-load script, which inserts and updates data in the customers and orders tables in the MySQL database. You can also use your own script according to your use case.

sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/insert-update-data.sh
sudo docker cp insert-update-data.sh mysql:insert-update-data.sh
sudo docker exec -it mysql bash
chmod +x insert-update-data.sh
./insert-update-data.sh

In a different terminal, run the commands below to use our delete-data script, which deletes random rows from the orders and customers tables in the MySQL database.

sudo wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/delete-data.sh
sudo docker cp delete-data.sh mysql:delete-data.sh
sudo docker exec -it mysql bash
chmod +x delete-data.sh
./delete-data.sh
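
If you prefer to generate a few change events by hand instead of using the scripts, you can run statements like the ones below from the mysql prompt inside the container. The column names and values are placeholders; match them to the schema created by mysql-scripts.sql.

-- Placeholder statements to generate insert, update and delete events manually;
-- adjust the column names and values to the schema created by mysql-scripts.sql.
USE cdcdemo;
INSERT INTO customers (name, email) VALUES ('Jane Doe', 'jane@example.com');
INSERT INTO orders (customer_id, amount, status) VALUES (1, 49.99, 'CREATED');
UPDATE orders SET status = 'SHIPPED' WHERE customer_id = 1;
DELETE FROM orders WHERE customer_id = 1;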

Parseable

Parseable is a powerful cloud-native log analytics platform built for high-speed log ingestion, query and analysis.

  • Developer-Centric Design: simple setup that leverages the cloud’s elasticity to scale with log volume growth

  • SQL-Based Query Engine: built-in console offering powerful SQL-based querying and analytics capabilities for streamlined and efficient data analysis

  • Native S3 Support: native S3 interface, providing a cost-effective and scalable data storage solution

  • Open Data Format: utilizes the Parquet format for deep analysis, ensuring flexibility and eliminating vendor lock-in

  • OpenTelemetry Support: first-class OpenTelemetry support, facilitating seamless data collection and monitoring

  • Pluggable Integrations: integrates with all popular log agents, Grafana, Kafka and more

[Image: Parseable architecture]

With the setup above completed, the Parseable console can be accessed at http://localhost:8000 with the username admin and password admin.

Explore and Query Logs

Once logged in, you will see five streams on the landing page. You can navigate to each stream and see the log events for all inserts, updates and deletes in the MySQL tables. You can apply filters, search for a particular log event to view the changes, save queries and do a lot more to enhance your debugging experience.
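
For example, the query below counts change events per Debezium operation type (c for create, u for update, d for delete) in one of the streams. The stream name orders and the field name op are assumptions; substitute the stream and field names you actually see in your Parseable console.

-- Count change events per Debezium operation type; "orders" and "op" are
-- placeholders, use the stream and field names visible in your console.
SELECT op, COUNT(*) AS event_count
FROM orders
GROUP BY op
ORDER BY event_count DESC;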

Dashboards for Analytics

Parseable has built-in dashboard features that allow you to visualise the log data and perform better analytics. For this use case, you can use Parseable dashboards to:

  • perform real-time analytics on orders and customers

  • audit all record-level changes in the tables, along with the schema changes happening in the database

  • detect fraud by monitoring anomalous changes in the data
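
For example, a dashboard panel for the audit use case could be backed by a query like the one below, which counts the audit-log events captured per day. The stream name customers_audit_log is an assumption based on the table names above, and p_timestamp is the timestamp column Parseable attaches to ingested events; adjust both to what your streams actually contain.

-- Example panel query: audit-log events captured per day; the stream name
-- and timestamp column are assumptions, adjust them to your setup.
SELECT date_trunc('day', p_timestamp) AS day, COUNT(*) AS changes
FROM customers_audit_log
GROUP BY date_trunc('day', p_timestamp)
ORDER BY day;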

You can import the dashboards we created for this demonstration for your reference. First, download the reference dashboards by running the command below:

sudo wget 'https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/Audit Logs Dashboard.json' \
  'https://raw.githubusercontent.com/parseablehq/blog-samples/main/debezium-kafka-cdc-pipeline/Ordering Dashboard.json'

Then navigate to the Dashboards tab in the left sidebar, where you will find an option to Import Dashboard. Browse to the path where you downloaded the dashboard JSON files and import the dashboards.

Conclusion

In this tutorial, you built a CDC pipeline that captures changes from MySQL, streams them through Kafka, and ingests them into Parseable. This setup provides real-time data synchronization, opening up a world of possibilities for data analysis and system integration.