Canal——增量同步MySQL数据到ElasticSearch
1.下载安装ElasticSearch (安装后 浏览器输入http://localhost:9200/ 查看es信息)
2. 下载安装同版本kibana (安装后 使用工具访问地址 http://127.0.0.1:5601/app/kibana#/dev_tools/console?_g=() )
3. kibana 常用语句
GET locations/_search
#创建索引
PUT /{IndexName}?pretty
#索引取别名
PUT /{OldIndexName}/_alias/{NewIndexName}
#查看某个索引映射
GET locations/_mapping
# 查询所有索引信息
GET /_cat/indices?v
#删除locations索引
DELETE /locations?pretty
4. MySql配置
需要先开启MySQL的 binlog 写入功能,配置 binlog-format 为 ROW 模式
找到my.cnf文件, 添加以下配置:
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
然后重启mysql,用以下命令检查一下binlog是否正确启动:
mysql> show variables like 'log_bin%';
+---------------------------------+----------------------------------+
| Variable_name | Value |
+---------------------------------+----------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql/data/mysql-bin |
| log_bin_index | /data/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+----------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER solo IDENTIFIED BY 'solo'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
5.下载canal.deployer 与canal.adapter,我这里下载的是1.1.3 (1.1.4和1.1.2 可能由于我的错误操作报错了)
https://github.com/alibaba/canal/releases
注:
1.1.4 不知道为啥要把windows启动脚本的这句给去掉 set logback_configurationFile=%conf_dir%logback.xml
6. canal.deployer 的配置并启动
- 修改conf/example/instance.properties文件,主要注意以下几处:
canal.instance.master.address:数据库地址,例如127.0.0.1:3306
canal.instance.dbUsername:数据库用户
canal.instance.dbPassword:数据库密码
#这个是比较重要的参数,匹配库表白名单,比如我只要test库的user表的增量数据,则这样写 test.user,不改则全部匹配
canal.instance.filter.regex=.user
- 启动canal,并查看日志(有warn信息不要紧)
报错信息:com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
解决方法:
找到conf/canal.properties 文件里面的
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml 注释掉
- linux启动出现了一个既不报错也没有日志打印的烦人问题
# 要将这个注释去掉!!
#canal.instance.parser.parallelThreadSize = 16
7. canal.adapter 的配置并启动
- 修改conf/application.yml文件,主要注意如下内容,由于是yml文件,注意我这里说明的属性名称:
server.port:canal-adapter端口号 #默认即可不用改 canal.conf.canalServerHost:canal-server地址和ip #默认不用改 canal.conf.srcDataSources.defaultDS.url:数据库地址 canal.conf.srcDataSources.defaultDS.username:数据库用户名 canal.conf.srcDataSources.defaultDS.password:数据库密码 canal.conf.canalAdapters.groups.outerAdapters.hosts:es主机地址,tcp端口 #注释解开即可默认不用改
- 另外需要配置conf/es/*.yml文件,adapter将会自动加载conf / es下的所有.yml结尾的配置文件。
# 例如
dataSourceKey: defaultDS destination: example groupId: esMapping: _index: location #索引名称(自定义需要再es中创建:我理解为 存储数据库名)
_type: tbcid #索引类型名称 (自定义 我理解为表名)
_id: _id upsert: true sql: "select a.id as _id,a.name,a.address from test a" commitBatch: 3000
- 对应 es 创建索引
#创建索引 PUT /{IndexName}?pretty 例如: PUT /location { "query": { "match_all": {} }, "mappings":{ "tbcid":{ "properties":{ "name":{ "type":"text" }, "address":{ "type":"text" } } } } }
- 启动canal,并查看日志
- data类型报错 nested: IllegalArgumentException[Invalid format: "2020-08-18T10:23:51+08:00" is malformed at "T10:23:51+08:00"];
// 映射加上 yyyy-MM-dd'T'HH:mm:ss+08:00 用来转换格式 @Field( type = FieldType.Date, format = DateFormat.custom,pattern = "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss+08:00||yyyy-MM-dd||epoch_millis" ) @JsonFormat(shape = JsonFormat.Shape.STRING, pattern ="yyyy-MM-dd HH:mm:ss",timezone="GMT+8") private Date crtTm;
8. 测试
可以去数据库新增数据并去kibana中查看了
9.总结与踩坑
- 不用下载自己编译,只要下载上图的安装包即可
- canal.adapter 配置文件中数据库名及表名 的大写可能会导致数据不能同步 (!!!坑,我本地数据库名是大写,然后配置上去一直没用,直到改成小写)
- 要有 Affected indexes: location 打印才是同步成功的,数据库及表名的大小写可以通过这行打印看出来,上面是有大写的,下面是小写成功的
-
全量更新不能实现,但是增删改都是可以的;
一定要提前创建好es索引;
es配置的是tcp端口,比如默认的9300;
目前es貌似支持6.x版本,不支持7.x版本;
附录
更详细学习博客