Как справиться с Debezium + MySql + Spring Could Streams, Part 1

Kate

Administrator
Команда форума

Предистория​

На предыдущем месте работы (в одной Британской компанни) существовала распиленная MySql БД под три микросевиса, назовем их условно Cusomer, Person, Account (у каждого микросервиса своя DB соответственно). К каждой БД был подвязан свой Address (адрес у Кастормера, у Персона, и под аккаунт тоже мог быть подвязан адрес). Раньше когда это была одна большая база данных, поиск по адресу не составлял труда, но когда базу распилили, это сделать стало сложнее. Было принято решение написать микросервис, который бы подключался по CDC к этим трем БД (технологии: MySql, Debezium, Spring Cloud Streams) и складывал бы только нужные данные для поиска (по адресу, по имени, и т.д.) в одну базу данных (MongoDB). В ходе решения этой задачи, я получил интересны опыт, которым и хотел бы поделиться.

Debezium + MySql​

Debezium - написанная на Java платформа, которая подключается к бинарному логу MySql (куда он записывает изменения схемы, данных и т.д.), считывает непрочитанные /новые логи и отправляет их в Kafka topic. Дальше мы можем подключиться к этому Kafka топику и обрабатывать сообщения.

Чтобы разобраться с Devezium мне очень помогло видео Виктора Гамова и официальная документация.

Для того, что бы запустить все локально, начнем с docker-compose.yml файла:

version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:5.5.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:pLAINTEXT,PLAINTEXT_HOST:pLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_ENABLE: 'false'
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1

connect-debezium:
image: debezium/connect:1.2
container_name: connect-debezium
depends_on:
- broker
- person-database
ports:
- 8083:8083
environment:
BOOTSTRAP_SERVERS: broker:9092
GROUP_ID: connect-debezium
CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
STATUS_STORAGE_TOPIC: docker-connect-debezium-status
KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
volumes:
- ${PWD}/scripts:/scripts
person-database:
container_name: person-database
image: mysql
hostname: person-database
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: dev
MYSQL_PASSWORD: password
MYSQL_DATABASE: person
# volumes:
# - ./mysqlconf/mysqlconf.cnf:/etc/mysql/conf.d/mysql.cnf
# - ./dump:/docker-entrypoint-initdb.d
mongodb:
container_name: search-database
image: mongo
hostname: search-database
ports:
- 27017-27019:27017-27019
environment:
MONGO_INITDB_DATABASE: search
MONGO_INITDB_ROOT_USERNAME: dev
MONGO_INITDB_ROOT_PASSWORD: password
Здесь мы подымаем главных контейнера 4 контейнера:

  • zookeeper — необходим для менеджмента kafka брокера.
  • broker — это наша kafka с конфигами поключения zookeeper, брокера, и т. д.
  • connect-debezium — standalong debezium, готовый к подключению к kafka.
    Стоит отметить что в официальной документации можно встретить этот docker-compose файл с конфигурациями key/value конверторов в Avro формате ( и еще один контейнер schem-registry для поддержки этого формата). Но в моей задаче обычного string/json конвертера было больше чем достаточно.
    KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  • MySql контейнер.
  • MongoDB контейнер (для части 2 туториала).
Запускаем докер: docker-compose -up

После того как все контейнеры запустились, нужно включить в Msyql бин-лог и дать права debezium подключаться к нему.

Для этого идем в mysql контейнер
docker exec -it <mysql-container-id> mysql -u root -padmin
и выполняем nano /etc/mysql/my.cnf . Т.к. в первоначально текстового редактора в контейнере нет, его можно установить apt-get update && apt-get install nano.

Добавляем после [mysql] следующее:

server_id=42

log_bin=mysql_bin

binlog_format=row

binlog_row_image=full

expire_logs_days=10

!Важно, после сохранения нужно ре-стартануть контейнер!

Также необходим дать права debezium:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%';
ALTER USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';
Когда наша БД настроена, настало время подружить ее с коннектором. Для этого нужно отправить метод POST с конфигами коннектора в теле запроса.

curl -i -X POST -H "Content-Type:application/json" \
-d '{

"name": "data-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "person-database",
"database.port": "3306",
"database.user": "debezium",
"database.allowPublicKeyRetrieval":"true",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "mysql_server",
"database.include.list": "person",
"database.history.kafka.bootstrap.servers": "broker:9092",
"database.history.kafka.topic": "schema-changes.person",
"include.schema.changes":"false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.headers": "db",
"transforms.unwrap.add.fields": "op,table",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
Для этого мне удобнее всего использовать Postman.

Здесь используется Kafka-Сonnect, возможно трансформировать сообщение на этапе генерации. Для этого используется «transform.*». Об этом можно детально почитать здесь и здесь. Например важно удалять tomb-stone сообщения.

Т.к. мне не нужно было читать логи об изменениях схемы, я их выключил :
"include.schema.changes":"false"
(по дефолту «true»)

И так, оправляем Post на http://localhost:8083/connectors.

fa22b963bf82db92b653e68bab30223d.png

Дополнительно проверяем наш коннектор:
curl -i -X GET http://localhost:8083/connectors/data-connector
curl -i -X GET http://localhost:8083/connectors/data-connector/status
Ошибок нет, видим статус RUNNING, значит все норм.

Если все-таки есть ошибки, из практики, скорее всего неправильно указан пароль\права debezium или просто не рестартанули контейнер mysql. Вообще прилетает сразу весь список ошибок, можно разобраться.

Итак наш CDC готова, давайте посмотрим топики в kafka:
docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092

f91d1350c18680780bc9f5180e67e4f0.png

Коннектор создал служебные топики в kafka, но это еще не топики для сообщений об изменениях.

Зайдем в наш mysql контейнер и попробуем создать простую таблицу:

CREATE TABLE IF NOT EXISTS tasks (
task_id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255) NOT NULL,
start_date DATE,
status TINYINT NOT NULL,
priority TINY INT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=INNODB;
Опять смотрим топики в kafka:
docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092

Ничего не поменялось
Ничего не поменялось
Дело в том, что топики создаются в kafka с «lazy approach» и т.к. мы отключили логи изменения схемы БД, нам ничего не прилетело, топик не создался.

Теперь давайте добавим данные в таблицу:

INSERT INTO tasks (title, start_date, status, priority, description)
VALUES ("title1", "2021-05-28", "1", "1", "description1");
Опять смотрим топики в kafka:

64e5d7e27f583f548cb61da1ad90ad21.png

О чудо, у нас появился новый топик mysql_server.person.tasks!
Тут важно понимать, что коннектор создает в Kafka отдельный топик под каждую таблицу - one topic per table.

Подключаем консольного консьюмера
docker exec -i broker /usr/bin/kafka-console-consumer --bootstrap-server broker:29092 --topic mysql_server.person.tasks --from-beginning
и видим наш лог:

50fbd645a3d4b91cfc6c033d28bff710.png


 
Сверху