CDC, veritabanlarında yapılan değişikliklerin (insert, update, delete) kaydedilmesi için kullanılan bir teknolojidir,
Debezium’ da Apache Kafka kullanılan sistemlerde yapılan değişiklikleri yakalamak ve bu değişiklikleri bir diğer sistemlere aktarmak için kullanılan açık kaynak bir CDC teknolojisidir.
Debezium Connector’ lerini kullanarak farklı veritabanlarında yapılan değişiklikleri yakalabilir, bu değişiklikleri Kafka Connect ile Kafka Topic’ e aktarabiliriz.
Debezium’ un popüler veritabanları için kullanabileceğimiz connectorleri mevcuttur.
MySQL, PostgreSQL, MongoDB, SQL Server etc.

Bir önceki makalemizde Strimzi operatörünü kullanarak Kafka Cluster kurulumunu yapmıştık, buraya tıklayarak makaleyi inceleyebilirsiniz, şimdi sistemimize kafka connect ve debezium connector kurulumlarını yapalım, içerisinde veri olan bir tane de mysql veritabanı ayaklandırıp, kafka topic’ e değişiklikleri nasıl aktardığımızı izleyelim.
kafkaconnect’ i aşağıdaki yaml file ile ayağa kaldıralım, docker hub da bulduğum public debezium-connect-mysql imajı ile çalıştırıyorum, dilersek maven reposundan imajı kendimiz de oluşturabiliriz.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-cluster
annotations:
strimzi.io/use-connector-resources: “true”
spec:
version: 3.6.0
image: marcportabellaclotet/debezium-connect-mysql
replicas: 1
bootstrapServers: demo-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: debezium-cluster
offset.storage.topic: debezium-cluster-offsets
config.storage.topic: debezium-cluster-configs
status.storage.topic: debezium-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1

Örnek bir mysql deployment’ ı oluşturalım aynı namespace üzerinde;
apiVersion: v1
kind: Service
metadata:
name: mysql
spec:
ports:
– port: 3306
selector:
app: mysql
clusterIP: None
—
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
– image: quay.io/debezium/example-mysql:2.4
name: mysql
env:
– name: MYSQL_ROOT_PASSWORD
value: debezium
– name: MYSQL_USER
value: mysqluser
– name: MYSQL_PASSWORD
value: mysql2024.
ports:
– containerPort: 3306
name: mysql
Şimdi connector’ümüzü oluşturalım, bootstrap server service name’ e dikkat edelim, doğru servis isimelri ile hareket etmezsek verileri yakalayamayız.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: kafka-connect-cluster
labels:
strimzi.io/cluster: kafka-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: root
database.password: debezium
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
schema.history.internal.kafka.bootstrap.servers: demo-cluster-kafka-brokers:9092
schema.history.internal.kafka.topic: schema-changes.inventory

Şimdi webui’ a gidelim, veritabanımızda bir değişiklik yapıp izleyelim,
kubectl exec ile pod’ a bağlanıp değişiklik yaptım,

Şimdi webui üzerinden değişikliği yakalamış mıyız kontrol edelim,
Yapmış olduğum değişiklikler topic’ e gelmiş,


Değişiklikleri yakalayıp, kafka’ya aktardık, log formatında yakalanan değişiklikleri yine belirli connectorlerle farklı sistemlere aktarabiliriz,
Faydalı Olması Dileklerimle,