canal rocketmq

上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。

使用队列,可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里,也可以直接使用canal server, canal server原生支持几种队列:Kafka, RocketMQ ,RabbitMQ, PulsarMQ, 下面了解一下canal sever具体的处理过程。

canal server将消息投递到mq中

在canal server中,如果检测到配置了mq, 就会启动线程来读取bin log事件,并投递到mq中:
CanalMQStarter

while (running && destinationRunning.get()) {Message message;if (getTimeout != null && getTimeout > 0) {message = canalServer.getWithoutAck(clientIdentity,getBatchSize,getTimeout.longValue(),TimeUnit.MILLISECONDS);} else {message = canalServer.getWithoutAck(clientIdentity, getBatchSize);}final long batchId = message.getId();int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();if (batchId != -1 && size != 0) {canalMQProducer.send(canalDestination, message, new Callback() {@Overridepublic void commit() {canalServer.ack(clientIdentity, batchId); // 提交确认}@Overridepublic void rollback() {canalServer.rollback(clientIdentity, batchId);}}); // 发送message到topic} else {try {Thread.sleep(100);} catch (InterruptedException e) {// ignore}}}

从代码可以看到,首先调用getWithoutAck从实例获取事件,然后调用canalMQProducer.send将消息投递到队列中,如果投递成功就执行ack,否则执行rollback, 因为投递消息到队列是非常快的操作,所以这就降低了阻塞的风险。

最终发送mq消息的代码如下(CanalRocketMQProducer):

    private void sendMessage(Message message, int partition) {//...SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {if (partition >= mqs.size()) {return mqs.get(partition % mqs.size());} else {return mqs.get(partition);}}, null);//...}

这里有个分区的概念,对于RocketMQ来说就是队列选择,这关系到顺序消费。

业务代码使用RocketMQCanalConnector消费数据

    while (running) {try {connector.connect();connector.subscribe();while (running) {List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取messagefor (Message message : messages) {long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// }} else {printSummary(message, batchId, size);printEntry(message.getEntries());// logger.info(message.toString());}}connector.ack(); // 提交确认}} catch (Exception e) {logger.error(e.getMessage(), e);}}connector.unsubscribe();// connector.stopRunning();
}

可以看到这和之前ClusterCanalConnector一样的处理方法,只是底层实现不一样,在subscribe的时候,调用了mq的subscribe:

    public synchronized void subscribe(String filter) throws CanalClientException {//...rocketMQConsumer.subscribe(this.topic, "*");rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {context.setAutoCommit(true);boolean isSuccess = process(messageExts);if (isSuccess) {return ConsumeOrderlyStatus.SUCCESS;} else {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});rocketMQConsumer.start();//...}

可以看到这里使用了MessageListenerOrderly来进行顺序消费, 使用process来处理消息

private boolean process(List<MessageExt> messageExts) {//...for (MessageExt messageExt : messageExts) {//...if (!flatMessage) {Message message = CanalMessageDeserializer.deserializer(data);messageList.add(message);} else {FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);messageList.add(flatMessage);}ConsumerBatchMessage batchMessage;if (!flatMessage) {batchMessage = new ConsumerBatchMessage<Message>(messageList);} else {batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);}try {messageBlockingQueue.put(batchMessage);} catch (InterruptedException e) {logger.error("Put message to queue error", e);throw new RuntimeException(e);}boolean isCompleted;try {isCompleted = batchMessage.waitFinish(batchProcessTimeout);} catch (InterruptedException e) {logger.error("Interrupted when waiting messages to be finished.", e);throw new RuntimeException(e);}boolean isSuccess = batchMessage.isSuccess();return isCompleted && isSuccess;}

这里将数据放到了messageBlockingQueue中,然后等待消息执行完成, ConsumerBatchMessage内置了一个CountDownLatch, batchMessage.waitFinish会阻塞在这里。
客户端使用getFlatList/getFlatListWithoutAck取数据时,就是从messageBlockingQueue取出数据,调用ack时,会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。

    @Overridepublic List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {if (this.lastGetBatchMessage != null) {throw new CanalClientException("mq get/ack not support concurrent & async ack");}ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);//...}@Overridepublic void ack() throws CanalClientException {if (this.lastGetBatchMessage != null) {this.lastGetBatchMessage.ack();}//...}

对于MessageListenerOrderly来说,是一个消费线程对应一个mq队列的,从而实现多线程消费,而这里把不同mq队列的消息在messageBlockingQueue中排队,并且使用getListWithoutAck/ack也不支持并发,又变成了单线程模式,这可能对性能造成影响,建议生产环境对性能有要求时,采用自己写代码来实现mq的消费。

配置

mq相关参数说明

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107180.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI电销机器人好不好用关键是什么?

影响AI电销机器人是否好用的两个因素分别是&#xff0c;识别系统以及线路。 有很多电销企业都想找一个好用的AI电销机器人&#xff0c;可是什么样的机器人才是好用的机器人呢?有哪些因素会影响AI电销机器人好不好用呢? 添加图片注释&#xff0c;不超过 140 字&#xff08;可选…

groupnorm_backward反向公式推导

前向 均值 μ n g ∑ i 1 M ( X i ) M (1) {\large \mathit{\color{Blue} \mu_{ng} \frac{\sum_{i1}^M(X^{i})}{M}} } \tag{1} μng​M∑i1M​(Xi)​(1) 方差 σ n g 2 ∑ i 1 M ( X i − μ n g ) M (2) {\large \mathit{\color{Blue} \sigma_{ng}^2 \frac{\sum_{i …

在 Linux 上保护 SSH 服务器连接的 8 种方法

SSH 是一种广泛使用的协议&#xff0c;用于安全地访问 Linux 服务器。大多数用户使用默认设置的 SSH 连接来连接到远程服务器。但是&#xff0c;不安全的默认配置也会带来各种安全风险。 具有开放 SSH 访问权限的服务器的 root 帐户可能存在风险。尤其是如果使用的是公共 IP 地…

qt中json类

目录 QJsonValue QJsonObject QJsonArray QJsonDocument 案例&#xff1a; Qt 5.0开始提供了对Json的支持&#xff0c;我们可以直接使用Qt提供的Json类进行数据的组织和解析&#xff0c;下面介绍4个常用的类。 QJsonValue 该类封装了JSON支持的数据类型。 布尔类型&#xf…

【Power BI】Power BI 入门指南:版本、下载和报表创建的步骤

文章目录 一、前言二、了解 Power BI 版本三、下载 Power BI Desktop四、如何开始使用 Power BI Desktop五、在 Power BI Desktop 中创建报表六、文末总结 一、前言 Power BI 是微软于 2013 年推出的产品&#xff0c;为一款商业智能与数据可视化工具。它通过引人注目的视觉效果…

[Linux 基础] Linux编辑器Vim,你值得拥有

文章目录 1、Linux 软件包管理器 yum1.1 什么是软件包1.2 如何安装软件1.3 如何卸载软件 2、vim的使用2.1 vim的安装和配置2.2 vim的基本概念2.3 vim的基本操作 3、vim正常模式命令集4、vim注释与去注释5、Liunx编辑器-gcc/g使用5.1 如何使用gcc编译c程序5.2 gcc的翻译过程5.2.…

Python-pyecharts和pandas库

目录 pyecharts库 pandas库 示例1 示例2 pyecharts库 pyecharts是一个基于Python的交互式数据可视化库&#xff0c;旨在帮助用户轻松地创建各种类型的图表和可视化效果。该库是在Echarts开源项目的基础上开发的&#xff0c;Echarts是一款由百度开发的优秀的数据可视化工具。…

为什么机器学习中需要假设检验

最近由于研究需要&#xff0c;需要在机器学习项目的结果中加入假设检验的内容&#xff0c;但是机器学习中的假设检验和数理统计中的假设检验不同&#xff0c;是数理统计中假设检验的延申。但是&#xff0c;本来假设检验就是数理统计中的比较绕的一部分&#xff0c;比较难懂&…

泛在电力物联网的关键技术与未来发展策略-安科瑞黄安南

摘要: 文章分析了泛在电力物联网的内涵及其主要特征&#xff0c;针对泛在电力物联网的建设目标、基本构架以及关键技术与未来发展策略进行综合探讨&#xff0c;期待得到专业人士的指点。 关键词: 泛在电力物联网&#xff0c; 网络规划&#xff0c; 网络发展 随着能源革命的不…

MAC版idea如何安装maven?

什么是maven项目 Maven 是 Apache 组织下的一个跨平台的项目管理工具,它主要用来帮助实现项目的构建、测试、打包和部署。它的跨平台性保证了在不同的操作系统上可以使用相同的命令来完成相应的任务。 为什么选择Maven项目,而非普通的Java项目。普通的Java项目如果依赖其他…

kafka消费者程序日志报错Offset commit failed问题研究

生产环境偶尔会遇到kafka消费者程序日志报错的问题 截取主要日志如下&#xff1a; 2023-10-02 19:35:28.554 {trace: d7f97f70dd693e3d} ERROR[Thread-49:137] ConsumerCoordinator$OffsetCommitResponseHandler.handle(812) - [Consumer clientIdconsumer-1, groupIdcid_yin…

计算机网络基础(三):IPv4编址方式、子网划分、IPv4通信的建立与验证及ICMP协议

**IPv4地址是一个32位长的二进制数。**而这个32位二进制数又通常会表示为4个用点隔开的十进制数。那么&#xff0c;这个32位二进制数要如何通过4个十进制数表示出来呢&#xff1f; 我们在配置IPv4地址时&#xff0c;同时配置的“掩码”又有何用途&#xff1f; 1.IPv4编址方式…

第 367 场 LeetCode 周赛题解

A 找出满足差值条件的下标 I 模拟 class Solution { public:vector<int> findIndices(vector<int> &nums, int indexDifference, int valueDifference) {int n nums.size();for (int i 0; i < n; i)for (int j 0; j < i; j)if (i - j > indexDiffe…

软件测试的调用接口怎么调用,逻辑是什么?

一、什么是接口测试&#xff1f; 接口测试是测试系统组件之间接口的测试。接口主要用于检测外部系统和内部子系统之间的交互点。测试的重点是检查数据交换、传输、控制和管理过程&#xff0c;以及系统之间的相互逻辑依赖。 二、为什么要做接口测试&#xff1f; 在淘宝系统的历…

Go编程:使用 Colly 库下载Reddit网站的图像

概述 Reddit是一个社交新闻网站&#xff0c;用户可以发布各种主题的内容&#xff0c;包括图片。本文将介绍如何使用Go语言和Colly库编写一个简单的爬虫程序&#xff0c;从Reddit网站上下载指定主题的图片&#xff0c;并保存到本地文件夹中。为了避免被目标网站反爬&#xff0c…

C++入门篇(3)---引用

1.引用 你有没有被人起过外号?比如身边的朋友,喊他的时候不会叫他的全名,像我很好的朋友,我一般都喜欢叫他"阿威",而不会去称呼全名.我叫他"阿威",他还是他没有什么问题. 这里新登场的引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&am…

多线程使用处理数据库导致锁表解决办法

问题描述&#xff1a; 当使用ON DUPLICATE KEY UPDATE的sql来访问时&#xff0c; 可能会出现多个线程同时写入一个已有的数据里。 解决办法&#xff1a; 使用 REPLACE INTO 原因&#xff1a; 保持更好的并发性&#xff1a;REPLACE INTO 在插入记录时会先删除原有记录&#xf…

数据结构---二叉树

树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 树形结构中&#xff0c;子树之间不能有交集&#xff0c;…

Ubuntu22常用软件

别存太多重要东西在Ubuntu &#xff0c;硬盘损坏就麻烦 Tweaks自定义UI sudo apt intall gnome-tweaks为了方便管理和添加&#xff0c;还需添加&#xff1a; sudo apt install gnome-shell-extension-prefs gnome-shell-extension-manager -y1.打开Extension应用&#xff0c;添…

1.Vue-在独立页面实现Vue的增删改查

题记 在独立页面实现Vue的增删改查&#xff0c;以下是具体的代码&#xff0c;和操作流程。 编写index.html页面 index.html文件如下&#xff1a; <!DOCTYPE html> <html> <head><title>Vue CRUD Example</title><!--在线导入vue文件-->&l…