Canal是什么?
Canal是阿里巴巴的基于mysql数据库binlog的增量订阅&消费组件
官网 :
https://github.com/alibaba/canal
以服务端视角去看canal都做了什么事情 (by canal-1.1.3版本)
canal主要有三个应用角色 [mysql, canal-server, canal-client]
它们之间关系 [mysql -> canal-server(deployer)生产 -> canal-client(adapter)消费]
*
* 一.获取配置文件并启动服务器
* 1.1.CanalLauncher.main
* CanalStater.start
* -> CanalController.initInstanceConfig -> SpringCanalInstanceGenerator.generate
* -> CanalInstanceWithManager.initEventParser -> CanalInstanceWithManager.doInitEventParser
* -> CanalServerWithEmbedded.start
* -> for(CanalInstanceWithManager.start(destination))
* metaManager.start()
* alarmHandler.start()
* eventStore.start()
* eventSink.start()
* eventParser.start()
*
* 注: 一个destination对应一个CanalInstanceWithManager实例
* CanalController.initInstanceConfig()会逗号分割配置文件destinations参数, 放到系统变量,
* canal配置spring的SYSTEM_PROPERTIES_MODE_OVERRIDE参数, 会让spring会优先取系统变量,
* 然后让spring自动注入参数到AbstractEventParser类的destination属性里
*
* 二.与mysql建立连接
* 2.1.CanalEventParser(一个destination对应一个CanalEventParser)
* 默认实现类是RdsBinlogEventParserProxy, 可以在base-instance.xml中配置
* CanalEventParser有以下几个实现
* MysqlEventParser (帐号密码远程连接)
* LocalBinlogEventParser (本地binlog文件的复制)
* RdsBinlogEventParserProxy (基于阿里云rds binlog备份文件的复制, 会自动下载阿里云的oss binlog文件到临时目录并读取同步)
*
* 三.dump数据 用单线程BIO读binlog数据
* 3.1.AbstractEventParser.parseThread.start(), 堵塞读, 会读到心跳sql in.read() 转成Event对象
* ErosaConnection.dump -> MultiStageCoprocessor.publish
*
* MultiStageCoprocessor.class介绍: 它是针对解析器提供一个多阶段协同的处理(用disruptor协调多线程)
* 1.SimpleParserStage(事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息))
* 2.DmlParserStage(事件深度解析 (多线程, DML事件数据的完整解析))
* 3.SinkStoreStage(投递到store (单线程)
* -> EventTransactionBuffer.add(CanalEntry.Entry)
* -> this.put()
* -> TransactionFlushCallback.flush()
* -> AbstractEventParser.consumeTheEventAndProfilingIfNecessary
* -> CanalEventSink.sink() 提交Event数据
*
* 注: mysql与canal-server的心跳任务 MysqlDetectingTimeTask(实现断线重连, 主备切换(master断线重连超过默认3次后,自动切换备standby)
*
* 四.事件消费(服务端消费)
* 4.1.CanalEventSink
* 1. EntryEventSink.doSink() ->
* 2. 拦截器CanalEventDownStreamHandler
* -多库归并排序Event对象(HA 多group的实现, 需要使用group-instance.xml),
* -普罗米修斯监控,
* -过滤mysql心跳sql事件(会有定时任务执行sql语句保持心跳MysqlEventParser.buildHeartBeatTimeTask)
* -> EntryEventSink.tryPut()提交到内存里 )
*
* 五.事件存储(目前canal-1.1.3版本只有内存存储)
* 5.1.CanalEventStore(负责存储Event类, 也就是平时操作的DML -> tryPut,这时候mysql与canal的流程已经完成了 )
*
* 注: binlog事件数据在mysql上持久化,所以canal不需要再次持久化. 只需要持久化记录点pos即可
*
* 六.提供外部访问接口 (连接,订阅,消费, 认证, 获取数据, 事物(ack|offset 模式))
* 6.1.CanalServer (实现类CanalServerWithNetty 开放端口供客户端拉数据)
* 下面是服务端与客户端的交互流程
*
* 客户端发起连接..
* 握手协议
* 客户端订阅...
* 订阅一个destination
* 客户端主动拉..
* 500/ms主动拉一次,请求参数[拉取数量] -> SessionHandler.GET -> getWithoutAck() 服务端获取最后一次记录点pos, 查询事件包, 并记录拉流的pos, 新建一个batchId, 服务端返回batchId, 如果是-1表示没有拉到数据
* 客户端处理..
* 没有超时时间, 自己慢慢处理, 只是完成后记得告诉服务端提交事物
* 客户端事物提交..
* 本batch正常完成 -> SessionHandler.CLIENTACK -> 服务端会删除batchId, 然后更新当前destination的记录点pos, 然后清理事件存储的数据CanalEventStore
* 客户端事物异常..
* 回滚指定batchId的记录点位置 -> SessionHandler.CLIENTROLLBACK -> 服务端删除batch信息
* 客户端取消订阅
* 简单模式暂无触发 -> SessionHandler.UNSUBSCRIPTION -> 服务端删除destination下的所有事物信息
* 客户端断开连接
* 客户端作为主动方断开,进入TIME_WAIT状态. -> 客户端调用channel.close()直接关闭
*
* 注: CanalServerWithEmbedded中有一个CanalMetaManager实现了以下操作与存储(记录点pos,订阅信息,客户端信息clientId)
* CanalMetaManager的实现有 [1.内存+zookeeper 2.内存+本地文件]
*
*
*
*
*
* 以客户端视角去看canal都做了什么事情?
* 1. while(true) getWithoutAck(1000条) -> process(Dml)
*
* 客户端实现了什么?
* 1. 实现了3种断线重连(单点 , HA-zookeeper, HA-配置文件)
* 2. 实现了热修改配置文件 (本地文件扫描-定时任务, 远程DB-定时任务)
*