Change Data Capture using Airbyte (MySQL-to-Kafka)

Caesario Kisty
15 min read · Dec 18, 2022

A streaming data pipeline allows us to process and analyze data in real time. There are a variety of sources of this type of data, including sensors, financial markets, social media, and web servers. As I explained in my last article, a streaming data pipeline is required to make timely decisions. This is in contrast to batch processing, where data is processed in large chunks at regular intervals. However, I will elaborate more on batch processing in a future article.

That raises a question: what is the appropriate way to ingest data from an operational database? More specifically, how can we collect the history of changes in a database, that is, every insert, update, and delete? To see why this matters, here is a brief explanation of OLTP and OLAP, which are two different types of database systems. Online Transaction Processing (OLTP) is designed to support a high volume of short, transactional queries. In other words, an OLTP system is optimized for inserting, updating, and deleting many individual rows in real time. Online Analytical Processing (OLAP), on the other hand, is designed to support analytical queries, which are typically more complex than the short transactions common in OLTP systems. They often involve aggregating and summarizing data, as well as performing calculations over many rows. An OLTP database usually keeps only the current state of each row, so capturing its changes is what lets the OLAP side analyze history.
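
To make the difference concrete, here is a minimal sketch that uses an in-memory SQLite database as a stand-in for both kinds of system. The `payments` table and its rows are made up for illustration and are not part of the digital wallet schema used later.

```python
import sqlite3

# In-memory SQLite stands in for both systems; table and rows are illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (id INTEGER PRIMARY KEY, region TEXT, amount REAL)")

# OLTP-style work: many short writes, each touching a single row.
rows = [("Europe", 50.0), ("Asia", 75.0), ("Europe", 25.0)]
conn.executemany("INSERT INTO payments (region, amount) VALUES (?, ?)", rows)
conn.execute("UPDATE payments SET amount = amount + 10 WHERE id = ?", (1,))
conn.commit()

# OLAP-style work: one query that scans and summarizes many rows.
totals = conn.execute(
    "SELECT region, COUNT(*), SUM(amount) FROM payments GROUP BY region ORDER BY region"
).fetchall()
print(totals)
```

The writes each touch one row by key, while the final query reads the whole table to produce a summary per region.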

To address the above question, this article aims to build a data pipeline between OLTP (Cloud SQL) and OLAP (BigQuery). The data pipeline will be responsible for collecting data changes in the database and storing them in the data warehouse as historical data. As an example, I have a simple digital wallet transaction. Users can deposit, withdraw, and transfer money to their or other users’ accounts. All transactions are stored in a database. In this case, I use Google Cloud SQL with MySQL. The following commands create a database and table, insert sample data, and run transaction queries.

Save the following SQL query in sample_digital_wallet.sql

DROP DATABASE IF EXISTS digital_wallet;
CREATE DATABASE IF NOT EXISTS digital_wallet;
USE digital_wallet;

CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    balanced DECIMAL(10,2) NOT NULL,
    region VARCHAR(255) NOT NULL
);

CREATE TABLE transactions (
    id INTEGER PRIMARY KEY AUTO_INCREMENT,
    user_id INTEGER NOT NULL,
    user_destination INTEGER NOT NULL,
    transaction_type VARCHAR(255) NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    amount_change DECIMAL(10,2) NOT NULL,
    created_at DATETIME NOT NULL,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

INSERT INTO users (name, email, balanced, region)
VALUES ('John Doe', 'johndoe@example.com', 100.00, 'North America');

INSERT INTO users (name, email, balanced, region)
VALUES ('Jane Smith', 'janesmith@example.com', 50.00, 'Europe');

INSERT INTO users (name, email, balanced, region)
VALUES ('Bob Johnson', 'bobjohnson@example.com', 75.00, 'Asia');

INSERT INTO users (name, email, balanced, region)
VALUES ('Sally Davis', 'sallydavis@example.com', 25.00, 'South America');

INSERT INTO users (name, email, balanced, region)
VALUES ('Michael Brown', 'michaelbrown@example.com', 150.00, 'Africa');

INSERT INTO users (name, email, balanced, region)
VALUES ('Emily Thompson', 'emilythompson@example.com', 200.00, 'Australia');

INSERT INTO users (name, email, balanced, region)
VALUES ('David Moore', 'davidmoore@example.com', 100.00, 'North America');

INSERT INTO users (name, email, balanced, region)
VALUES ('Jessica Williams', 'jessicawilliams@example.com', 75.00, 'Europe');

INSERT INTO users (name, email, balanced, region)
VALUES ('William Jones', 'williamjones@example.com', 50.00, 'Asia');

INSERT INTO users (name, email, balanced, region)
VALUES ('Ashley Taylor', 'ashleytaylor@example.com', 25.00, 'South America');

Preparing Google Cloud SQL

Google Cloud SQL is a cloud-based database service that allows us to easily create and manage our relational databases. By using Cloud SQL, we can set up, maintain, manage, and administer our databases using a simple web-based interface, and choose MySQL, PostgreSQL, or SQL Server as our database engine. Thus, we don’t need to worry about the underlying infrastructure and can focus on building and deploying our applications.

There is no doubt that cloud services at the moment are utterly insane. My first service in this article, Cloud SQL, requires only a few configurations before I can start using it. The first thing to do is, of course, log in to my Google Cloud Console Account. Navigate to the Cloud SQL page and create a SQL instance. For my instance, I only set up the name “mysql-1”, generated a password for root access, and selected the region. The rest of it is set to default options. But if you plan to build for production, I recommend you get to know every component. We should consider the cost implications of using those services.

Hence, here is my Cloud SQL instance with MySQL.

Next, I want to access the MySQL database from my Compute Engine. First, I install the MySQL Client by using the following command:

sudo apt-get install mysql-client

Afterward, I visit the Cloud SQL console and navigate to “Connections”. This step configures the connection so that my Compute Engine instances can access my Cloud SQL instance: I add the external IP addresses of my two Compute Engine instances (vm-1 and vm-2) as authorized networks.

As soon as the connection is configured, I verify it from my Compute Engine terminal using the following command.

mysql -h {Cloud SQL Public IP Address} -u root -p

Finally, I execute the statements in “sample_digital_wallet.sql” with the following command.

mysql -h {Cloud SQL Public IP Address} -u root -p < sample_digital_wallet.sql

Then voila, the data is ready to use.

As part of this database preparation, I also created three Python programs that simulate transactions: ‘transfer.py’ (which simulates a transfer between users and updates both balances), ‘deposit.py’ (which deposits money into a user’s balance), and ‘withdraw.py’ (which withdraws money from a user’s balance). Each one also records the transaction in the transactions table. The code is as follows.

transfer.py

import mysql.connector
import sys

user_id = int(sys.argv[1])
user_destination = int(sys.argv[2])
amount = float(sys.argv[3])

cnx = mysql.connector.connect(user='root', password='PUT_ROOT_PASSWORD',
                              host='PUT_CLOUDSQL_PUBLIC_IP', database='digital_wallet')
cursor = cnx.cursor()

query = """
INSERT INTO transactions (user_id, user_destination, transaction_type, amount, amount_change, created_at)
VALUES (%s, %s, 'transfer', %s, -%s, CURRENT_TIMESTAMP)
"""
values = (user_id, user_destination, amount, amount)
cursor.execute(query, values)

query = """
UPDATE users
SET balanced = balanced - %s
WHERE id = %s
"""
values = (amount, user_id)
cursor.execute(query, values)

query = """
UPDATE users
SET balanced = balanced + %s
WHERE id = %s
"""
values = (amount, user_destination)
cursor.execute(query, values)

cnx.commit()

cnx.close()

deposit.py

import mysql.connector
import sys

user_id = int(sys.argv[1])
amount = float(sys.argv[2])

cnx = mysql.connector.connect(user='root', password='PUT_ROOT_PASSWORD',
                              host='PUT_CLOUDSQL_PUBLIC_IP', database='digital_wallet')
cursor = cnx.cursor()

query = """
INSERT INTO transactions (user_id, user_destination, transaction_type, amount, amount_change, created_at)
VALUES (%s, %s, 'deposit', %s, %s, CURRENT_TIMESTAMP)
"""
values = (user_id, user_id, amount, amount)
cursor.execute(query, values)

query = """
UPDATE users
SET balanced = balanced + %s
WHERE id = %s
"""
values = (amount, user_id)
cursor.execute(query, values)

cnx.commit()

cnx.close()

withdraw.py

import mysql.connector
import sys

user_id = int(sys.argv[1])
amount = float(sys.argv[2])

cnx = mysql.connector.connect(user='root', password='PUT_ROOT_PASSWORD',
                              host='PUT_CLOUDSQL_PUBLIC_IP', database='digital_wallet')
cursor = cnx.cursor()

query = """
INSERT INTO transactions (user_id, user_destination, transaction_type, amount, amount_change, created_at)
VALUES (%s, %s, 'withdraw', %s, -%s, CURRENT_TIMESTAMP)
"""
values = (user_id, user_id, amount, amount)
cursor.execute(query, values)

query = """
UPDATE users
SET balanced = balanced - %s
WHERE id = %s
"""
values = (amount, user_id)
cursor.execute(query, values)

cnx.commit()

cnx.close()
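
Each script above issues all of its statements and commits once at the end, so the changes are applied together. As a rough sketch of the same pattern with an explicit rollback, the example below uses an in-memory SQLite database as a stand-in so it runs without Cloud SQL. The balance check, the integer-cent amounts, and the simplified `users` table are assumptions for illustration and are not part of the original scripts.

```python
import sqlite3

def transfer(conn, sender_id, receiver_id, cents):
    """Move `cents` from sender to receiver; roll back everything on any error."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            "UPDATE users SET balanced = balanced - ? WHERE id = ?", (cents, sender_id)
        )
        (balance,) = conn.execute(
            "SELECT balanced FROM users WHERE id = ?", (sender_id,)
        ).fetchone()
        if balance < 0:
            raise ValueError("insufficient balance")
        conn.execute(
            "UPDATE users SET balanced = balanced + ? WHERE id = ?", (cents, receiver_id)
        )

# Simplified stand-in table: balances are stored as integer cents.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, balanced INTEGER NOT NULL)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, 10000), (2, 5000)])
conn.commit()

transfer(conn, 1, 2, 2500)        # succeeds: both balances change together
try:
    transfer(conn, 2, 1, 999999)  # fails the balance check and is rolled back
except ValueError:
    pass

balances = dict(conn.execute("SELECT id, balanced FROM users"))
print(balances)
```

Because the failed transfer is rolled back as a whole, neither balance changes and no partial update would reach the CDC stream.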

The last thing to do in this section is to create a dedicated user in the database. This user has a set of permissions to perform the CDC replication process.

CREATE USER 'airbyte'@'%' IDENTIFIED BY '{password}';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'airbyte'@'%';

Installing Apache Kafka

To explain how Apache Kafka works, let’s imagine a group of friends who want to share messages. To do this, they decide to use a message board that they can all access. Whenever someone has a message to share, they write it on the message board for everyone else to see. In this analogy, the message board is like a Kafka topic: a shared space where messages can be published and read by anyone who has access. The friends who write messages are like Kafka producers, which are applications or services that publish messages to Kafka topics. The friends who read the board are like Kafka consumers, which are applications or services that read messages from the Kafka topic. Just as the message board is a simple and reliable way for friends to share messages, Apache Kafka provides a fast, scalable, and durable way for applications and services to share data and communicate with each other in real time.

To install it, download Apache Kafka from the official downloads page, or run the following command to fetch version 3.3.1.
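
The message-board analogy can be sketched in a few lines of plain Python. This is only a toy model, not the Kafka API: the `Topic` and `Reader` classes are hypothetical names, and real Kafka adds partitions, persistence, and networking on top of the same idea of an append-only log that each reader follows at its own offset.

```python
class Topic:
    """A toy stand-in for a Kafka topic: an append-only list of messages."""

    def __init__(self):
        self.log = []

    def publish(self, message):
        """Producer side: append a message and return its offset."""
        self.log.append(message)
        return len(self.log) - 1


class Reader:
    """Consumer side: remembers its own position (offset) in the topic."""

    def __init__(self, topic):
        self.topic = topic
        self.offset = 0

    def poll(self):
        """Return all messages this reader has not seen yet."""
        new = self.topic.log[self.offset:]
        self.offset = len(self.topic.log)
        return new


board = Topic()
alice, bob = Reader(board), Reader(board)

board.publish("lunch at noon?")
first_for_alice = alice.poll()
board.publish("yes!")
second_for_alice = alice.poll()
all_for_bob = bob.poll()  # bob reads later and still sees every message
print(first_for_alice, second_for_alice, all_for_bob)
```

Reading a message does not remove it from the board, which is why a reader that arrives late can still catch up from the beginning.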

wget https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz

Afterward, I extract it on my Compute Engine instance.

tar -xzf kafka_2.13-3.3.1.tgz
mv kafka_2.13-3.3.1 kafka
cd kafka

In my previous article, I explained how to run ZooKeeper and Kafka in the background on a Compute Engine instance. Here, I installed them on a virtual machine named ‘vm-1’. Just make sure the Kafka server is already running at this point.

Installing Airbyte

I have already set up Google Cloud SQL as a data source and Apache Kafka as a message broker. Next, I need a tool between them to ingest every update in my database and then send it to the Kafka topic. To capture every change in the ‘digital_wallet’ database, I use Change Data Capture (CDC). Airbyte can handle that CDC for me.

Airbyte is an open-source data integration platform that enables us to easily connect, extract, and transform data from various sources. With Airbyte, we can quickly set up integrations to move data between databases, SaaS applications, and other data sources. Think of Airbyte’s Change Data Capture (CDC) feature as a news reporter. Just as a reporter is constantly on the lookout for developments in a story, CDC continuously watches our data source for changes. For MySQL, it does this by reading the database’s binary log rather than repeatedly querying the tables. Each time a sync runs, the captured inserts, updates, and deletes are sent to our desired destination, much like a reporter gathering the latest facts and sending them out to the audience. In short, CDC lets us stay on top of every change in our data sources and keeps the downstream data in step with the database.

I run Airbyte on the Compute Engine instance named ‘vm-2’, which needs Docker installed first (see the official Docker installation guide). After Docker is installed, I start Airbyte by running the following commands.
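
As a rough illustration of what a consumer can do with captured changes, the sketch below replays a list of change rows to rebuild the current state of a table. The `_ab_cdc_deleted_at` field name matches the one used by the consumer script later in this article; the sample rows, timestamps, and the `apply_change` helper are hypothetical.

```python
def apply_change(replica, row):
    """Apply one CDC row to an in-memory copy of a table keyed by primary key."""
    if row.get("_ab_cdc_deleted_at") is not None:
        replica.pop(row["id"], None)         # the row was deleted at the source
    else:
        replica[row["id"]] = row["balanced"]  # insert or update: keep the latest value
    return replica


# Hypothetical change rows, in the order they were captured.
changes = [
    {"id": 1, "balanced": 100.0, "_ab_cdc_deleted_at": None},                   # insert
    {"id": 2, "balanced": 50.0, "_ab_cdc_deleted_at": None},                    # insert
    {"id": 1, "balanced": 75.0, "_ab_cdc_deleted_at": None},                    # update
    {"id": 2, "balanced": 50.0, "_ab_cdc_deleted_at": "2022-12-18T10:00:00Z"},  # delete
]

replica = {}
for row in changes:
    apply_change(replica, row)
print(replica)
```

Keeping every row instead of only the latest one gives the full change history, which is what the BigQuery tables in this article store.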

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker compose up -d

At this point, the Airbyte web app is ready to be accessed on port 8000. However, for security reasons, I keep Airbyte inaccessible to the public and use SSH port forwarding to connect from my local machine to the ‘vm-2’ instance. As far as I know, there are two ways to build an SSH tunnel to ‘vm-2’: via a Service Account or directly with the Google Cloud CLI. I use the second option. Since my laptop runs Windows, I installed the Google Cloud CLI for Windows by following the official installation guide. Installation guides are also available for other operating systems, such as Linux and macOS.

To make sure my Google Cloud account is used for the gcloud, I run the ‘gcloud init’ command after the installation is finished. Afterward, I perform SSH port forwarding by running the following command.

gcloud beta compute ssh $INSTANCE_NAME -- -L 8000:localhost:8000

Lastly, I visit ‘http://localhost:8000’ in my web browser. Airbyte asks for a username and password to log in; both are configured in Airbyte’s ‘.env’ file. Airbyte’s web application is shown below.

Preparing Change Data Capture from MySQL to Apache Kafka

In this section, I will configure three things in Airbyte: a source, a destination, and a connection. A “source” refers to a data source that I want to extract data from, while a “destination” refers to a data sink that I want to load data into. Airbyte provides a library of pre-built connectors for various data sources and destinations that we can use to build integrations. These connectors allow us to easily connect to a data source or destination and extract or load data without the need to write custom code. The Airbyte documentation lists the available sources and destinations.

As a source, I use MySQL on Google Cloud SQL. The data is now ready to ingest, as I have shown above. To set up the source, click on “Sources” in the navigation and then click the “New source” button. After that, the Airbyte application will ask me to choose a source.

The next step is to configure the MySQL connection so that it can connect to Google Cloud SQL. I set the source name to “Source-MySQL”. The host is the public IP address of the Cloud SQL instance, and the port for MySQL is 3306. The database is the one created earlier in Cloud SQL, ‘digital_wallet’. To perform CDC, Airbyte uses the username and password of the dedicated ‘airbyte’ user that I created in the section Preparing Google Cloud SQL. The last step is to set the replication method; since I want to capture changes from the source, I choose Change Data Capture (CDC). Finally, click the “Save changes and test” button.

I use Kafka as the destination. Click “Destinations” in the navigation and then “New destination”. I name the destination “Destination-Kafka”. Only a few settings need to be configured: the Protocol “PLAINTEXT” (I’ll discuss Kafka security configurations in another article), the Topic Pattern “digital_wallet”, and the bootstrap server. The Kafka server is running on ‘vm-1’, so the bootstrap server is set to the internal IP address of ‘vm-1’. All other settings keep their default values.

I now have a source and a destination. The next step is to connect them by navigating to “Connections” and then clicking the “Create your first connection” button.

First, choose a source that I created before.

Second, choose the destination.

Before the connection is created, Airbyte asks me to configure a few things. Replication frequency is one of those items, and there are a few options, including manual syncs, hourly updates, or cron-based scheduling. In this case, I choose manual sync for demonstration purposes.

The image below shows that the connection is already performing CDC. Click on the “Sync now” button to run it.

To check if data has been sent to the Kafka topic, I will run the following command.

./bin/kafka-console-consumer.sh --topic digital_wallet --offset earliest --partition 0 --bootstrap-server localhost:9092

The result is shown below.

Change Data Capture in Airbyte captures INSERT, UPDATE, and DELETE operations. The image above shows the events produced by the INSERT commands in the sample_digital_wallet.sql file.

Performing Example Transactions

As I mentioned above, I already have three Python programs that run example transactions. I run them with the following commands in my VM terminal.

python3 transfer.py 1 2 25

python3 deposit.py 3 50

python3 withdraw.py 4 20

Below is a comparison of the before (right) and after (left) running the example transactions.

To see how the data is stored in the Kafka topic, I run the sync in Airbyte again. The following three images illustrate the results of the example transactions. By executing ‘python3 transfer.py 1 2 25’, user 1 transfers 25 to user 2. The script performs three statements: it inserts a row into the transactions table and updates the balances of users 1 and 2 in the users table. The Kafka topic shows three captured events as well. Well, guess what happened in deposit.py and withdraw.py? Please leave your answer in the comment section.
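
To illustrate how one transfer turns into three events, here is a small sketch that parses hand-written messages shaped like the ones the consumer script in this article expects. The `_airbyte_stream` and `_airbyte_data` keys and the stream names come from that script; the payload values are made up to match the transfer of 25 from user 1 to user 2, and real messages carry more fields.

```python
import json
from collections import Counter

# Hand-written sample messages; real Kafka messages contain additional fields.
raw_messages = [
    json.dumps({
        "_airbyte_stream": "digitalWallet_transactions",
        "_airbyte_data": {"id": 1, "user_id": 1, "user_destination": 2,
                          "transaction_type": "transfer", "amount": 25.0},
    }),
    json.dumps({
        "_airbyte_stream": "digitalWallet_users",
        "_airbyte_data": {"id": 1, "balanced": 75.0},
    }),
    json.dumps({
        "_airbyte_stream": "digitalWallet_users",
        "_airbyte_data": {"id": 2, "balanced": 75.0},
    }),
]

# Decode each message and count how many events each stream received.
events = [json.loads(message) for message in raw_messages]
per_stream = dict(Counter(event["_airbyte_stream"] for event in events))
print(per_stream)
```

One inserted transaction row plus two updated user rows gives the three events seen in the topic.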

Kafka Topic to BigQuery

Sure… I know the raw data in the terminal is confusing to read. In this section, I will show you how to send data from the Kafka topic to BigQuery, which serves as the data warehouse, or OLAP database system. To do that, I create a Kafka consumer in Python using the kafka-python library. The code is shown below.

kafka_to_bigquery.py

from google.cloud import bigquery
from kafka import KafkaConsumer, TopicPartition
from json import loads, load, dump
from time import sleep
import os

KAFKA_HOST = 'localhost'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "PUT_BIGQUERY_SERVICE_ACCOUNT_HERE"
TOPIC = 'digital_wallet'
TOPIC_GROUP = 'digital-wallet-group-0'
OFFSET_FILE = '/PATH_TO_KAFKA_OFFSET_FILE/offsets.json'
PROJECT_ID = "PUT_GCP_PROJECT_ID_HERE"
DATASET_ID = "cdc_airbyte"
TABLE_ID_1 = "digitalWallet_users"
TABLE_ID_2 = "digitalWallet_transactions"

def save_offset(offset):
    # Persist the last processed offset for partition 0 of the topic.
    offsets = {}
    offsets.setdefault(TOPIC_GROUP, {}).setdefault(TOPIC, {})[0] = offset
    with open(OFFSET_FILE, 'w') as f:
        dump(offsets, f)

def read_offset():
    # Return the last saved offset, or -1 if no offset file exists yet.
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE, 'r') as f:
            offsets = load(f)
        # JSON stores the partition key as the string '0'.
        offset = offsets.get(TOPIC_GROUP, {}).get(TOPIC, {}).get('0', 0)
        return offset
    else:
        return -1

consumer = KafkaConsumer(
    bootstrap_servers=[KAFKA_HOST + ':9092'],
    auto_offset_reset='none',
    group_id=TOPIC_GROUP,
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
tp = TopicPartition(TOPIC, 0)
consumer.assign([tp])
# Resume from the message after the last one that was processed.
consumer.seek(tp, read_offset()+1)

client = bigquery.Client()

for event in consumer:
    if event.value['_airbyte_stream'] == 'digitalWallet_users':
        data = event.value['_airbyte_data']
        # Trim the last character of the CDC timestamps before loading.
        temp1 = data['_ab_cdc_updated_at'][:-1]
        data['_ab_cdc_updated_at'] = temp1
        if data['_ab_cdc_deleted_at'] is not None:
            temp2 = data['_ab_cdc_deleted_at'][:-1]
            data['_ab_cdc_deleted_at'] = temp2
        client.insert_rows_json(
            f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID_1}", [data]
        )
        print(data)
        sleep(2)
        save_offset(event.offset)
    elif event.value['_airbyte_stream'] == 'digitalWallet_transactions':
        data = event.value['_airbyte_data']
        # Trim the last 7 characters of created_at and the last character
        # of the CDC timestamps before loading.
        temp1 = data['created_at'][:-7]
        temp2 = data['_ab_cdc_updated_at'][:-1]
        data['created_at'] = temp1
        data['_ab_cdc_updated_at'] = temp2
        if data['_ab_cdc_deleted_at'] is not None:
            temp3 = data['_ab_cdc_deleted_at'][:-1]
            data['_ab_cdc_deleted_at'] = temp3
        client.insert_rows_json(
            f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID_2}", [data]
        )
        print(data)
        sleep(2)
        save_offset(event.offset)
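
The offset bookkeeping in the script can be tried on its own, without Kafka or BigQuery. The sketch below is a parameterized variant of `save_offset` and `read_offset`, not the script's exact functions: the extra arguments, the temporary file path, and the `-1` fallback for a missing key are assumptions made so the example is self-contained.

```python
import json
import os
import tempfile

def save_offset(path, group, topic, partition, offset):
    """Write the last processed offset for one partition to a JSON file."""
    offsets = {group: {topic: {partition: offset}}}
    with open(path, "w") as f:
        json.dump(offsets, f)

def read_offset(path, group, topic, partition):
    """Return the saved offset, or -1 when nothing has been saved yet."""
    if not os.path.exists(path):
        return -1
    with open(path) as f:
        offsets = json.load(f)
    # JSON object keys are always strings, so partition 0 comes back as "0".
    return offsets.get(group, {}).get(topic, {}).get(str(partition), -1)

# A throwaway file location, used only for this demonstration.
path = os.path.join(tempfile.mkdtemp(), "offsets.json")
group, topic = "digital-wallet-group-0", "digital_wallet"

before = read_offset(path, group, topic, 0)  # no file yet
save_offset(path, group, topic, 0, 41)
after = read_offset(path, group, topic, 0)
print(before, after, "next offset to read:", after + 1)
```

Starting from `-1` means the first run seeks to offset 0, and every later run resumes one message past the last offset that was saved.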

Oh, don’t forget to create a dataset and tables in BigQuery. I created a dataset named “cdc_airbyte” and two tables within it: “digitalWallet_users” and “digitalWallet_transactions”. After that, I executed the following command in the VM terminal.

python3 kafka_to_bigquery.py

Data has been delivered successfully to BigQuery.

As I said earlier, BigQuery is a suitable option for an OLAP database system. We can easily aggregate and analyze our data. In this case, I would like to join the two tables so that they can display account mutations for each user. It can be done with the following SQL query.

WITH new_transaction_1 AS(
  WITH new_transaction AS (
    SELECT
      t.user_id as user_id, t.amount * -1 as amount, t.transaction_type as transaction_type,
      t._ab_cdc_updated_at as updated_at
    FROM `cdc_airbyte.digitalWallet_transactions` t
    WHERE t.transaction_type = 'transfer'
    UNION ALL
    SELECT
      t.user_destination as user_id, t.amount as amount, t.transaction_type as transaction_type,
      t._ab_cdc_updated_at as updated_at
    FROM `cdc_airbyte.digitalWallet_transactions` t
    WHERE t.transaction_type = 'transfer'
    UNION ALL
    SELECT
      t.user_id as user_id, t.amount as amount, t.transaction_type as transaction_type,
      t._ab_cdc_updated_at as updated_at
    FROM `cdc_airbyte.digitalWallet_transactions` t
    WHERE t.transaction_type = 'deposit'
    UNION ALL
    SELECT
      t.user_id as user_id, t.amount*-1 as amount, t.transaction_type as transaction_type,
      t._ab_cdc_updated_at as updated_at
    FROM `cdc_airbyte.digitalWallet_transactions` t
    WHERE t.transaction_type = 'withdraw'
  )

  SELECT
    n.user_id,
    ROW_NUMBER() OVER (PARTITION BY n.user_id ORDER BY n.updated_at ASC) as order_transaction,
    n.amount as amount_transaction,
    n.transaction_type,
    n.updated_at
  FROM new_transaction n
  ORDER BY n.user_id ASC
),

new_user AS(
  SELECT
    u.id as user_id,
    ROW_NUMBER() OVER (PARTITION BY u.id ORDER BY u._ab_cdc_updated_at ASC)-1 as order_transaction,
    u.email,
    u.balanced as current_balanced,
    u.region,
    u._ab_cdc_updated_at as updated_at
  FROM `cdc_airbyte.digitalWallet_users` u
  ORDER BY u.id ASC
),

new_user_transaction AS(
  SELECT u.user_id,
    u.email,
    u.region,
    t.amount_transaction,
    t.transaction_type,
    u.current_balanced,
    t.updated_at
  FROM new_user u
  LEFT JOIN new_transaction_1 t
    ON u.user_id = t.user_id AND u.order_transaction = t.order_transaction
  ORDER BY u.user_id ASC
)

SELECT
  user_id,
  email,
  region,
  ARRAY_AGG(STRUCT(amount_transaction, transaction_type, current_balanced, updated_at)) AS transactions_history
FROM new_user_transaction
GROUP BY user_id, email, region
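
The first part of the query, the four `UNION ALL` branches, turns every transaction into signed entries per user. The same rule can be sketched in plain Python; the `ledger_entries` helper and the sample rows are hypothetical and only mirror the three example transactions run earlier.

```python
def ledger_entries(tx):
    """Turn one transaction row into signed (user_id, amount) entries."""
    kind, amount = tx["transaction_type"], tx["amount"]
    if kind == "transfer":
        # A transfer produces two entries: money out for the sender, money in for the receiver.
        return [(tx["user_id"], -amount), (tx["user_destination"], amount)]
    if kind == "deposit":
        return [(tx["user_id"], amount)]
    if kind == "withdraw":
        return [(tx["user_id"], -amount)]
    raise ValueError(f"unknown transaction type: {kind}")


# Sample rows matching the example commands: transfer 1 2 25, deposit 3 50, withdraw 4 20.
transactions = [
    {"user_id": 1, "user_destination": 2, "transaction_type": "transfer", "amount": 25.0},
    {"user_id": 3, "user_destination": 3, "transaction_type": "deposit", "amount": 50.0},
    {"user_id": 4, "user_destination": 4, "transaction_type": "withdraw", "amount": 20.0},
]

entries = [entry for tx in transactions for entry in ledger_entries(tx)]
print(entries)
```

The query then numbers these entries per user and pairs each one with the matching version of the user's row to show the balance after every transaction.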

Well, thanks for reading this article. By writing this article I have kept my word to explain how to build streaming processing from a database source. Change Data Capture is one of the techniques a Data Engineer should know. It is very useful to ingest data from a database in a streaming approach. I guess it will be fun if we discuss batch processing in my next article. Thank you :)
