Change Data Capture Nedir? Debezium – Kafka Connect

CDC, veritabanlarında yapılan değişikliklerin (insert, update, delete) kaydedilmesi için kullanılan bir teknolojidir,

Debezium’ da Apache Kafka kullanılan sistemlerde yapılan değişiklikleri yakalamak ve bu değişiklikleri bir diğer sistemlere aktarmak için kullanılan açık kaynak bir CDC teknolojisidir.

Debezium Connector’ lerini kullanarak farklı veritabanlarında yapılan değişiklikleri yakalabilir, bu değişiklikleri Kafka Connect ile Kafka Topic’ e aktarabiliriz.

Debezium’ un popüler veritabanları için kullanabileceğimiz connectorleri mevcuttur.

MySQL, PostgreSQL, MongoDB, SQL Server etc.

Image 31 1024x284

Bir önceki makalemizde Strimzi operatörünü kullanarak Kafka Cluster kurulumunu yapmıştık, buraya tıklayarak makaleyi inceleyebilirsiniz, şimdi sistemimize kafka connect ve debezium connector kurulumlarını yapalım, içerisinde veri olan bir tane de mysql veritabanı ayaklandırıp, kafka topic’ e değişiklikleri nasıl aktardığımızı izleyelim.

kafkaconnect’ i aşağıdaki yaml file ile ayağa kaldıralım, docker hub da bulduğum public debezium-connect-mysql imajı ile çalıştırıyorum, dilersek maven reposundan imajı kendimiz de oluşturabiliriz.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-cluster
annotations:
strimzi.io/use-connector-resources: “true”
spec:
version: 3.6.0
image: marcportabellaclotet/debezium-connect-mysql
replicas: 1
bootstrapServers: demo-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: debezium-cluster
offset.storage.topic: debezium-cluster-offsets
config.storage.topic: debezium-cluster-configs
status.storage.topic: debezium-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1

Image 32 1024x75

Örnek bir mysql deployment’ ı oluşturalım aynı namespace üzerinde;

apiVersion: v1

kind: Service

metadata:

  name: mysql

spec:

  ports:

  – port: 3306

  selector:

    app: mysql

  clusterIP: None

apiVersion: apps/v1

kind: Deployment

metadata:

  name: mysql

spec:

  selector:

    matchLabels:

      app: mysql

  strategy:

    type: Recreate

  template:

    metadata:

      labels:

        app: mysql

    spec:

      containers:

      – image: quay.io/debezium/example-mysql:2.4

        name: mysql

        env:

        – name: MYSQL_ROOT_PASSWORD

          value: debezium

        – name: MYSQL_USER

          value: mysqluser

        – name: MYSQL_PASSWORD

          value: mysql2024.

        ports:

        – containerPort: 3306

          name: mysql

Şimdi connector’ümüzü oluşturalım, bootstrap server service name’ e dikkat edelim, doğru servis isimelri ile hareket etmezsek verileri yakalayamayız.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: kafka-connect-cluster

labels:
strimzi.io/cluster: kafka-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: root
database.password: debezium
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
schema.history.internal.kafka.bootstrap.servers: demo-cluster-kafka-brokers:9092
schema.history.internal.kafka.topic: schema-changes.inventory

Image 33 1024x39

Şimdi webui’ a gidelim, veritabanımızda bir değişiklik yapıp izleyelim,

kubectl exec ile pod’ a bağlanıp değişiklik yaptım,

Image 34

Şimdi webui üzerinden değişikliği yakalamış mıyız kontrol edelim,

Yapmış olduğum değişiklikler topic’ e gelmiş,

Image 35 1024x403
Image 36 1024x405

Değişiklikleri yakalayıp, kafka’ya aktardık, log formatında yakalanan değişiklikleri yine belirli connectorlerle farklı sistemlere aktarabiliriz,

Faydalı Olması Dileklerimle,

Yorum bırakın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir