数据库-数据同步canal

Canal是什么?

Canal是阿里巴巴的基于mysql数据库binlog的增量订阅&消费组件

官网 :

https://github.com/alibaba/canal

以服务端视角去看canal都做了什么事情 (by canal-1.1.3版本)

canal主要有三个应用角色 [mysql, canal-server, canal-client]

它们之间关系 [mysql -> canal-server(deployer)生产 -> canal-client(adapter)消费]

     *      
     *      一.获取配置文件并启动服务器 
     *         1.1.CanalLauncher.main 
     *              CanalStater.start
     *                -> CanalController.initInstanceConfig -> SpringCanalInstanceGenerator.generate 
     *                   -> CanalInstanceWithManager.initEventParser -> CanalInstanceWithManager.doInitEventParser 
     *                      -> CanalServerWithEmbedded.start 
     *                          -> for(CanalInstanceWithManager.start(destination))
     *                              metaManager.start() 
     *                              alarmHandler.start() 
     *                              eventStore.start() 
     *                              eventSink.start()
     *                              eventParser.start()
     *
     *          注: 一个destination对应一个CanalInstanceWithManager实例
     *                      CanalController.initInstanceConfig()会逗号分割配置文件destinations参数, 放到系统变量,
     *                      canal配置spring的SYSTEM_PROPERTIES_MODE_OVERRIDE参数, 会让spring会优先取系统变量,
     *                      然后让spring自动注入参数到AbstractEventParser类的destination属性里
     *                      
     *       二.与mysql建立连接
     *         2.1.CanalEventParser(一个destination对应一个CanalEventParser)
     *          默认实现类是RdsBinlogEventParserProxy, 可以在base-instance.xml中配置
     *          CanalEventParser有以下几个实现
     *              MysqlEventParser (帐号密码远程连接)
     *              LocalBinlogEventParser (本地binlog文件的复制)
     *              RdsBinlogEventParserProxy (基于阿里云rds binlog备份文件的复制, 会自动下载阿里云的oss binlog文件到临时目录并读取同步)
     *              
     *       三.dump数据 用单线程BIO读binlog数据
     *         3.1.AbstractEventParser.parseThread.start(), 堵塞读, 会读到心跳sql in.read() 转成Event对象
     *              ErosaConnection.dump -> MultiStageCoprocessor.publish
     *
     *              MultiStageCoprocessor.class介绍: 它是针对解析器提供一个多阶段协同的处理(用disruptor协调多线程)
     *                  1.SimpleParserStage(事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息))
     *                  2.DmlParserStage(事件深度解析 (多线程, DML事件数据的完整解析))
     *                  3.SinkStoreStage(投递到store (单线程)
     *                          -> EventTransactionBuffer.add(CanalEntry.Entry)
     *                               -> this.put()
     *                                   -> TransactionFlushCallback.flush()
     *                                      -> AbstractEventParser.consumeTheEventAndProfilingIfNecessary
     *                                          -> CanalEventSink.sink() 提交Event数据
     *
     *           注: mysql与canal-server的心跳任务 MysqlDetectingTimeTask(实现断线重连, 主备切换(master断线重连超过默认3次后,自动切换备standby)
     *
     *        四.事件消费(服务端消费)
     *         4.1.CanalEventSink
     *              1. EntryEventSink.doSink() ->
     *              2. 拦截器CanalEventDownStreamHandler
     *                      -多库归并排序Event对象(HA 多group的实现, 需要使用group-instance.xml),
     *                      -普罗米修斯监控,
     *                      -过滤mysql心跳sql事件(会有定时任务执行sql语句保持心跳MysqlEventParser.buildHeartBeatTimeTask)
     *                              ->  EntryEventSink.tryPut()提交到内存里 )
     *
     *       五.事件存储(目前canal-1.1.3版本只有内存存储)
     *         5.1.CanalEventStore(负责存储Event类, 也就是平时操作的DML -> tryPut,这时候mysql与canal的流程已经完成了 )
     *
     *         注: binlog事件数据在mysql上持久化,所以canal不需要再次持久化. 只需要持久化记录点pos即可
     *
     *       六.提供外部访问接口 (连接,订阅,消费, 认证, 获取数据, 事物(ack|offset 模式))
     *         6.1.CanalServer (实现类CanalServerWithNetty 开放端口供客户端拉数据)
     *              下面是服务端与客户端的交互流程
     *
     *              客户端发起连接..
     *                  握手协议
     *              客户端订阅...
     *                  订阅一个destination
     *              客户端主动拉..
     *                  500/ms主动拉一次,请求参数[拉取数量] -> SessionHandler.GET -> getWithoutAck() 服务端获取最后一次记录点pos, 查询事件包, 并记录拉流的pos, 新建一个batchId, 服务端返回batchId, 如果是-1表示没有拉到数据
     *              客户端处理..
     *                  没有超时时间, 自己慢慢处理, 只是完成后记得告诉服务端提交事物
     *              客户端事物提交..
     *                  本batch正常完成 -> SessionHandler.CLIENTACK -> 服务端会删除batchId, 然后更新当前destination的记录点pos, 然后清理事件存储的数据CanalEventStore
     *              客户端事物异常..
     *                  回滚指定batchId的记录点位置 -> SessionHandler.CLIENTROLLBACK -> 服务端删除batch信息
     *              客户端取消订阅
     *                  简单模式暂无触发 -> SessionHandler.UNSUBSCRIPTION -> 服务端删除destination下的所有事物信息
     *              客户端断开连接
     *                  客户端作为主动方断开,进入TIME_WAIT状态. -> 客户端调用channel.close()直接关闭
     *
     *              注: CanalServerWithEmbedded中有一个CanalMetaManager实现了以下操作与存储(记录点pos,订阅信息,客户端信息clientId)
     *                       CanalMetaManager的实现有 [1.内存+zookeeper  2.内存+本地文件]
     *
     *
     *
     *
     *
     * 以客户端视角去看canal都做了什么事情?
     *          1. while(true) getWithoutAck(1000条) -> process(Dml)
     *
     *      客户端实现了什么?
     *          1. 实现了3种断线重连(单点 , HA-zookeeper, HA-配置文件)
     *          2. 实现了热修改配置文件 (本地文件扫描-定时任务, 远程DB-定时任务)
     *

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

备案信息公示
京ICP备18003381号
京ICP备18003381号-1