Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {//数据private List<TbCommodityInfo> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;//getter、setter方法
}
public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {//日志记录private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@Resourceprivate RedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics = "canaltopic")public void receive(ConsumerRecord<?, ?> consumer) {String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if (!isDdl) {List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间long TIME_OUT = 600L;if ("INSERT".equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else if ("UPDATE".equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//从redis中删除redisClient.deleteKey(id);}}}}
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下
    img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/151625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C现代方法(第21章)笔记——标准库

文章目录 第21章 标准库21.1 标准库的使用21.1.1 对标准库中所用名字的限制21.1.2 使用宏隐藏的函数 21.2 C89标准库概述21.3 C99标准库更新21.4 <stddef.h>&#xff1a;常用定义21.5 <stdbool.h>&#xff1a;布尔类型和值(C99)21.6 C11标准更新(C1X)21.7 <stda…

深入解析序列模型:全面阐释 RNN、LSTM 与 Seq2Seq 的秘密

探索序列建模的基础知识和应用。 简介 序列建模是许多领域的一个重要问题&#xff0c;包括自然语言处理 (NLP)、语音识别和语音合成、时间序列预测、音乐生成和「生物信息学」。所有这些任务的共同点是它们需要坚持。接下来的事情的预测是基于历史的。例如&#xff0c;在“哈桑…

Docker入门学习笔记

学习笔记网址推送&#xff1a;wDocker 10分钟快速入门_哔哩哔哩_bilibili docker是用来解决什么问题的&#xff1f; 例如当你在本地主机写了个web应用&#xff0c;而你打算将该应用发送给其他客户端进行案例测试和运行&#xff0c;若是传统做法&#xff0c;就比较复杂&#xf…

conda虚拟环境中安装的cuda和服务器上安装的cuda的异同

服务器上已安装Nvidia提供的cuda&#xff0c;nvcc -V时会出现已安装的CUDA版本。如下图所示&#xff0c;服务器上已安装好的cuda版本为10.1。 但是当我们在Anaconda虚拟环境下安装pytorch或者paddlepaddle等深度学习框架的GPU版本时&#xff0c;通常会选择较高版本的cuda&…

Git 简介及使用(1)

目录 一、在 Linux 环境中安装 Git 1. 先检查当前服务器中是否有 Git&#xff08;如果有显示如下图&#xff09; 2. 安装Git 3. 然后重复第一步&#xff1a;查看 Git 的版本信息即可 二、Git 的初始化及配置 1. 创建目录 2. 对仓库进行初始化 3. 新增两个配置项&#xff08…

理解Android线程基础与多线程编程模型

引言 在Android应用开发中&#xff0c;线程的合理使用对于保障应用的流畅性和性能至关重要。本文将带你深入了解Android线程的基础概念和多线程编程模型&#xff0c;同时将线程与进程进行比较&#xff0c;通过详细的解释和代码示例&#xff0c;使读者能够更全面地理解相关知识…

PyTorch神经网络-激励函数

在PyTorch 神经网络当中&#xff0c;使用激励函数处理非线性的问题&#xff0c;普通的神经网络出来的数据一般是线性的关系&#xff0c;但是遇到比较复杂的数据的话&#xff0c;需要激励函数处理一些比较难以处理的问题&#xff0c;非线性结果就是其中的情况之一。 FAQ:为什么要…

C# 32应用程序获取64位操作系统注册表

若C#的程序都是32位的&#xff0c;访问注册表的时候&#xff0c;会访问HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\&#xff0c; 而访问不到HKEY_LOCAL_MACHINE\SOFTWARE 适用版本&#xff1a;.NET 4.0及更高版本 public static Dictionary<string, string> GetInstalled…

【Java 进阶篇】Ajax 实现——JQuery 实现方式 `get` 与 `post`

嗨&#xff0c;亲爱的小白们&#xff01;欢迎来到这篇关于使用 jQuery 实现 Ajax 请求的博客。在前端开发中&#xff0c;Ajax 是一项非常重要的技术&#xff0c;它使我们能够在不刷新整个页面的情况下与服务器进行数据交互。而在 jQuery 中&#xff0c;get 和 post 方法提供了简…

vue之Error: Unknown option: .devServer.

背景 在使用内网穿透工具时&#xff0c;加入对应的配置&#xff0c;启动出现报错。 一、遇到的问题 报错&#xff1a; Error: Unknown option: .devServer. Check out https://babeljs.io/docs/en/babel-core/#options for more information about options. Error: Unknown …

全流量分析应用运行和访问情况

在当今数字化时代&#xff0c;应用程序的运行和访问情况对于企业和组织来说至关重要。无论是在线销售平台、移动应用还是企业内部系统&#xff0c;应用的性能和可用性直接影响着用户体验、业务流程以及组织效率。因此&#xff0c;对应用的运行和访问情况进行全面分析和评估&…

JZM-D30室温探针台技术参数

概况&#xff1a; JZM-D30室温探针台的诸多设计都是专用的&#xff0c;探针台的配置主要是根据用户的需求进行选配及设计。例如&#xff0c;要求的磁场型号&#xff0c;电源型号&#xff0c;磁场值&#xff0c;样品台的尺寸等&#xff0c;除此之外&#xff0c;该探针台和我司自…

Go 语言中的map和内存泄漏

map在内存中总是会增长&#xff1b;它不会收缩。因此&#xff0c;如果map导致了一些内存问题&#xff0c;你可以尝试不同的选项&#xff0c;比如强制 Go 重新创建map或使用指针。 在 Go 中使用map时&#xff0c;我们需要了解map增长和收缩的一些重要特性。让我们深入探讨这一点…

架构开发与优化咨询和实施服务

服务概述 得益于硬件平台算力的提升&#xff0c;汽车电子电气架构的集成度逐渐提高&#xff0c;从单体ECU、到功能域集成控制器、到区域集成控制器&#xff0c;多域融合成为了目前行业中软件工程的重要工作内容。同时&#xff0c;在传统控制器C代码开发的基础上&#xff0c;C、…

手把手从零开始训练YOLOv8改进项目(官方ultralytics版本)教程

手把手从零开始训练 YOLOv8 改进项目 (Ultralytics版本) 教程,改进 YOLOv8 算法 本文以Windows服务器为例:从零开始使用Windows训练 YOLOv8 算法项目 《芒果 YOLOv8 目标检测算法 改进》 适用于芒果专栏改进 YOLOv8 算法 文章目录 官方 YOLOv8 算法介绍改进网络代码汇总第…

CISP模拟试题(一)

免责声明 文章仅做经验分享用途,利用本文章所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,作者不为此承担任何责任,一旦造成后果请自行承担!!! 1.下面关于信息安全保障的说法错误的是:C A.信息安全保障的概念是与信息安全的概念同时产生的 …

ROS参数服务器(Param):通信模型、Hello World与拓展

参数服务器在ROS中主要用于实现不同节点之间的数据共享。 参数服务器相当于是独立于所有节点的一个公共容器&#xff0c;可以将数据存储在该容器中&#xff0c;被不同的节点调用&#xff0c;当然不同的节点也可以往其中存储数据。 使用场景一般存储一些机器人的固有参数&…

20、动态路由_下滑线为前缀的目录

创建文件 pages_question\index.vue pages_question\detail.vue 生成的对应路由&#xff1a; const _6bf6ece8 () > interopDefault(import(..\\pages\\_question\\index.vue /* webpackChunkName: "pages/_question/index" */)) const _a98c80aa () > in…

AIGC 技术在淘淘秀场景的探索与实践

本文介绍了AIGC相关领域的爆发式增长&#xff0c;并探讨了淘宝秀秀(AI买家秀)的设计思路和技术方案。文章涵盖了图像生成、仿真形象生成和换背景方案&#xff0c;以及模型流程串联等关键技术。 文章还介绍了淘淘秀的使用流程和遇到的问题及处理方法。最后&#xff0c;文章展望…

安全项目简介

安全项目 基线检查 密码 复杂度有效期 用户访问和身份验证 禁用administrator禁用guest认证失败锁定 安全防护软件操作系统安全配置 关闭自动播放 文件和目录权限端口限制安全审计… 等保测评 是否举办了安全意识培训是否有应急响应预案有无第一负责人 工作内容 测评准备…