canal+mq将数据同步到redis中的一些类型转换问题

在将 Canal 捕获到的数据库变更同步到 RabbitMQ 时,通常需要将变更事件的数据从 Java 对象转换为一种通用的数据格式,如 JSON。这样可以确保数据在不同系统之间传递时的兼容性。以下是将 Canal 数据同步到 RabbitMQ 并进行数据类型转换的示例代码。

1. Canal 客户端从 MySQL 读取变更

首先,编写 Canal 客户端以从 MySQL 读取数据库变更,并将变更数据转换为 JSON 格式,然后发送到 RabbitMQ。

2. 依赖库

在项目中添加了所需的依赖库,如 Canal 客户端、RabbitMQ 客户端和 JSON 处理库(例如 Jackson)。

3. Canal 客户端发送数据到 RabbitMQ

编写 Canal 客户端,读取 MySQL 数据库变更并将其发送到 RabbitMQ:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;public class CanalToRabbitMQ {private final static String QUEUE_NAME = "db_changes";private final static ObjectMapper objectMapper = new ObjectMapper();public static void main(String[] args) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);connector.connect();connector.subscribe("mydatabase.mytable");connector.rollback();while (true) {Message message = connector.getWithoutAck(100); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId != -1 && size > 0) {for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {Map<String, Object> dataMap = new HashMap<>();if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {dataMap.put("eventType", "INSERT");for (CanalEntry.Column column : rowData.getAfterColumnsList()) {dataMap.put(column.getName(), column.getValue());}} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {dataMap.put("eventType", "UPDATE");for (CanalEntry.Column column : rowData.getAfterColumnsList()) {dataMap.put(column.getName(), column.getValue());}} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {dataMap.put("eventType", "DELETE");for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {dataMap.put(column.getName(), column.getValue());}}// 将数据转换为 JSON 字符串String jsonMessage = objectMapper.writeValueAsString(dataMap);// 将 JSON 字符串发送到 RabbitMQchannel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes("UTF-8"));}}}}connector.ack(batchId); // 提交确认}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
}

4. RabbitMQ 消费者同步到 Redis

编写 RabbitMQ 消费者,从 RabbitMQ 队列中消费消息并同步到 Redis:

 
import com.rabbitmq.client.*;
import redis.clients.jedis.Jedis;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class RabbitMQToRedis {private final static String QUEUE_NAME = "db_changes";private final static ObjectMapper objectMapper = new ObjectMapper();public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);Jedis jedis = new Jedis("127.0.0.1");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);try {// 将 JSON 字符串转换为 Map 对象Map<String, Object> dataMap = objectMapper.readValue(message, Map.class);// 处理数据并同步到 RedisString eventType = (String) dataMap.get("eventType");if ("INSERT".equals(eventType) || "UPDATE".equals(eventType)) {// 假设主键是 idString id = (String) dataMap.get("id");jedis.set(id, message);} else if ("DELETE".equals(eventType)) {String id = (String) dataMap.get("id");jedis.del(id);}System.out.println("Received and processed: " + message);} catch (Exception e) {e.printStackTrace();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});// 保持程序运行Runtime.getRuntime().addShutdownHook(new Thread(() -> {try {channel.close();connection.close();jedis.close();} catch (Exception e) {e.printStackTrace();}}));}
}

代码解析

  1. Canal 客户端

    • 连接到 Canal Server,订阅指定数据库和表的变更。
    • 从 binlog 中解析变更事件。
    • 将变更事件转换为 JSON 格式。
    • 将 JSON 消息发送到 RabbitMQ。
  2. RabbitMQ 消费者

    • 连接到 RabbitMQ 队列,消费消息。
    • 将 JSON 消息解析为 Map 对象。
    • 根据事件类型(INSERT、UPDATE、DELETE)同步数据到 Redis。

通过这种方式,Canal 可以将数据库变更事件捕获并转换为 JSON 格式,推送到 RabbitMQ,然后由 RabbitMQ 消费者同步到 Redis。这种方法确保了数据在不同系统之间传递时的兼容性和灵活性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/865352.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cherno 游戏引擎笔记记录(33~45)

好久不见。 My Github REPO(GitHub - JJJJJJJustin/Nut: The game_engine which learned from Cherno) 源码笔记&#xff0c;希望帮到你 :-} -------------------相机&原理---------- 》》》》查看这两篇说明&#xff0c;一个是坐标系统&#xff0c;一个是摄像机 &#xf…

通过Spring Boot结合实时流媒体技术对考试过程进行实时监控

本章将深入探讨考试系统中常见的复杂技术问题&#xff0c;并提供基于Spring Boot 3.x的解决方案。涵盖屏幕切换检测与防护、接打电话识别处理、行为监控摄像头使用、网络不稳定应对等&#xff0c;每篇文章详细剖析问题并提供实际案例与代码示例&#xff0c;帮助开发者应对挑战&…

大语言模型系列-Transformer(二)

Transformer 模型的入门可以从以下几个方面开始&#xff1a; 1. 理解基本概念 序列到序列&#xff08;Sequence-to-Sequence&#xff09;任务&#xff1a;Transformer 模型主要用于这类任务&#xff0c;如机器翻译、文本摘要等。注意力机制&#xff08;Attention Mechanism&a…

PyTorch基础(23)-- Tensor.scatter_()方法

一、前言 本次要介绍的函数为Tensor.scatter_函数&#xff0c;也是PyTorch中常用的函数之一&#xff0c;但遗憾的是&#xff0c;我想在网络上查询该函数的用法时&#xff0c;大部分的文章都是直接给出一个示例&#xff0c;看完之后&#xff0c;其中的原理我还是无法理解&#…

python生成器在读取接口用例中应用解析

Python生成器Generator Python生成器&#xff08;Generator&#xff09;是一种特殊类型的函数&#xff0c;它可以通过yield语句逐步生成值。 生成器提供了一种延迟计算的方式&#xff0c;可以逐步产生结果&#xff0c;而不是一次性生成所有的值。 1、生成器原理&#xff1a; …

Java中的AOP编程详解

Java中的AOP编程详解 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 1. 什么是AOP&#xff1f; AOP&#xff08;Aspect-Oriented Programming&#xff0c;面…

2024年【A特种设备相关管理(A4电梯)】试题及解析及A特种设备相关管理(A4电梯)模拟试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 A特种设备相关管理&#xff08;A4电梯&#xff09;试题及解析根据新A特种设备相关管理&#xff08;A4电梯&#xff09;考试大纲要求&#xff0c;安全生产模拟考试一点通将A特种设备相关管理&#xff08;A4电梯&#x…

Mac密室逃脱游戏推荐:Escape Simulator for mac安装包

Escape Simulator 是一款逃生模拟游戏&#xff0c;玩家在游戏中需要寻找线索、解决谜题&#xff0c;以逃离各种房间或环境。这种类型的游戏通常设计有多个关卡或场景&#xff0c;每个场景都有不同的设计和难度。 在 Escape Simulator 中&#xff0c;玩家的目标通常是找到出口或…

算法力扣刷题——总结篇【四】和string类详解

前言 字符串章节部分跟随学习结束&#xff0c;作出总结。 一、题目及方法总结 &#xff08;1&#xff09;反转字符串&#xff1a;双指针法。 反转全部字符串&#xff0c;i在开头&#xff0c;j在结尾&#xff1b;判断条件i < j ;每隔2k反转前k个字符&#xff0c;i改成i 2…

PermissionError: [Errno 13] Permission denied: ‘/tmp/gradio...‘

无管理员权限修改 Gradio 默认路径遇到的 PermissionError 问题 在使用 Gradio 进行开发和部署时&#xff0c;可能会遇到如下报错&#xff1a; PermissionError: [Errno 13] Permission denied: /tmp/gradio/tmpzo5r9g_k.png报错分析 上述报错是由于在没有权限访问指定路径时…

东方韵味:红酒与茶道的很好邂逅

在古老的东方&#xff0c;茶道与红酒各自承载着深厚的文化底蕴和历史传承。当这两大传统文化碰撞、交融&#xff0c;仿佛展开了一幅绚烂多姿的画卷&#xff0c;既展现了东方的神秘韵味&#xff0c;又融入了红酒的异国风情。今天&#xff0c;就让我们一同探索这场红酒与茶道的很…

详解微服务应用灰度发布最佳实践

作者&#xff1a;子丑 本次分享是站在 DevOps 视角的灰度发布实践概述&#xff0c;主要内容包括以下四个方面&#xff1a; 第一&#xff0c;灰度发布要解决的问题&#xff1b; 第二&#xff0c;灰度发布的四种典型场景&#xff1b; 第三&#xff0c;如何把灰度发布融入到应…

关于std::memory_order_consume

原文&#xff1a;https://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/ 翻译&#xff1a;C11中memory_order_consume的目的 https://blog.csdn.net/netyeaxi/article/details/80718781 文章中有这样一个例子&#xff1a; g Guard.load(memory_ord…

terminals database is inaccessible

在复制虚拟环境后&#xff0c;执行clean操作经常报以上错误 解决方案如下&#xff1a; vim ~/.bashrc alias clear"TERMxterm /usr/bin/clear" source ~/.bashrc

2024-07-03_外语学习

文章目录 1. SSCC&#xff08;Serial Shipping Container Code&#xff09;2. Serial读音词源ser-01ser-02 3. routing instruction在计算领域在物流领域Routing Instruction 的词源分析RoutingInstruction 1. SSCC&#xff08;Serial Shipping Container Code&#xff09; 在…

2024年07月03日 Redis部署方式和持久化

Redis持久化方式&#xff1a;RDB和AOF&#xff0c;和混合式 RDB&#xff1a;周期备份模式&#xff0c;每隔一段时间备份一份快照文件&#xff0c;从主线程Fork一个备份线程出来备份&#xff0c;缺点是会造成数据的丢失。 AOF&#xff1a;日志模式&#xff0c;每条命令都以操作…

Java8环境安装(jdk1.8安装)详细教程

Java 8环境安装&#xff08;jdk1.8安装&#xff09;详细教程 Java 8&#xff08;也称为JDK 1.8&#xff09;&#xff0c;是Oracle公司于2014年3月发布的一个重要的Java语言版本。这个版本自发布以来&#xff0c;因其众多的新特性和改进&#xff0c;被认为是Java语言发展历程中…

itk::ShapedNeighborhoodIterator类C2516问题

错误问题&#xff1a; 1>C:\itk\src-5.3.0\Modules\Core\Common\include\itkShapedNeighborhoodIterator.h(183,1): error C2516: itk::ShapedNeighborhoodIterator<TImage,TBoundaryCondition>::ConstIterator: is not a legal base class 1>C:\itk\src-5.3.0\Mo…

0基础学C++ | 第11天 | 基础知识 | 引用

目录 引用的基本使用 引用的注意事项 引用做函数参数 引用做函数的返回值 引用的本质 引用的基本使用 作用&#xff1a;引用就是给变量起别名&#xff0c;它不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空间&#…

MySql中每行多值属性的计数值

MySql中每行多值属性的计数值 在MySQL中&#xff0c;每行多值属性的计数值指的是在一个表中的某个字段中存储了多个值&#xff0c;而不是单个值。这种情况下&#xff0c;我们需要计算每行中多值属性的数量。 在MySQL中&#xff0c;可以使用内置的函数来计算每行多值属性的计数…