Debezium MySQL Kafka Connector Deployment

Architecture Overview

Before you begin, make sure a Kafka cluster and a MySQL database are already deployed.

Once both are up and running, deploy the Debezium MySQL Kafka Connector by running the following commands:

cd ~/Projects/retail-lakehouse/kafka-debezium-mysql-connector
bash install.sh
install.sh
#!/bin/bash

set -e

# Deploy Debezium Connect Cluster
kubectl apply -f debezium-secret.yaml
kubectl apply -f debezium-role.yaml
kubectl apply -f debezium-role-binding.yaml
kubectl apply -f debezium-connect-cluster.yaml -n kafka-cdc
sleep 5
kubectl logs pod/debezium-connect-cluster-connect-build -n kafka-cdc -f
sleep 5
kubectl wait --for=condition=Ready pod -l app.kubernetes.io/name=kafka-connect -n kafka-cdc --timeout=1200s

kubectl apply -f debezium-connector.yaml -n kafka-cdc
kubectl get kafkaconnector -n kafka-cdc

The script creates the Secret, Role, and RoleBinding, then deploys a Debezium Kafka Connect cluster and a Debezium source connector on that cluster.
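The fixed `sleep 5` calls in the script assume the build pod exists within five seconds of applying the manifest. A minimal sketch of a polling alternative follows; the `retry` helper and its arguments are hypothetical names introduced here and are not part of install.sh.

```shell
# retry <attempts> <delay-seconds> <command...>
# Runs the command until it succeeds or the attempts run out.
# (Hypothetical helper; not part of install.sh.)
retry() {
  retry_attempts=$1
  retry_delay=$2
  shift 2
  retry_i=1
  while :; do
    # Running the command inside `if` keeps `set -e` from aborting on failure.
    if "$@"; then
      return 0
    fi
    if [ "$retry_i" -ge "$retry_attempts" ]; then
      return 1
    fi
    retry_i=$((retry_i + 1))
    sleep "$retry_delay"
  done
}

# Possible use in place of the first `sleep 5`: try up to 30 times,
# 2 seconds apart, until the build pod exists, then stream its logs.
# retry 30 2 kubectl get pod/debezium-connect-cluster-connect-build -n kafka-cdc
```

The kubectl call is left as a comment because it depends on a running cluster; the attempt count and delay are illustrative values only.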

If you would rather run each step manually, read on. The rest of this article walks through deploying a Debezium MySQL Kafka Connector on Kubernetes step by step, explaining each part along the way.

Create Secret and Role for Accessing the Database

kubectl apply -f debezium-secret.yaml
kubectl apply -f debezium-role.yaml
kubectl apply -f debezium-role-binding.yaml
Result
secret/debezium-secret created
role.rbac.authorization.k8s.io/debezium-role created
rolebinding.rbac.authorization.k8s.io/debezium-role-binding created

YAML Files

debezium-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: kafka-cdc
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
debezium-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: debezium-role
  namespace: kafka-cdc
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
debezium-role-binding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: debezium-role-binding
  namespace: kafka-cdc
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka-cdc
roleRef:
  kind: Role
  name: debezium-role
  apiGroup: rbac.authorization.k8s.io
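The `username` and `password` values in debezium-secret.yaml are base64-encoded, as Kubernetes requires for the `data` field of a Secret. The sketch below shows how such values can be produced and checked; the two function names are hypothetical helpers introduced for illustration.

```shell
# Encode a plain-text value for the `data` field of a Secret.
# printf '%s' avoids the trailing newline that echo would add.
encode_secret_value() {
  printf '%s' "$1" | base64
}

# Decode a value copied from the `data` field of a Secret.
decode_secret_value() {
  printf '%s' "$1" | base64 --decode
}

encode_secret_value 'debezium'   # prints: ZGViZXppdW0=
decode_secret_value 'ZGJ6'       # prints: dbz
```

Base64 is an encoding, not encryption, so anyone who can read the Secret can recover the credentials. That is why the Role above grants `get` on this one Secret only.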

Create a Debezium Kafka Connect Cluster

To deploy a Debezium MySQL connector, you need to deploy a Kafka Connect cluster with the required connector plug-in(s), before instantiating the actual connector itself.

The first step is to build a container image for Kafka Connect that contains the plug-in.

Strimzi can build and push this image for us. The build instructions go directly into the KafkaConnect resource specification, so building the image and deploying the Connect cluster happen in a single step:

Prerequisite

Make sure you have enabled the local registry and set up insecure-registry in your Minikube cluster. If you have followed the instructions in the Prerequisites section, you should have already done this step. If not, you can enable it now by running the following command:

minikube addons enable registry -p retail-lakehouse
Result
💡  registry is an addon maintained by minikube. For any concerns contact minikube on GitHub.
You can view the list of minikube maintainers at: https://github.com/kubernetes/minikube/blob/master/OWNERS
╭──────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                      │
│    Registry addon with docker driver uses port 49609 please use that instead of default port 5000    │
│                                                                                                      │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────╯
📘  For more information see: https://minikube.sigs.k8s.io/docs/drivers/docker
    ▪ Using image docker.io/registry:2.8.3
    ▪ Using image gcr.io/k8s-minikube/kube-registry-proxy:0.0.6
🔎  Verifying registry addon...
🌟  The 'registry' addon is enabled

You can check the IP address of the local registry by running the following command:

kubectl -n kube-system get svc registry -o jsonpath='{.spec.clusterIP}'
Result
10.104.128.211
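The cluster IP printed above is the registry host used in `build.output.image` of the KafkaConnect manifest, and it will usually differ from one cluster to another. A sketch of rewriting that host without editing the file by hand, assuming the manifest keeps the `image: <host>/<name>:<tag>` layout; `set_registry_host` is a hypothetical helper and `10.0.0.5` is a made-up address.

```shell
# set_registry_host <host>
# Reads a manifest on stdin and rewrites the host part of every
# `image: <host>/<name>:<tag>` line; other lines pass through unchanged.
# (Hypothetical helper, not part of the original project.)
set_registry_host() {
  sed -E "s#^([[:space:]]*image:[[:space:]]*)[^/]+/#\1$1/#"
}

# Example with the image line from debezium-connect-cluster.yaml:
printf '      image: 10.104.128.211/debezium-mysql-connector:latest\n' |
  set_registry_host 10.0.0.5
# prints:       image: 10.0.0.5/debezium-mysql-connector:latest

# Possible use against a live cluster (not run here):
# REGISTRY_IP=$(kubectl -n kube-system get svc registry -o jsonpath='{.spec.clusterIP}')
# set_registry_host "$REGISTRY_IP" < debezium-connect-cluster.yaml | kubectl apply -n kafka-cdc -f -
```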
debezium-connect-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  namespace: kafka-cdc
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: debezium-connect-cluster
    # Worker-level switch that enables exactly-once support for source connectors
    exactly.once.source.support: enabled
    offset.storage.topic: debezium-connect-cluster-offsets
    config.storage.topic: debezium-connect-cluster-configs
    status.storage.topic: debezium-connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    offset.storage.replication.factor: -1
    config.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      # https://strimzi.io/docs/operators/latest/configuring.html#type-DockerOutput-reference
      type: docker
      image: 10.104.128.211/debezium-mysql-connector:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.1.0.Final/debezium-connector-mysql-3.1.0.Final-plugin.tar.gz
kubectl apply -f debezium-connect-cluster.yaml -n kafka-cdc
Result
kafkaconnect.kafka.strimzi.io/debezium-connect-cluster created

Check the current resources in the kafka-cdc namespace:

kubectl get all -n kafka-cdc
Result
NAME                                                    READY   STATUS    RESTARTS   AGE
pod/kafka-cluster-dual-role-0                           1/1     Running   0          66m
pod/kafka-cluster-dual-role-1                           1/1     Running   0          66m
pod/kafka-cluster-dual-role-2                           1/1     Running   0          66m
pod/kafka-cluster-entity-operator-5b998f6cbf-c8hdf      2/2     Running   0          65m
pod/debezium-connect-cluster-connect-build              1/1     Running   0          49s
pod/mysql-6b84fd947d-9g9lt                              1/1     Running   0          60m

NAME                                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
service/kafka-cluster-kafka-bootstrap      ClusterIP   10.105.50.103   <none>        9091/TCP,9092/TCP,9093/TCP                     66m
service/kafka-cluster-kafka-brokers        ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   66m
service/mysql                              ClusterIP   None            <none>        3306/TCP                                       60m

NAME                                               READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-cluster-entity-operator      1/1     1            1           65m
deployment.apps/mysql                              1/1     1            1           60m

NAME                                                          DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-cluster-entity-operator-5b998f6cbf      1         1         1       65m
replicaset.apps/mysql-6b84fd947d                              1         1         1       60m

After a while, when the build is complete, you should see the debezium-connect-cluster-connect-build pod disappear and a new pod named debezium-connect-cluster-connect-0 appear:

NAME                                                    READY   STATUS    RESTARTS   AGE
pod/kafka-cluster-dual-role-0                           1/1     Running   0          100m
pod/kafka-cluster-dual-role-1                           1/1     Running   0          100m
pod/kafka-cluster-dual-role-2                           1/1     Running   0          100m
pod/kafka-cluster-entity-operator-5b998f6cbf-c8hdf      2/2     Running   0          99m
pod/debezium-connect-cluster-connect-0                  1/1     Running   0          30m
pod/mysql-6b84fd947d-9g9lt                              1/1     Running   0          94m

NAME                                           TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
service/kafka-cluster-kafka-bootstrap          ClusterIP   10.105.50.103    <none>        9091/TCP,9092/TCP,9093/TCP                     100m
service/kafka-cluster-kafka-brokers            ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   100m
service/debezium-connect-cluster-connect       ClusterIP   None             <none>        8083/TCP                                       30m
service/debezium-connect-cluster-connect-api   ClusterIP   10.100.229.177   <none>        8083/TCP                                       30m
service/mysql                                  ClusterIP   None             <none>        3306/TCP                                       94m

NAME                                               READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-cluster-entity-operator      1/1     1            1           99m
deployment.apps/mysql                              1/1     1            1           94m

NAME                                                          DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-cluster-entity-operator-5b998f6cbf      1         1         1       99m
replicaset.apps/mysql-6b84fd947d                              1         1         1       94m
Warning

If the build fails, you can check the logs of the build pod to see what went wrong. You can also describe the pod to get more information about its status.

kubectl logs pod/debezium-connect-cluster-connect-build -n kafka-cdc
kubectl describe pod/debezium-connect-cluster-connect-build -n kafka-cdc

If the build succeeds but the Connect pod does not start correctly, check the logs of that pod to see what went wrong. You can also describe the pod to get more information about its status.

kubectl logs pod/debezium-connect-cluster-connect-0 -n kafka-cdc
kubectl describe pod/debezium-connect-cluster-connect-0 -n kafka-cdc

You can also check the Minikube configuration to see if the local registry is set up correctly.

cat ~/.minikube/profiles/retail-lakehouse/config.json | jq .

If there is any issue with pulling the image from the local registry, you can check the local registry to see if the image is there.

kubectl port-forward --namespace kube-system service/registry 5000:80

Open another terminal window, then run the following commands to query the local registry:

curl http://localhost:5000/v2/_catalog
curl http://localhost:5000/v2/debezium-mysql-connector/tags/list

Create a Debezium Source Connector

kubectl apply -f debezium-connector.yaml -n kafka-cdc
Result
kafkaconnector.kafka.strimzi.io/debezium-connector created
debezium-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:kafka-cdc/debezium-secret:username}
    database.password: ${secrets:kafka-cdc/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafka-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
    exactly.once.source.support: enabled
  • database.include.list: inventory: Specifies the database to capture changes from.
  • topic.prefix: mysql: Sets the prefix for Kafka topics where change events will be published.
  • schema.history.internal.kafka.topic: schema-changes.inventory: Specifies the Kafka topic to store schema history.
  • exactly.once.source.support: enabled: Enables exactly-once delivery semantics for the source connector.
kubectl get kafkaconnector debezium-connector -n kafka-cdc
Result
NAME                 CLUSTER                    CONNECTOR CLASS                              MAX TASKS   READY
debezium-connector   debezium-connect-cluster   io.debezium.connector.mysql.MySqlConnector   1           True
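In a script, the READY column can be checked rather than read by eye. A minimal sketch, assuming the table layout shown above with READY as the last column; `connector_ready` is a hypothetical helper name.

```shell
# connector_ready: reads the table printed by
# `kubectl get kafkaconnector <name>` on stdin and succeeds only if
# the last column (READY) of the first data row is "True".
# (Hypothetical helper; relies on the column order shown above.)
connector_ready() {
  awk 'NR == 2 { ready = $NF } END { exit (ready == "True" ? 0 : 1) }'
}

# Possible use against a live cluster (not run here):
# kubectl get kafkaconnector debezium-connector -n kafka-cdc | connector_ready \
#   && echo "connector is ready"
```

Since install.sh already uses `kubectl wait` on the Connect pods, the same command with `--for=condition=Ready kafkaconnector/debezium-connector` is likely a simpler option, assuming the resource reports a Ready condition in its status.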

Verify the CDC Pipeline

To confirm that the CDC pipeline works, check two things:

Checklist

  • In the Kafka cluster, we should see topics created by the Debezium MySQL Connector, such as mysql.inventory.customers.
  • When we update a record in the customers table in the MySQL database, we should see a corresponding message appear in the mysql.inventory.customers topic.

List Topics

List all topics in the Kafka cluster:

kubectl run kafka-topics-cli \
  -n kafka-cdc \
  -it --rm \
  --image=quay.io/strimzi/kafka:0.46.1-kafka-4.0.0 \
  --restart=Never -- \
  bin/kafka-topics.sh \
    --bootstrap-server kafka-cluster-kafka-bootstrap:9092 \
    --list
Result
__consumer_offsets
debezium-cluster-configs
debezium-cluster-offsets
debezium-cluster-status
mysql
mysql.inventory.addresses
mysql.inventory.customers
mysql.inventory.geom
mysql.inventory.orders
mysql.inventory.products
mysql.inventory.products_on_hand
schema-changes.inventory
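The per-table topic names in this list follow the pattern `<topic.prefix>.<database>.<table>`, which is why the customers table appears as mysql.inventory.customers. The sketch below builds such a name and checks a topic list for it; `topic_name` and `has_topic` are hypothetical helpers introduced for illustration.

```shell
# topic_name <prefix> <database> <table>
# Builds the change-event topic name for one table.
topic_name() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# has_topic <topic>
# Reads one topic name per line on stdin and succeeds only on an exact,
# whole-line match. Carriage returns are stripped first, because output
# captured from a TTY session (`kubectl run -it`) may end lines with CRLF.
has_topic() {
  tr -d '\r' | grep -Fxq -- "$1"
}

topic_name mysql inventory customers   # prints: mysql.inventory.customers

# Possible use: save the output of the kafka-topics.sh command above to a
# file (topics.txt is a hypothetical name), then:
# has_topic "$(topic_name mysql inventory customers)" < topics.txt && echo found
```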

Watch Changes

Watch the mysql.inventory.customers topic for changes:

kubectl run kafka-cli \
  -n kafka-cdc \
  -it --rm \
  --image=quay.io/strimzi/kafka:0.46.1-kafka-4.0.0 \
  --restart=Never -- \
  bin/kafka-console-consumer.sh \
    --bootstrap-server kafka-cluster-kafka-bootstrap:9092 \
    --topic mysql.inventory.customers \
    --partition 0 \
    --offset -10 \
    --max-messages 10

Open another terminal window, then connect to MySQL inside the MySQL pod (substitute the name of your own pod) and update a record in the customers table. Specifically, change the first_name of the customer with id=1001 from Sally to Sally Marie:

kubectl exec -n kafka-cdc -it mysql-6b84fd947d-9g9lt -- mysql -uroot -pdebezium
mysql> use inventory;
mysql> update customers set first_name="Sally Marie" where id=1001;

Go back to the first terminal window where you are watching the mysql.inventory.customers topic. You should see a new message appear that reflects the change you just made in the MySQL database.

Result
{
"schema": {
    "type": "struct",
    "fields": [
    {
        "type": "struct",
        "fields": [
        {
            "type": "int32",
            "optional": false,
            "field": "id"
        },
        {
            "type": "string",
            "optional": false,
            "field": "first_name"
        },
        {
            "type": "string",
            "optional": false,
            "field": "last_name"
        },
        {
            "type": "string",
            "optional": false,
            "field": "email"
        }
        ],
        "optional": true,
        "name": "mysql.inventory.customers.Value",
        "field": "before"
    },
    {
        "type": "struct",
        "fields": [
        {
            "type": "int32",
            "optional": false,
            "field": "id"
        },
        {
            "type": "string",
            "optional": false,
            "field": "first_name"
        },
        {
            "type": "string",
            "optional": false,
            "field": "last_name"
        },
        {
            "type": "string",
            "optional": false,
            "field": "email"
        }
        ],
        "optional": true,
        "name": "mysql.inventory.customers.Value",
        "field": "after"
    },
    {
        "type": "struct",
        "fields": [
        {
            "type": "string",
            "optional": false,
            "field": "version"
        },
        {
            "type": "string",
            "optional": false,
            "field": "connector"
        },
        {
            "type": "string",
            "optional": false,
            "field": "name"
        },
        {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
        },
        {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
            "allowed": "true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
        },
        {
            "type": "string",
            "optional": false,
            "field": "db"
        },
        {
            "type": "string",
            "optional": true,
            "field": "sequence"
        },
        {
            "type": "int64",
            "optional": true,
            "field": "ts_us"
        },
        {
            "type": "int64",
            "optional": true,
            "field": "ts_ns"
        },
        {
            "type": "string",
            "optional": true,
            "field": "table"
        },
        {
            "type": "int64",
            "optional": false,
            "field": "server_id"
        },
        {
            "type": "string",
            "optional": true,
            "field": "gtid"
        },
        {
            "type": "string",
            "optional": false,
            "field": "file"
        },
        {
            "type": "int64",
            "optional": false,
            "field": "pos"
        },
        {
            "type": "int32",
            "optional": false,
            "field": "row"
        },
        {
            "type": "int64",
            "optional": true,
            "field": "thread"
        },
        {
            "type": "string",
            "optional": true,
            "field": "query"
        }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "version": 1,
        "field": "source"
    },
    {
        "type": "struct",
        "fields": [
        {
            "type": "string",
            "optional": false,
            "field": "id"
        },
        {
            "type": "int64",
            "optional": false,
            "field": "total_order"
        },
        {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
        }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
    },
    {
        "type": "string",
        "optional": false,
        "field": "op"
    },
    {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
    },
    {
        "type": "int64",
        "optional": true,
        "field": "ts_us"
    },
    {
        "type": "int64",
        "optional": true,
        "field": "ts_ns"
    }
    ],
    "optional": false,
    "name": "mysql.inventory.customers.Envelope",
    "version": 2
},
"payload": {
    "before": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "sally.thomas@acme.com"
    },
    "after": {
    "id": 1001,
    "first_name": "Sally Marie",
    "last_name": "Thomas",
    "email": "sally.thomas@acme.com"
    },
    "source": {
    "version": "3.1.0.Final",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1751201044000,
    "snapshot": "false",
    "db": "inventory",
    "sequence": null,
    "ts_us": 1751201044000000,
    "ts_ns": 1751201044000000000,
    "table": "customers",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 401,
    "row": 0,
    "thread": 14,
    "query": null
    },
    "transaction": null,
    "op": "u",
    "ts_ms": 1751201044907,
    "ts_us": 1751201044907793,
    "ts_ns": 1751201044907793970
}
}
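In the payload above, `before` and `after` hold the row state on either side of the change, and `op` identifies the kind of change: `"u"` marks an update. A small sketch that maps the common Debezium operation codes to readable names; `describe_op` is a hypothetical helper, and any code it does not list is reported as "other".

```shell
# describe_op <code>
# Maps the `op` field of a Debezium change event to a readable name.
# (Hypothetical helper; only the common codes are listed.)
describe_op() {
  case "$1" in
    c) echo "create" ;;
    u) echo "update" ;;
    d) echo "delete" ;;
    r) echo "read (snapshot)" ;;
    *) echo "other: $1" ;;
  esac
}

describe_op u   # prints: update

# With the event saved to a file (event.json is a hypothetical name),
# jq can extract the code first:
# describe_op "$(jq -r '.payload.op' event.json)"
```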

References