【Canal源码分析】Sink及Store工作过程

一、序列图

image

二、源码分析

2.1 Sink

Sink阶段所做的事情,就是根据一定的规则,对binlog数据进行一定的过滤。我们之前跟踪过parser过程的代码,发现在parser完成后,会把数据放到一个环形队列TransactionBuffer中,也就是这个方法:

transactionBuffer.add(entry);

我们具体看下add这个方法。

public void add(CanalEntry.Entry entry) throws InterruptedException {switch (entry.getEntryType()) {case TRANSACTIONBEGIN:flush();// 刷新上一次的数据put(entry);break;case TRANSACTIONEND:put(entry);flush();break;case ROWDATA:put(entry);// 针对非DML的数据,直接输出,不进行buffer控制EventType eventType = entry.getHeader().getEventType();if (eventType != null && !isDml(eventType)) {flush();}break;default:break;}
}

判断一下事件的类型,如果是事务开头,那么直接刷新之前的数据,然后把当前事件加到队列中;如果是事务的结束,那么先把当前事务放到队列后,刷新到下一个阶段;如果是普通的事件,直接放到队列中,如果事务头类型不为空,且不是DML类型,那么直接刷新队列中数据到下一个阶段。

我们需要理清楚这块的逻辑,什么时候flush,什么时候put,针对不同的事件,采取的策略不一样。

这里我们分析下flush和put两个步骤。

2.1.1 flush队列

这块其实还没有涉及到sink阶段,还在维护一个事件环形队列。这个环形队列,维护了两个指针,一个是flush的指针,一个是put的指针,flush的指针永远是滞后于put指针的。

private void flush() throws InterruptedException {long start = this.flushSequence.get() + 1;long end = this.putSequence.get();if (start <= end) {List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();for (long next = start; next <= end; next++) {transaction.add(this.entries[getIndex(next)]);}flushCallback.flush(transaction);flushSequence.set(end);// flush成功后,更新flush位置}
}

start就是flush的指针,end就是put的指针,flush的动作就是把当前flush到put中间的数据,全部刷新到下一个阶段。具体传递到下一个阶段的代码在flushCallback.flush方法中。这块我们下文再分析。

2.1.2 put

private void put(CanalEntry.Entry data) throws InterruptedException {// 首先检查是否有空位if (checkFreeSlotAt(putSequence.get() + 1)) {long current = putSequence.get();long next = current + 1;// 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值entries[getIndex(next)] = data;putSequence.set(next);} else {flush();// buffer区满了,刷新一下put(data);// 继续加一下新数据}
}

这块的注释都比较清晰了,就不赘述了。

2.1.3 flush到sink

具体的代码在AbstractEventParser中,定义transactionBuffer的地方。

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);if (!running) {return;}if (!successed) {throw new CanalParseException("consume failed!");}LogPosition position = buildLastTransactionPosition(transaction);if (position != null) { // 可能position为空logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);}
}

主要的处理在consumeTheEventAndProfilingIfNecessary里面。这里面调用了eventSink.sink()方法。

2.1.4 sink

这里面进行了binlog数据的过滤。首先判断是否需要过滤事务头和尾,如果需要过滤的话,直接过滤掉,默认不过滤。

遍历传到这个阶段的binlog列表,根据正则表达式判断,是否需要进行过滤,一般来说是根据表名、库名等进行过滤。这边的过滤类主要是AviaterRegexFilter,根据库名.表名和表达式进行过滤。如果需要进行过滤,那么直接把这个事件过滤。否则,加到binlog列表中,进行二次过滤。第二次过滤的主要内容是HEARTBEAT类型的事件,主要的代码在这里:

protected boolean doSink(List<Event> events) {for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.before(events);//处理heartbeat事件}int fullTimes = 0;do {if (eventStore.tryPut(events)) {for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.after(events);}return true;} else {applyWait(++fullTimes);}for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {events = handler.retry(events);}} while (running && !Thread.interrupted());return false;
}

这里的CanalEventDownStreamHandler其实只有HeartBeatEntryEventHandler,也就是在before方法中把heartbeat事件从events去掉。这个心跳事件其实是parser过程生成的,我们之前有提到过。after目前是空的方法。

去掉之后,剩余的事件列表就会被调用tryPut()方法,送到下一步骤store中。

这里还有个applyWait方法,防止无限等待。

private void applyWait(int fullTimes) {int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;if (fullTimes <= 3) { // 3次以内Thread.yield();} else { // 超过3次,最多只sleep 10msLockSupport.parkNanos(1000 * 1000L * newFullTimes);}}

2.2 Store

目前只有基于内存模式的Store,这个阶段是真正Server中的落盘阶段。数据经历了mysql master到parser,再到sink,最后终于到了这里。

public boolean tryPut(List<Event> data) throws CanalStoreException {if (data == null || data.isEmpty()) {return true;}final ReentrantLock lock = this.lock;lock.lock();try {if (!checkFreeSlotAt(putSequence.get() + data.size())) {return false;} else {doPut(data);return true;}} finally {lock.unlock();}
}

在进行数据put的时候,加了一把锁。首先计算下是否还有剩余的空间进行数据处理,这里的计算,不光是计算了队列的剩余长度,还计算了剩余空间。队列的长度默认是16*1024,如果空间不足,直接拒绝,返回false,等待空间空余出来后,再进行put操作。否则,直接doPut()。

/*** 执行具体的put操作*/
private void doPut(List<Event> data) {long current = putSequence.get();long end = current + data.size();// 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值for (long next = current + 1; next <= end; next++) {entries[getIndex(next)] = data.get((int) (next - current - 1));}putSequence.set(end);// 记录一下gets memsize信息,方便快速检索if (batchMode.isMemSize()) {long size = 0;for (Event event : data) {size += calculateSize(event);}putMemSize.getAndAdd(size);}// tell other threads that store is not emptynotEmpty.signal();
}

这里主要对put一些指针,还有空间做了重新的计算。放到队列中之后,通知其他等待notEmpty的线程,来进行数据的获取,这时候,client可以进行数据获取了。

转载于:https://www.cnblogs.com/f-zhao/p/9088655.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/267028.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

博客园修改页面显示样式css

博客园修改页面显示样式css 一、总结 二、 博客园修改页面显示样式css 1、点管理 2、点设置 3、在页面定制css代码中加入你想要改变样式的css代码即可 不会写css的下面有代码示例&#xff0c;直接复制粘贴放到页面定制css代码位置即可 三、代码 1 #cnblogs_post_body h3 {2 …

基准测试 ApacheBench ab学习

2019独角兽企业重金招聘Python工程师标准>>> ab的全称是ApacheBench&#xff0c;是 Apache 附带的一个小工具&#xff0c;专门用于 HTTP Server 的benchmark testing&#xff0c;可以同时模拟多个并发请求。前段时间看到公司的开发人员也在用它作一些测试&#xff0…

java虚拟机类加载机制浅谈_浅谈Java虚拟机(三)之类加载机制

在《浅谈Java虚拟机》这篇文章中&#xff0c;我们提到了JVM从操作系统方面来说&#xff0c;可以将其看做是一个进程&#xff0c;分别有类加载器子系统&#xff0c;执行引擎子系统和垃圾收集子系统。这一篇文章就简单的来谈一下类加载器子系统中的类加载机制。第一&#xff1a;什…

fedora 16 面部显示

为什么80%的码农都做不了架构师&#xff1f;>>> 显示日期&#xff1a; gsettings set org.gnome.shell.clock show-date true 显示秒数&#xff1a; gsettings set org.gnome.shell.clock show-seconds true 显示天气&#xff1a; 1.在这里 https://github.com/sim…

c#入门笔记

c#入门初探 零. 写在前面 0.1 解决方案、项目、程序集、命名空间 0.1.1项目 一个项目可以就是你开发的一个软件。在.Net下&#xff0c;一个项目可以表现为多种类型&#xff0c;如控制台应用程序&#xff0c;Windows应用程序&#xff0c;类库&#xff08;Class Library&#xff…

博客作业05--查找

1.学习总结 1.1查找的思维导图 1.2 查找学习体会 1、map简介 map是一类关联式容器。它的特点是增加和删除节点对迭代器的影响很小&#xff0c;除了那个操作节点&#xff0c;对其他的节点都没有什么影响。 对于迭代器来说&#xff0c;可以修改实值&#xff0c;而不能修改key。2、…

Dubbo的优化 --- 开发时使用

开发时的三个优化&#xff1a; 1、开发者在本地开发的时候启动Dubbo比较麻烦&#xff0c;所以采用直接连接的配置&#xff1b; 2、开发者本地开发时会打断点调试&#xff0c;会超过Dubbo默认的超时时间1s&#xff0c;所以需要全局设置超时时间&#xff1b; 3、开发者本地时可能…

Code:目录

ylbtech-Code&#xff1a;目录1.返回顶部 1、https://github.com/2.返回顶部1、https://gitee.com2、3.返回顶部4.返回顶部5.返回顶部 6.返回顶部作者&#xff1a;ylbtech出处&#xff1a;http://ylbtech.cnblogs.com/本文版权归作者和博客园共有&#xff0c;欢迎转载&#xff…

Redis主从实战

为了提升redis高可用性&#xff0c;除了备份redis dump数据之外&#xff0c;还需要创建redis主从架构&#xff0c;可以利用从将数据库持久化&#xff0c;&#xff08;我们所说的数据持久化将是将数据保存到写磁盘上&#xff0c;保证不会因为断电等因素丢失数据&#xff09; Red…

Python Day 21 面向对象 (面向对象的三大特性(二)继承,多态,封装,几个装饰器函数)...

Python Day 21 面向对象 (面向对象的三大特性&#xff08;二&#xff09;继承&#xff0c;多态&#xff0c;封装&#xff0c;几个装饰器函数) https://mubu.com/doc/1AqL_M0IbW 继承之钻石继承 多态 封装 几个装饰器函数 classmethod 可以通过类使用被装饰的方法staticmethod …

php webservice 上传,PHP实现WebService服务

第一步&#xff0c;安装PHP扩展SOAP并开启扩展&#xff0c;是否开启成功以phpinfo为准。第二步&#xff0c;创建服务端文件server.php{public functionhello(){return "Hello World!";}public function sum($num1,$num2){return $num1$num2;}}//创建 SoapServer 对象…

配置springboot在访问404时自定义返回结果以及统一异常处理

在搭建项目框架的时候用的是springboot&#xff0c;想统一处理异常&#xff0c;但是发现404的错误总是捕捉不到&#xff0c;总是返回的是springBoot自带的错误结果信息。 如下是springBoot自带的错误结果信息&#xff1a; 1 { 2 "timestamp": 1492063521109, 3 &…

nginx配置php 9000,Nginx支持php配置

Nginx本身是不支持对外部程序的直接调用或者解析&#xff0c;所有的外部程序(包括PHP)必须通过FastCGI接口来调用。FastCGI接口在Linux 下是socket&#xff0c;(这个socket可以是文件socket&#xff0c;也可以是ip socket)。为了调用CGI程序&#xff0c;还需要一个FastCGI的wra…

ansible 判断和循环

标准循环 模式一 - name: add several usersuser: name{{ item }} statepresent groupswheelwith_items:- testuser1- testuser2 orwith_items: "{{ somelist }}" 模式2. 字典循环- name: add several usersuser: name{{ item.name }} statepresent groups{{ item.g…

php require 500,thinkphp5出现500错误怎么办

thinkphp5出现500错误&#xff0c;如下图所示&#xff1a;require(): open_basedir restriction in effect. File(/home/wwwroot/pic/thinkphp/start.php) is not within the allowed解决方法&#xff1a;1、我是lnmp1.4 php5.6&#xff0c;php.ini里面的open_basedir 是注释掉…

如何创建路径别名

在访问页面时&#xff0c;页面地址会以 DocumentRoot所指定的路径为相对路径&#xff0c;但若不想使用指定的路径&#xff0c;则需要创建路径别名。假如DocumentRoot为/var/www/html &#xff0c;现想将/var/www/html/mail 建立别名/web/mail&#xff0c;该如何修改呢&#xff…

33 -jQuery 属性操作,文档操作(未完成)

转载于:https://www.cnblogs.com/venicid/p/9110130.html

Robot Framework + Selenium library + IEDriver环境搭建

转载&#xff1a;https://www.cnblogs.com/Ming8006/p/4998492.html#c.d 目录&#xff1a; 1 安装文件准备2 Robot框架结构3 环境搭建 3.1 安装Python 3.2 安装Robot Framework 3.3 安装wxPython 3.4 安装RIDE 3.5 安装Selenium2Library 3.6 安装IEDriverServer 1 安装文…

php静态地图api,静态图API | 百度地图API SDK

百度地图静态图API&#xff0c;可实现将百度地图以图片形式嵌入到您的网页中。您只需发送HTTP请求访问百度地图静态图服务&#xff0c;便可在网页上以图片形式显示您的地图。静态图API较之JavaScript API载入的动态网站&#xff0c;既能满足基本的地图信息浏览&#xff0c;又能…

[XMOVE自主设计的体感方案] XMove Studio管理系统(二)应用开发API简要介绍

一. XMove的开放式应用开发框架简介 XMove4.0以开放式的结构满足扩展性的要求。所有无线协议&#xff0c;底层算法和控制逻辑全部上移到PC端。节点只根据接受的控制逻辑返回传感器数据。新的架构使得开发新应用非常方便。 本节将主要介绍XMove应用开发API及其使用。 二. 注册新…