用途
在大型实战项目开发过程中,当数据量达到比较大的规模时,不可避免的要考虑使用ElasticSearch(es)等搜索引擎来解决大量数据的查询性能压力,因此,做好mysql的数据同步变得至关重要。我所了解,并且使用的是通过Logstash和Canal中间件,来实现将数据写入到ES等中。
一、实现同步原理
1.1 Logstash
Logstash提供了一个JDBC插件,它可以定期查询数据库并捕获变化。通过配置Logstash指定连接到mysql的哪个表和es的哪个索引库,并指定对应的查询语句。当MySQL中的数据发生变化时,Logstash的JDBC插件会定时的检测到这些变化,并且迅速捕获这些新数据或者以更新的数据(根据文档id),并将其同步到es中。
注意
- 因为涉及到数据的更新,ES和Mysql表的id字段对应关系
- 因为是根据时间实现增量同步,所以mysql表中必须有一个包含更新或插入时间的字段
1.2 Canal
Canal中间件则是通过模拟成mysql的从库,实时接收mysql的增量数据,然后通过RESTful API将数据写入到ES中。
二、使用步骤(以es为7.14.0为例)
2.1 Logstash
- 下载与es对应版本的logstash-7.14.0和使用的mysql jar包
- 在logstash-7.14.0安装目录下创建文件夹mysqltb,将jar包放在该目录下,新建mysql.conf文件,添加内容如下:
- 命令行执行 logstash ‐f ../mysqletc/mysql.conf
input {
jdbc {
# 连接的数据库以及表
jdbc_connection_string => ""
# 数据库的账号和密码
jdbc_user => ""
jdbc_password => ""
# mysql jar包的路径
jdbc_driver_library => ""
# the name of the driver class for mysql
jdbc_driver_class => "***.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
#statement_filepath => ""
statement => ""
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 数据标签,用于标记不同的索引数据
type => "user"
# 加上jdbc时区, 要不然logstash的时间会不准确
jdbc_default_timezone => "Asia/Shanghai"
# 设置列名区分大小写, 默认全小写
lowercase_column_names => "false"
}
}
output {
if[type] == "user"{
elasticsearch {
#ESIP地址与端口
hosts => ["" ]
#ES索引名称(自己定义的)
index => ""
#!!!自增ID编号,与查询语句中特有字段名一致
document_id => "%{id}"
# 这个必须要写不然不会同步
document_type => "_doc"
}}
stdout {
#以JSON格式输出
codec => json_lines
}
2.2 Canal
下载canal1.1.5
8.0以下版本均要开启mysql的binlog写入功能,在mysql中使用使用命令show variables like ‘%log_bin%’看是否修改成功
#开启binlog模式
log_bin=mysql-bin
binlog-format=row
#single DB binlog-ignore-db=mysql
在Canal.deployer目录下找到配置文件conf/example/instance.properties,修改如下字段
# 同步数据库地址
canal.instance.master.address=你的mysql数据库地址
# 同步数据库账号密码
canal.instance.dbUsername=你的mysql账号
canal.instance.dbPassword=你的mysql密码
修改完成后,保存,启动bin目录下面的startup.bat(linux启动startup.sh),显示Listening for transport dt_socket at address: 9099则没错
在canal.adapter目录下找到配置文件conf/application.yml,修改以下配置:
srcDataSources:
defaultDS:
url: 你自己的mysql数据库地址
username: mysql数据库账号
password: mysql数据库密码
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7 #ES同步适配器
key: es7Key
hosts: #ES连接地址,!!!!!!主要这里有的地方要加http:
properties:
mode: rest
在conf/es7目录中,新建与sql映射表名一样的yml文件
outerAdapterKey: es7Key # 这里和主配置canalAdapters.instance.groupId.outerAdapters.key下面的的key一样
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example
groupId: g1
esMapping:
_index: #ES的索引名称
_id: _id
sql: "" # sql映射
etlCondition: "where a.c_time>={}"
***mitBatch: 3000 #提交批大小
注意:
如果报错,可以先看日志,如果日志没有报错,并且监听到了mysql数据的改变,但是没有同步到es,可能是由于es7.x的版本问题,解决办法如下:
将canal 1.1.5.alpha-2中plugins目录下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar复制到1.1.5的plugins下
更多报错参考:canalAdapter同步至es7.x踩坑指南_client-adapter.es7x-1.1.5-snapshot-jar-with-depend-CSDN博客
三、总结
logstash使用相对比较简单,易上手,可扩展插件生态系统,支持定时存储,但是它耗资源较大,并且不能与消息队列缓存。canal实时同步存储,查询更快、更迅速,对数据库无压力,但是相对而言难些。