Canal的介绍
Canal的历史由来
在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,但因为跨机房同步数据的业务需求 ,便孕育而生出了Canal,主要是基于trigger(触发器)的方式获取增量变更。从2010年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。
当前的Canal支持的数据源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。
Canal的应用场景
目前普遍基于日志增量订阅和消费的业务,主要包括:
在介绍Canal的原理之前,我们先来了解下MySQL主从复制的原理。
MySQL主从复制原理
了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似的逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?
Canal工作原理
基于这样的原理与方式,便可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现MySQL实时增量数据传输的功能。
既然Canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中。
Canal的Docker环境准备
因为目前容器化技术的火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解Canal,所以关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。 如果你想和更多容器技术专家交流,可以加我微信liyingjiese,备注『加群』。群里每周都有全球各大公司的最佳实践以及行业最新动态 。
什么是Docker
相信绝大多数人都使用过虚拟机VMware,在使用VMware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一遍,而且VMware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。
为了便于大家快速理解Docker,便与VMware做对比来做介绍,Docker提供了一个开始,打包,运行APP的平台,把APP(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(类似VMware的系统镜像)与容器(类似VMware里安装的系统)。
什么是Image(镜像)
什么是Container(容器)
Docker的网络介绍
Docker的网络类型有三种:
创建自定义网络:(设置固定IP)
docker network create --subnet=172.18.0.0/16 mynetwork
查看存在的网络类型docker network ls:
搭建Canal环境
附上Docker的下载安装地址==> Docker Download 。
下载Canal镜像docker pull canal/canal-server
:
下载MySQL镜像docker pull mysql
,下载过的则如下图:
查看已经下载好的镜像docker images:
接下来通过镜像生成MySQL容器与canal-server容器:
##生成mysql容器 docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql ##生成canal-server容器 docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server ## 命令介绍 --net mynetwork #使用自定义网络 --ip #指定分配ip
查看Docker中运行的容器docker ps:
MySQL的配置修改
以上只是初步准备好了基础的环境,但是怎么让Canal伪装成Salve并正确获取MySQL中的binary log呢?
对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format
为ROW模式,通过修改MySQL配置文件来开启bin_log,使用find / -name my.cnf
查找my.cnf,修改文件内容如下:
[mysqld] log-bin=mysql-bin # 开启binlog binlog-format=ROW # 选择ROW模式 server_id=1 # 配置MySQL replaction需要定义,不要和Canal的slaveId重复
进入MySQL容器docker exec -it mysql bash。
创建链接MySQL的账号Canal并授予作为MySQL slave的权限,如果已有账户可直接GRANT:
mysql -uroot -proot # 创建账号 CREATE USER canal IDENTIFIED BY 'canal'; # 授予权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; # 刷新并应用 FLUSH PRIVILEGES;
数据库重启后,简单测试 my.cnf 配置是否生效:
show variables like 'log_bin'; show variables like 'log_bin'; show master status;
canal-server的配置修改
进入canal-server容器docker exec -it canal-server bash
。
编辑canal-server的配置vi canal-server/conf/example/instance.properties
:
更多配置请参考==>Canal配置说明 。
重启canal-server容器docker restart canal-server
进入容器查看启动日志:
docker exec -it canal-server bash tail -100f canal-server/logs/example/example.log
至此,我们的环境工作准备完成!
拉取数据并同步保存到ElasticSearch
本文的ElasticSearch也是基于Docker环境搭建,所以读者可执行如下命令:
# 下载对镜像 docker pull elasticsearch:7.1.1 docker pull mobz/elasticsearch-head:5-alpine # 创建容器并运行 docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1 docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取Canal解析后的binlog数据。首先我们基于Spring Boot搭建一个canal demo应用。结构如下图所示:
Student.java
package com.example.canal.study.pojo; import lombok.Data; import java.io.Serializable; // @Data 用户生产getter、setter方法 @Data public class Student implements Serializable { private String id; private String name; private int age; private String sex; private String city; }
CanalConfig.java
package com.example.canal.study.common; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.net.InetSocketAddress; /** * @author haha */ @Configuration public class CanalConfig { // @Value 获取 application.properties配置中端内容 @Value("${canal.server.ip}") private String canalIp; @Value("${canal.server.port}") private Integer canalPort; @Value("${canal.destination}") private String destination; @Value("${elasticSearch.server.ip}") private String elasticSearchIp; @Value("${elasticSearch.server.port}") private Integer elasticSearchPort; @Value("${zookeeper.server.ip}") private String zkServerIp; // 获取简单canal-server连接 @Bean public CanalConnector canalSimpleConnector() { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", ""); return canalConnector; } // 通过连接zookeeper获取canal-server连接 @Bean public CanalConnector canalHaConnector() { CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", ""); return canalConnector; } // elasticsearch 7.x客户端 @Bean public RestHighLevelClient restHighLevelClient() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort)) ); return client; } }
CanalDataParser.java
由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从GitHub上获取:
ElasticUtils.java
package com.example.canal.study.common; import com.alibaba.fastjson.JSON; import com.example.canal.study.pojo.Student; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.Map; /** * @author haha */ @Slf4j @Component public class ElasticUtils { @Autowired private RestHighLevelClient restHighLevelClient; /** * 新增 * @param student * @param index 索引 */ public void saveEs(Student student, String index) { IndexRequest indexRequest = new IndexRequest(index) .id(student.getId()) .source(JSON.toJSONString(student), XContentType.JSON) .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); log.info("保存数据至ElasticSearch成功:{}", response.getId()); } catch (IOException e) { log.error("保存数据至elasticSearch失败: {}", e); } } /** * 查看 * @param index 索引 * @param id _id * @throws IOException */ public void getEs(String index, String id) throws IOException { GetRequest getRequest = new GetRequest(index, id); GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); Mapfields = response.getSource(); for (Map.Entry entry : fields.entrySet()) { System.out.println(entry.getKey() + ":" + entry.getValue()); } } /** * 更新 * @param student * @param index 索引 * @throws IOException */ public void updateEs(Student student, String index) throws IOException { UpdateRequest updateRequest = new UpdateRequest(index, student.getId()); updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON); UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); log.info("更新数据至ElasticSearch成功:{}", response.getId()); } /** * 根据id删除数据 * @param index 索引 * @param id _id * @throws IOException */ public void DeleteEs(String index, String id) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, id); DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); log.info("删除数据至ElasticSearch成功:{}", response.getId()); } }
BinLogElasticSearch.java
package com.example.canal.study.action; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.example.canal.study.common.CanalDataParser; import com.example.canal.study.common.ElasticUtils; import com.example.canal.study.pojo.Student; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.List; import java.util.Map; /** * @author haha */ @Slf4j @Component public class BinLogElasticSearch { @Autowired private CanalConnector canalSimpleConnector; @Autowired private ElasticUtils elasticUtils; //@Qualifier("canalHaConnector")使用名为canalHaConnector的bean @Autowired @Qualifier("canalHaConnector") private CanalConnector canalHaConnector; public void binLogToElasticSearch() throws IOException { openCanalConnector(canalHaConnector); // 轮询拉取数据 Integer batchSize = 5 * 1024; while (true) { Message message = canalHaConnector.getWithoutAck(batchSize); // Message message = canalSimpleConnector.getWithoutAck(batchSize); long id = message.getId(); int size = message.getEntries().size(); log.info("当前监控到binLog消息数量{}", size); if (id == -1 || size == 0) { try { // 等待2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { //1. 解析message对象 Listentries = message.getEntries(); List > rows = CanalDataParser.printEntry(entries); for (CanalDataParser.TwoTuple tuple : rows) { if(tuple.eventType == CanalEntry.EventType.INSERT) { Student student = createStudent(tuple); // 2。将解析出的对象同步到elasticSearch中 elasticUtils.saveEs(student, "student_index"); // 3.消息确认已处理 // canalSimpleConnector.ack(id); canalHaConnector.ack(id); } if(tuple.eventType == CanalEntry.EventType.UPDATE){ Student student = createStudent(tuple); elasticUtils.updateEs(student, "student_index"); // 3.消息确认已处理 // canalSimpleConnector.ack(id); canalHaConnector.ack(id); } if(tuple.eventType == CanalEntry.EventType.DELETE){ elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString()); canalHaConnector.ack(id); } } } } } /** * 封装数据至Student * @param tuple * @return */ private Student createStudent(CanalDataParser.TwoTuple tuple){ Student student = new Student(); student.setId(tuple.columnMap.get("id").toString()); student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString())); student.setName(tuple.columnMap.get("name").toString()); student.setSex(tuple.columnMap.get("sex").toString()); student.setCity(tuple.columnMap.get("city").toString()); return student; } /** * 打开canal连接 * * @param canalConnector */ private void openCanalConnector(CanalConnector canalConnector) { //连接CanalServer canalConnector.connect(); // 订阅destination canalConnector.subscribe(); } /** * 关闭canal连接 * * @param canalConnector */ private void closeCanalConnector(CanalConnector canalConnector) { //关闭连接CanalServer canalConnector.disconnect(); // 注销订阅destination canalConnector.unsubscribe(); } }
CanalDemoApplication.java(Spring Boot启动类)
package com.example.canal.study; import com.example.canal.study.action.BinLogElasticSearch; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author haha */ @SpringBootApplication public class CanalDemoApplication implements ApplicationRunner { @Autowired private BinLogElasticSearch binLogElasticSearch; public static void main(String[] args) { SpringApplication.run(CanalDemoApplication.class, args); } // 程序启动则执行run方法 @Override public void run(ApplicationArguments args) throws Exception { binLogElasticSearch.binLogToElasticSearch(); } }
application.properties
server.port=8081 spring.application.name = canal-demo canal.server.ip = 192.168.124.5 canal.server.port = 11111 canal.destination = example zookeeper.server.ip = 192.168.124.5:2181 zookeeper.sasl.client = false elasticSearch.server.ip = 192.168.124.5 elasticSearch.server.port = 9200
Canal集群高可用的搭建
通过上面的学习,我们知道了单机直连方式的Canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么Canala的多实例集群方式如何搭建呢!
基于ZooKeeper获取Canal实例
准备ZooKeeper的Docker镜像与容器:
docker pull zookeeper docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
1、机器准备:
2、按照部署和配置,在单台机器上各自完成配置,演示时instance name为example。
3、修改canal.properties,加上ZooKeeper配置并修改Canal端口:
canal.port=11113 canal.zkServers=172.18.0.3:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml
4、创建example目录,并修改instance.properties:
canal.instance.mysql.slaveId = 1235 #之前的canal slaveId是1234,保证slaveId不重复即可 canal.instance.master.address = 172.18.0.6:3306
注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml
配置。
启动两个不同容器的Canal,启动后,可以通过tail -100f logs/example/example.log
查看启动日志,只会看到一台机器上出现了启动成功的日志。
比如我这里启动成功的是 172.18.0.4:
查看一下ZooKeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111:
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.4:11111","cid":1}
客户端链接, 消费数据
可以通过指定ZooKeeper地址和Canal的instance name,canal client会自动从ZooKeeper中的running节点获取当前服务的工作节点,然后与其建立链接:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.4:11111","cid":1}
对应的客户端编码可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一个HA连接:
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端IP,链接的端口信息等(聪明的你,应该也可以发现,canal client也可以支持HA功能):
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running {"active":true,"address":"192.168.124.5:59887","clientId":1001}
数据消费成功后,canal server会在ZooKeeper中记录下当前最后一次消费成功的binlog位点(下次你重启client时,会从这最后一个位点继续进行消费):
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
停止正在工作的172.18.0.4的canal server:
docker exec -it canal-server bash cd canal-server/bin sh stop.sh
这时172.18.0.8会立马启动example instance,提供新的数据服务:
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.8:11111","cid":1}
与此同时,客户端也会随着canal server的切换,通过获取ZooKeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。
异常与总结
elasticsearch-head
无法访问Elasticsearch
es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下:
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/ [root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml # 文件末尾加上如下配置 http.cors.enabled: true http.cors.allow-origin: "*"
修改完配置文件后需重启es服务。
elasticsearch-head查询报406 Not Acceptable
解决方法:
1、进入head安装目录;
2、cd _site/
3、编辑vendor.js 共有两处
#6886行 contentType: "application/x-www-form-urlencoded 改成 contentType: "application/json;charset=UTF-8" #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" && 改成 var inspectData = s.contentType === "application/json;charset=UTF-8" &&
使用elasticsearch-rest-high-level-client
报org.elasticsearch.action.index.IndexRequest.ifSeqNo
#pom中除了加入依赖#还需加入 org.elasticsearch.client elasticsearch-rest-high-level-client 7.1.1 org.elasticsearch elasticsearch 7.1.1
相关参考: git hub issues 。
为什么ElasticSearch要在7.X版本不能使用type?
参考: 为什么ElasticSearch要在7.X版本去掉type?
使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.NoNodeAvailableException
由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。 可参考 RestHighLevelClient API 。
设置Docker容器开启启动
如果创建时未指定 --restart=always ,可通过update 命令 docker update --restart=always [containerID]
Docker for Mac network host模式不生效
Host模式是为了性能,但是这却对Docker的隔离性造成了破坏,导致安全性降低。 在性能场景下,可以用--netwokr host开启Host模式,但需要注意的是,如果你用Windows或Mac本地启动容器的话,会遇到Host模式失效的问题。原因是Host模式只支持Linux宿主机。
参见官方文档: https://docs.docker.com/network/host/ 。
客户端连接ZooKeeper报authenticate using SASL(unknow error)
出现这个错的意思是ZooKeeper作为外部应用需要向系统申请资源,申请资源的时候需要通过认证,而sasl是一种认证方式,我们想办法来绕过sasl认证。避免等待,来提高效率。
在项目代码中加入System.setProperty("zookeeper.sasl.client", "false");,
如果是Spring Boot项目可以在application.properties
中加入zookeeper.sasl.client=false
。
参考: Increased CPU usage by unnecessary SASL checks 。
如果更换canal.client.jar中依赖的zookeeper.jar的版本
把Canal的官方源码下载到本机git clone https://github.com/alibaba/canal.git ,然后修改client模块下pom.xml文件中关于ZooKeeper的内容,然后重新mvn install:
把自己项目依赖的包替换为刚刚mvn install
生产的包:
关于选型的取舍
总结
以上所述是小编给大家介绍的基于Docker结合Canal实现MySQL实时增量数据传输功能,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!