简介
如果后端应用数据存储使用的MySQL,项目中如果有这样的业务场景,我们会怎么做呢?
- 分库分表数据拆分和迁移
- 历史数据同步分析
- 异步处理
- 多个应用之间数据同步和共享
- 建立elasticsearch搜索
对于最简单最直接的做法就是修改原有应用的代码,在数据发生改变的同时通知下游系统,或者数据改变发送MQ,下游系统消费消息。这样的设计虽然看似简单,但是实现真的很麻烦,数据库表多、业务复杂,各种业务代码里面到处是增删改,这样的设计后期难以维护,也难以保证数据一致性和可靠性。
试想有没有可靠的替代方案,无需代码侵入,当数据库发生改变的时候,这些改变都是一个一个的data change事件发布到相应的中间件,下游系统订阅消息,这个设计就不得不提大名鼎鼎的kafka confluent了。
Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform的核心功能.大家都知道现在数据的ETL过程经常会选择kafka作为消息中间件应用在离线和实时的使用场景中,而kafka的数据上游和下游一直没有一个无缝衔接的pipeline来实现统一,比如会选择flume或者logstash采集数据到kafka,然后kafka又通过其他方式pull或者push数据到目标存储.而kafka connect旨在围绕kafka构建一个可伸缩的,可靠的数据流通道,通过kafka connect可以快速实现大量数据进出kafka从而和其他源数据源或者目标数据源进行交互构造一个低延迟的数据pipeline。
虽然kafka confluent提供了JDBC Connector使用JDBC的方式去获取数据源,这种方式kafka connector追踪每个表中检索到的组继续记录,可以在下一次迭代或者崩溃的情况下寻找到正确的位置,这里存在几种实现模式,具体可以参考官网说明JDBC Source Connector。但是我这里推荐使用debezium,这种方式基于MySQL binlog的特性,首先你需要了解什么是debezium。
debezium是一个开源的分布式CDC(变更数据捕获)系统,支持对接各种数据源,将上游已持久化的数据变更捕获后写入消息队列,其特性查看官网How it works,类似的CDC系统还有Canal。
debezium安装使用
部署kafka confluent
如何部署kafka confluent这里不再描述,可以参考Confluent安装部署这篇文章。
安装debezium插件
下载
官网地址debezium。
安装插件Debezium
把解压后的debezium复制到conlfuent安装目录share/java文件中,如1
/data/confluent/share/java/debezium-connector-mysql
再次启动confluent即可
debezium使用
使用debezium之前必须先开启mysql得binlog,这里不再叙述。接下来构建一个kafka connect来使用debezium插件,confluent提供了restful api可快速创建kafka connect。
创建kafka connect连接1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "ip/hostname",
"database.port": "port",
"database.user": "user",
"database.password": "******",
"database.server.id": "1",
"database.server.name": "dbname",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"include.schema.changes": "true"
"decimal.handling.mode": "string"
}
}
'
这里的脚本其实是一行,我为了方便查看展开了json。复制可用的脚本:1
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name":"mysql-connector","config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "ip/hostname", "database.port": "port", "database.user": "user", "database.password": "******", "database.server.id": "1", "database.server.name": "dbname", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.inventory" , "include.schema.changes": "true", "decimal.handling.mode": "string" }}'
Debezium mysql 连接器属性
属性 | 默认值 | 属性含义 |
---|---|---|
name | 连接器的名字,不能和其他连接器的名字重复,如果用已经存在的连接器名字去注册会失败。这个属性也是所有Kafka Connect连接器都需要的属性 | |
connector.class | 连接器的java类,对于MySQL连接器来说,总是io.debezium.connector.mysql.MySqlConnector | |
tasks.max | 1 | 连接器创建的最大的任务数,MySQL连接器总是使用单任务,所以用不到这个值,默认的就可以了。 |
database.hostname | MySQL数据库服务器的IP地址或者主机名 | |
database.port | 3306 | MySQL数据库服务器的端口号 |
database.user | 连接数据库的用户名 | |
database.password | 连接数据库的密码 | |
database.server.name | host:port | debezium监控的MySQL服务器/集群的逻辑名。这个逻辑名应该在所有连接器中唯一,因为这个会用在Kafka topic的前缀,默认是’host:port’这样,host就是上面的database.hostname属性值,port就是上面的database.port属性值。但是我们推荐使用明确的、有意义的名字。 |
database.server.id | random | 数据库客户端(debezium连接器)数字id,在数据库集群中应该唯一。其实连接器用这个id,以一个数据库服务器的身份加入数据库集群,这样才能够读取binlog文件。默认情况下,随机数在5400到6400之间,推荐显示设置一个值。 |
database.history.kafka.topic | kafka topic的全名,连接器将把数据库的schema历史信息存入这个topic中。 | |
database.history.kafka.topic.bootstrap.servers | 用于连接Kafka集群的host/port对。这个连接将用于获取连接器此前存放的数据库schema历史,并且把从源数据库(被监控的数据库)中读取到的DDL语句写入到这个Kafka集群中。这个连接参数应该和Kafka Connect用的集群一致。 | |
database.whitelist | 空字符串 | 用逗号隔开的正则表达式列表,可以匹配多个被监控的数据库名称,不在白名单中的数据库不会被debezium连接器监控。默认情况下,所有的数据库都会被监控。不能和database.blacklist同时使用。 |
database.blacklist | 空字符串 | 用逗号隔开的正则表达式列表,用来匹配不想监控的数据库。任何不在黑名单中的数据库都会被监控。不能和database.whitelist同时使用。 |
table.whitelist | 空字符串 | 逗号分割的正则表达式列表,用于匹配要监控的表的全名(数据库名.表名)。不同和table.blacklist同时使用经过实践,发现表白名单和数据库白名单也不能同时使用。 |
table.blacklist | 空字符串 | 逗号分割的正则表达式列表,用于匹配不要监控的表的全名(数据库名.表名)。不能和table.whitelist同时使用。经过实践,发现表黑名单和数据库黑名单也不能同时使用。 |
column.blacklist | 空字符串 | 逗号分割的正则表达式列表,用于匹配不想要监控的列,在事件消息中不会包含的列值。应该是databaseName.tableName.columnName或者databaseName.schemaName.tableName.columnName这样的全限定名。 |
column.truncate.to.length.chars | n/a | 逗号分割的正则表达式列表,用于匹配需要在事件消息中截短的列名。一个配置列表中可以配置多个不同的长度。列名应该是databaseName.tableName.columnName或者databaseName.schemaName.tableName.columnName这样的全限定名。 |
更多连接器属性参考:原文链接
注意:
如果对topics的消费者是java语言的连接方式一定要加上`"decimal.handling.mode": "string"`
示例中的关于地址、用户、密码等设置已经隐藏,需要根据实际情况自行替换
可以通过命令查看已创建成功的connect,如下1
2# curl -H "Accept:application/json" localhost:8083/
{"version":"2.1.1-cp1","commit":"f5b753880d5460f1","kafka_cluster_id":"__SovfFYR5WfKrZ1xuBlaw"}
如果一切正常,可以通过Confluent Control Center看到kafka集群上多了一些和表名相关的topic,topic命名规则为debezium mysql connector配置文件中配置的serverName.databaseName.tableName。
验证
debezium会读取MySQL binlog产生数据改变事件,将事件发送到kafka队列,最简单的验证办法就是监听这些队列(这些队列按照表名区分)具体参考代码请查看github.com/moxingwang/…。
这里我们观察数据库的order_info表,监听*.*.order_info
队列。
首先在数据库中将order_info表字段内容update。
此时,应用消费者会立马收到一条消费消息,具体的信息不再展示。
命令使用
打印出所有的topics1
kafka-topics --list --zookeeper localhost:2181
创建一个名为test的topic1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
更改tioic分区1
kafka-topics --zookeeper localhost:2181 --topic test --alter --partitions 4
查看指定topic信息1
kafka-topics --zookeeper localhost:2181 --topic test --describe
创建一个消息消费者1
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
创建一个消息生产者1
kafka-console-producer --broker-list localhost:9092 --topic test
应用消费者对topics的消费,对数据格式有一定的需求,也需要查看数据的内容,此时我们通过以下命令:1
kafka-console-consumer --bootstrap-server localhost:9092 --topic topics_name --from-beginning
此命令打印出消息过长,一般不需要使用。