【Canal源码分析】Sink及Store工作过程

一、序列图

image

二、源码分析

2.1 Sink

Sink阶段所做的事情,就是根据一定的规则,对binlog数据进行一定的过滤。我们之前跟踪过parser过程的代码,发现在parser完成后,会把数据放到一个环形队列TransactionBuffer中,也就是这个方法:

transactionBuffer.add(entry);

我们具体看下add这个方法。

public void add(CanalEntry.Entry entry) throws InterruptedException {switch (entry.getEntryType()) {case TRANSACTIONBEGIN:flush();// 刷新上一次的数据put(entry);break;case TRANSACTIONEND:put(entry);flush();break;case ROWDATA:put(entry);// 针对非DML的数据,直接输出,不进行buffer控制EventType eventType = entry.getHeader().getEventType();if (eventType != null && !isDml(eventType)) {flush();}break;default:break;}
}

判断一下事件的类型,如果是事务开头,那么直接刷新之前的数据,然后把当前事件加到队列中;如果是事务的结束,那么先把当前事务放到队列后,刷新到下一个阶段;如果是普通的事件,直接放到队列中,如果事务头类型不为空,且不是DML类型,那么直接刷新队列中数据到下一个阶段。

我们需要理清楚这块的逻辑,什么时候flush,什么时候put,针对不同的事件,采取的策略不一样。

这里我们分析下flush和put两个步骤。

2.1.1 flush队列

这块其实还没有涉及到sink阶段,还在维护一个事件环形队列。这个环形队列,维护了两个指针,一个是flush的指针,一个是put的指针,flush的指针永远是滞后于put指针的。

private void flush() throws InterruptedException {long start = this.flushSequence.get() + 1;long end = this.putSequence.get();if (start <= end) {List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();for (long next = start; next <= end; next++) {transaction.add(this.entries[getIndex(next)]);}flushCallback.flush(transaction);flushSequence.set(end);// flush成功后,更新flush位置}
}

start就是flush的指针,end就是put的指针,flush的动作就是把当前flush到put中间的数据,全部刷新到下一个阶段。具体传递到下一个阶段的代码在flushCallback.flush方法中。这块我们下文再分析。

2.1.2 put

private void put(CanalEntry.Entry data) throws InterruptedException {// 首先检查是否有空位if (checkFreeSlotAt(putSequence.get() + 1)) {long current = putSequence.get();long next = current + 1;// 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值entries[getIndex(next)] = data;putSequence.set(next);} else {flush();// buffer区满了,刷新一下put(data);// 继续加一下新数据}
}

这块的注释都比较清晰了,就不赘述了。

2.1.3 flush到sink

具体的代码在AbstractEventParser中,定义transactionBuffer的地方。

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);if (!running) {return;}if (!successed) {throw new CanalParseException("consume failed!");}LogPosition position = buildLastTransactionPosition(transaction);if (position != null) { // 可能position为空logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);}
}

主要的处理在consumeTheEventAndProfilingIfNecessary里面。这里面调用了eventSink.sink()方法。

2.1.4 sink

这里面进行了binlog数据的过滤。首先判断是否需要过滤事务头和尾,如果需要过滤的话,直接过滤掉,默认不过滤。

遍历传到这个阶段的binlog列表,根据正则表达式判断,是否需要进行过滤,一般来说是根据表名、库名等进行过滤。这边的过滤类主要是AviaterRegexFilter,根据库名.表名和表达式进行过滤。如果需要进行过滤,那么直接把这个事件过滤。否则,加到binlog列表中,进行二次过滤。第二次过滤的主要内容是HEARTBEAT类型的事件,主要的代码在这里:

protected boolean doSink(List<Event> events) {for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.before(events);//处理heartbeat事件}int fullTimes = 0;do {if (eventStore.tryPut(events)) {for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.after(events);}return true;} else {applyWait(++fullTimes);}for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.retry(events);}} while (running && !Thread.interrupted());return false;
}

这里的CanalEventDownStreamHandler其实只有HeartBeatEntryEventHandler,也就是在before方法中把heartbeat事件从events去掉。这个心跳事件其实是parser过程生成的,我们之前有提到过。after目前是空的方法。

去掉之后,剩余的事件列表就会被调用tryPut()方法,送到下一步骤store中。

这里还有个applyWait方法,防止无限等待。

private void applyWait(int fullTimes) {int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;if (fullTimes <= 3) { // 3次以内Thread.yield();} else { // 超过3次,最多只sleep 10msLockSupport.parkNanos(1000 * 1000L * newFullTimes);}}

2.2 Store

目前只有基于内存模式的Store,这个阶段是真正Server中的落盘阶段。数据经历了mysql master到parser,再到sink,最后终于到了这里。

public boolean tryPut(List<Event> data) throws CanalStoreException {if (data == null || data.isEmpty()) {return true;}final ReentrantLock lock = this.lock;lock.lock();try {if (!checkFreeSlotAt(putSequence.get() + data.size())) {return false;} else {doPut(data);return true;}} finally {lock.unlock();}
}

在进行数据put的时候,加了一把锁。首先计算下是否还有剩余的空间进行数据处理,这里的计算,不光是计算了队列的剩余长度,还计算了剩余空间。队列的长度默认是16*1024,如果空间不足,直接拒绝,返回false,等待空间空余出来后,再进行put操作。否则,直接doPut()。

/*** 执行具体的put操作*/
private void doPut(List<Event> data) {long current = putSequence.get();long end = current + data.size();// 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值for (long next = current + 1; next <= end; next++) {entries[getIndex(next)] = data.get((int) (next - current - 1));}putSequence.set(end);// 记录一下gets memsize信息,方便快速检索if (batchMode.isMemSize()) {long size = 0;for (Event event : data) {size += calculateSize(event);}putMemSize.getAndAdd(size);}// tell other threads that store is not emptynotEmpty.signal();
}

这里主要对put一些指针,还有空间做了重新的计算。放到队列中之后,通知其他等待notEmpty的线程,来进行数据的获取,这时候,client可以进行数据获取了。

转载于:https://www.cnblogs.com/f-zhao/p/9088655.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/267028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

传Facebook将推出应用中心挑战谷歌搜索地位

网易科技讯 6月8日消息&#xff0c;据TechCrunch报道&#xff0c;Facebook将在今天或不久后推出App Center(应用中心)产品。应用中心可在手机上或浏览器中使用&#xff0c;外观和功能非常像苹果的应用店App Store&#xff0c;除了不能下载iPhone或iPad应用外。 Facebook的应用中…

java中or和and的优先级_x86处理器汇编语言AND和OR运算符优先级

Irvine的书使用MASM作为参考汇编程序 .作者正在谈论MASM operators 1 - 这些运营商仅为了我们人类的利益而受到支持 .它们让我们对立即数和常量执行算术&#xff0c;但它们使用的表达式必须最终在汇编时解析为一个值 .aConstant EQU 35mov edx, NOT 1 ;Same as mov edx, 0fffff…

博客园修改页面显示样式css

博客园修改页面显示样式css 一、总结 二、 博客园修改页面显示样式css 1、点管理 2、点设置 3、在页面定制css代码中加入你想要改变样式的css代码即可 不会写css的下面有代码示例&#xff0c;直接复制粘贴放到页面定制css代码位置即可 三、代码 1 #cnblogs_post_body h3 {2 …

基准测试 ApacheBench ab学习

2019独角兽企业重金招聘Python工程师标准>>> ab的全称是ApacheBench&#xff0c;是 Apache 附带的一个小工具&#xff0c;专门用于 HTTP Server 的benchmark testing&#xff0c;可以同时模拟多个并发请求。前段时间看到公司的开发人员也在用它作一些测试&#xff0…

java虚拟机类加载机制浅谈_浅谈Java虚拟机(三)之类加载机制

在《浅谈Java虚拟机》这篇文章中&#xff0c;我们提到了JVM从操作系统方面来说&#xff0c;可以将其看做是一个进程&#xff0c;分别有类加载器子系统&#xff0c;执行引擎子系统和垃圾收集子系统。这一篇文章就简单的来谈一下类加载器子系统中的类加载机制。第一&#xff1a;什…

如何把两个查询语句合成一条 语句

我给你写个例子&#xff1a;假设 第一条sql 是 select a.a1,a.a2,a.a3 from A a where ... 第二条sql是 select b.b1,b.b2,b.b3 from B b where ... 第三条sql 是 select c.c1,c.c2 from C c where ...那么合成一句的sql 是select x.x1,x.x2…

fedora 16 面部显示

为什么80%的码农都做不了架构师&#xff1f;>>> 显示日期&#xff1a; gsettings set org.gnome.shell.clock show-date true 显示秒数&#xff1a; gsettings set org.gnome.shell.clock show-seconds true 显示天气&#xff1a; 1.在这里 https://github.com/sim…

c#入门笔记

c#入门初探 零. 写在前面 0.1 解决方案、项目、程序集、命名空间 0.1.1项目 一个项目可以就是你开发的一个软件。在.Net下&#xff0c;一个项目可以表现为多种类型&#xff0c;如控制台应用程序&#xff0c;Windows应用程序&#xff0c;类库&#xff08;Class Library&#xff…

python 日常小技巧

python 访问win32程序和指定地址程序 1 import subprocess 2 psubprocess.Popen("calc.exe",0,None,None,None,None) 3 p.wait() 4 psubprocess.Popen("D:\Program Files\Tencent\QQ\Bin\QQ.exe",0,None,None,None,None) 5 p.wait() 6 7 import os 8 os.…

《JavaScript设计模式与开发实践》——第3章 闭包和高阶函数

闭包 变量的作用域和生存周期密切相关 高阶函数 函数可以作为参数被传递 函数可以作为返回值输出 转载于:https://www.cnblogs.com/-beauTiFul/p/9092459.html

java 编辑pdf表格_Java 生成pdf表格文档

最近在工作做一个泰国的项目&#xff0c;应供应商要求&#xff0c;需要将每天的交易生成pdf格式的报表上传到供应商的服务器&#xff0c;特此记录实现方法。废话不多说&#xff0c;直接上代码&#xff1a;THSarabunNew.ttf该文件是泰国字体自行网上下载即可import com.itextpdf…

同时在一个WebService服务中发布多个普通Java类

packageservice;publicclassMyService{ publicString getGreeting(String name){ return"您好 "name; } publicvoidupdate(String data){ System.out.println("<"data ">已经更新"); } } package service…

博客作业05--查找

1.学习总结 1.1查找的思维导图 1.2 查找学习体会 1、map简介 map是一类关联式容器。它的特点是增加和删除节点对迭代器的影响很小&#xff0c;除了那个操作节点&#xff0c;对其他的节点都没有什么影响。 对于迭代器来说&#xff0c;可以修改实值&#xff0c;而不能修改key。2、…

python 列表 mysql in_关于mysql:内嵌要在python MySQLDB IN子句中使用的列表

我知道如何将列表映射到字符串&#xff1a;foostring ",".join( map(str, list_of_ids) )而且我知道我可以使用以下命令将该字符串放入IN子句中&#xff1a;cursor.execute("DELETE FROM foo.bar WHERE baz IN (%s)" % (foostring))我需要使用MySQLDB安全地…

EasyTimer

转载于:https://www.cnblogs.com/Microshaoft/archive/2012/06/16/2552278.html

Dubbo的优化 --- 开发时使用

开发时的三个优化&#xff1a; 1、开发者在本地开发的时候启动Dubbo比较麻烦&#xff0c;所以采用直接连接的配置&#xff1b; 2、开发者本地开发时会打断点调试&#xff0c;会超过Dubbo默认的超时时间1s&#xff0c;所以需要全局设置超时时间&#xff1b; 3、开发者本地时可能…

MYSQL复制的几种模式

MYSQL复制的几种模式 MySQL 5.1 中&#xff0c;在复制方面的改进就是引进了新的复制技术&#xff1a;基于行的复制。MYSQL复制的几种模式 MySQL 5.1 中&#xff0c;在复制方面的改进就是引进了新的复制技术&#xff1a;基于行的复制。 简言之&#xff0c;这种新技术就是关注表中…

Code:目录

ylbtech-Code&#xff1a;目录1.返回顶部 1、https://github.com/2.返回顶部1、https://gitee.com2、3.返回顶部4.返回顶部5.返回顶部 6.返回顶部作者&#xff1a;ylbtech出处&#xff1a;http://ylbtech.cnblogs.com/本文版权归作者和博客园共有&#xff0c;欢迎转载&#xff…

微软系统修复工具(试用版)

Microsoft Fix it Center 使获得支持从未有过如此简单&#xff0c;因为自动疑难解 答程序将解决您的现有问题并防患于未然。 为您的设备定制了解决方案&#xff0c;仅显示与您的硬件 和软件相关的信息。 Microsoft Fix it Center 使您能轻松控制并支持您所有的设备。 下载地址…

java 会话共享_java – servlet如何工作?实例化,会话,共享变量和多线程

假设,我有一个拥有大量servlet的Web服务器.对于在这些servlet之间传递的信息,我正在设置会话和实例变量.现在,如果有2个或更多用户向此服务器发送请求,那么会话变量会发生什么&#xff1f;它们对所有用户都是通用的,或者对于每个用户而言都是不同的.如果它们不同,那么服务器如何…