| tags: [ MySQL Kafka Debezium ] categories: [ Development ]
使用 Debezium 复制 MySQL binlog 到 Kafka
Debezium 是一个支持 Kafka Connect 框架的数据库变更捕获(Change Data Capture)服务,它可以消费 MySQL、PostgreSQL、SQL Server、MongoDB 的 binlog 写入到 Apache Kafka 里,下文是针对 MySQL 的简单配置步骤。
使用 docker 创建数据库:
docker network create mysql docker run -dt -p 13306:3306 \ -e MYSQL_ALLOW_EMPTY_PASSWORD=yes \ --net mysql --name mysql_1 mysql:5.7 \ --server_id=1 --log_bin=mysql-bin \ --gtid_mode=ON --enforce_gtid_consistency=ON # 这个例子里 Debezium 用不到 mysql_2 这个从库 docker run -dt -p 23306:3306 \ -e MYSQL_ALLOW_EMPTY_PASSWORD=yes \ --net mysql --name mysql_2 mysql:5.7 \ --server_id=2 --log_bin=mysql-bin \ --gtid_mode=ON --enforce_gtid_consistency=ON docker exec mysql-2 mysql -Be "STOP SLAVE; CHANGE MASTER TO MASTER_HOST='mysql_1', MASTER_USER='root', MASTER_AUTO_POSITION=1; START SLAVE" docker exec mysql-2 mysql -Be "SHOW SLAVE STATUS\G"
在 mysql_1 里新建帐号
CREATE USER 'debezium-user' IDENTIFIED BY '123456'; GRANT SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium-user'; GRANT SELECT ON test.* TO 'debezium-user'; FLUSH PRIVILEGES;
这里没有授予
RELOAD ON *.*
以及LOCK TABLES ON test.*
是为了避免 Debezium 在获取 table schema 时锁表。特别注意的是一定需要授予SELECT
权限,Debezium 依赖这个权限执行SHOW FULL TABLES IN some_db WHERE Table_Type = 'BASE TABLE'
以及SHOW CREATE TABLE some_db.some_table
获取每个表的列名,没有这个权限,查询information_schema.tables
和information_schema.columns
也查不到对应表的信息,这是 MySQL 的奇葩权限机制导致。下载 Confluent Platform,解开到某个目录下,记作
$CONFLUENT_ROOT
;从 https://www.confluent.io/hub/debezium/debezium-connector-mysql/ 手动下载 debezium 插件的 zip 压缩包,解压缩到
$CONFLUENT_ROOT/share/java
;执行
export CONFLUENT_CURRENT=$CONFLUENT/data; $CONFLUENT_ROOT/bin/confluent local start
启动 Confluent Platform,启动完后grep plugin.path data/confluent.*/connect/connect.properties
可以确认 Kafka Connect 插件应该放到什么位置。编辑
test-connector.json
文件如下:{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "127.0.0.1", "database.port": "13306", "database.user": "debezium-user", "database.password": "123456", "database.server.id": "1", "database.server.name": "mysql_1", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "debezium-mysql_1", "include.schema.changes": "true", "snapshot.mode": "schema_only", "snapshot.locking.mode": "none", "tombstones.on.delete": "false" }
database.history.kafka.topic 和 include.schema.changes
Debezium 会将 DML binlog 以 AVRO 格式(也支持 JSON 格式)写入 Kafka topic
SERVER.DB.TABLE
,一份 JSON 格式的 DDL 写入debezium-mysql_1
,一份 AVRO 格式的 DDL 写入SERVER
(include.schema.changes=true)。snapshot.mode
Debezium 支持五种模式:
initial
:默认模式,在没有找到 offset 时(记录在 Kafka topic 的connect-offsets
中,Kafka connect 框架维护),做一次 snapshot——遍历有 SELECT 权限的表,收集列名,并且将每个表的所有行 select 出来写入 Kafka;when_needed
: 跟initial
类似,只是增加了一种情况,当记录的 offset 对应的 binlog 位置已经在 MySQL 服务端被 purge 了时,就重新做一个 snapshot。never
: 不做 snapshot,也就是不拿所有表的列名,也不导出表数据到 Kafka,这个模式下,要求从最开头消费 binlog,以获取完整的 DDL 信息,MySQL 服务端的 binlog 不能被 purge 过,否则由于 DML binlog 里只有 database name、table name、column type 却没有 column name,Debezium 会报错Encountered change event for table some_db.some_table whose schema isn't known to this connector
;schema_only
: 这种情况下会拿所有表的列名信息,但不会导出表数据到 Kafka,而且只从 Debezium 启动那刻起的 binlog 末尾开始消费,所以很适合不关心历史数据,只关心最近变更的场合。schema_only_recovery
: 在 Debezium 的 schema_only 模式出错时,用这个模式恢复,一般不会用到。
snapshot.locking.mode
设置为 “none” 是为了避免获取表的元信息时锁表(要么是 RELOAD 权限用 flush tables with read lock,要么是 LOCK TABLES 权限锁单个表),此时要求 Debezium 启动或者重启时没有 DDL 语句执行,否则 Debezium 抓取到的元信息跟并发执行的 DML 之间不一致。
tombstones.on.delete
设置成 “false” 是为了避免因 delete 消息额外写入一条不符合 AVRO schema 的 tombstone 消息。
执行如下命令创建 Kafka connector 并启动:
curl -s http://localhost:8083/connectors/test-connector/config -XPUT --data-binary @test-connector.json -H 'Content-Type: application/json' curl -s http://localhost:8083/connectors/test-connector/status | json_pp
之后就可以在 Kafka 的
SERVER.DB.TABLE
,SERVER
,debezium-SERVER
三类 topic 里看到 binlog 消息了。
在 Debezium 文档里还有更多选项,可以做到 database、table、column 级别的黑白名单,可以对 column 脱敏等等。