canal kafka 实现mysql与es/redis 数据同步

文章目录

          • 一、原理实现
            • 1. 方案设计流程图
            • 2. 实现原理
          • 二、mysql开启binlog模式
            • 2.1. 配置my.ini
            • 2.2. 重启mysql服务
            • 2.3. 验证binlog模式
            • 2.4. 创建canal账号
            • 2.5. 账号验证
          • 三、docker-compose环境搭建
            • 3.1. 环境总览
            • 3.2. 编写docker-compose.yml
            • 3.3. 安装docker-compose
            • 3.4. 构建环境
            • 3.5. 环境验证
            • 3.6. 异常解决
          • 四、微服务项目实战
            • 4.1. 项目依赖
            • 4.2. yml配置
            • 4.3. 索引对象
            • 4.4. 监听对象
            • 4.5. 表结构
            • 4.6. 类型常量
          • 四、测试验证
            • 4.1. 改变数据
            • 4.2. 监听数据变化

一、原理实现
1. 方案设计流程图

在这里插入图片描述

2. 实现原理
二、mysql开启binlog模式
2.1. 配置my.ini

找到my.ini配置文件位置
添加以下内容

# 开启binlog 
log-bin=mysql-bin
# 选择ROW模式 
binlog-format=ROW 
# 配置Mysql replaction 需要定义 	不要和canal的serverId 重复
server_id=1

在这里插入图片描述

2.2. 重启mysql服务

鼠标右击重启
在这里插入图片描述

2.3. 验证binlog模式

log_bin为ON 则binlog开启

show variables like 'log_%';

在这里插入图片描述

2.4. 创建canal账号
mysql -uroot -p123456
# 创建账号(账号:canal 密码:canal)
CREATE USER canal IDENTIFIED BY 'canal';
# 赋予权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新并应用权限
flush privileges;
2.5. 账号验证

新建mysql连接,用户canal 密码canal 主机为win本机ip地址
在这里插入图片描述

三、docker-compose环境搭建
3.1. 环境总览
中间件ip地址端口
mysql8.0.26192.168.43.1223306
canal-server1.1.4192.168.159.14211111
zookeeperlatest192.168.159.1422181
kafka2.12-2.3.0192.168.159.1429092
elasticsearch6.8.6192.168.159.1429200/9300
kibana6.5.4192.168.159.1425601
3.2. 编写docker-compose.yml

mysql 在windows本地

IP说明备注
192.168.43.122mysql连接地址或者开放指定中间件端口
192.168.159.142宿主机地址关闭防火墙或者开放指定中间件端口
version: '2'
services: canal-server: image: canal/canal-server:v1.1.4container_name: canal-serverports: - 11111:11111environment: - canal.instance.mysql.slaveId=12- canal.auto.scan=false- canal.instance.master.address=192.168.43.122:3306- canal.instance.dbUsername=canal- canal.instance.dbPassword=canal- canal.mq.topic=mayikt-20212-topic- canal.serverMode=kafka- canal.mq.servers=192.168.159.142:9092volumes: - /app/canal-server/conf/:/admin/canal-server/conf/- /app/canal-server/logs/:/admin/canal-server/logs/zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"restart: alwayskafka:image: wurstmeister/kafka:2.12-2.3.0ports:- "9092:9092"environment:- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.159.142:9092- KAFKA_LISTENERS=PLAINTEXT://:9092volumes:- /var/run/docker.sock:/var/run/docker.sockrestart: alwayselasticsearch:image: daocloud.io/library/elasticsearch:6.8.6restart: alwayscontainer_name: elasticsearchenvironment:- "ES_JAVA_OPTS=-Xms512m -Xmx512m"ports: - "9200:9200"- "9300:9300"kibana:image: daocloud.io/library/kibana:6.5.4restart: alwayscontainer_name: kibanaports:- "5601:5601"environment:- elasticsearch_url=http://192.168.159.142:9200depends_on:- elasticsearch
3.3. 安装docker-compose

安装docker-compose插件
https://blog.csdn.net/weixin_40816738/article/details/126422834

3.4. 构建环境
mkdir /app/canal

把docker-compose.yml上传至/app/canal目录下

开始构建

docker-compose up -d
3.5. 环境验证
docker ps
CONTAINER ID   IMAGE                                     COMMAND                 CREATED      STATUS       PORTS                                                                                  NAMES
cb47055de10b   daocloud.io/library/kibana:6.5.4          "/usr/local/bin/kiba…"   3 hours ago   Up 3 hours   0.0.0.0:5601->5601/tcp, :::5601->5601/tcp                                              kibana
07cdb595dc8d   wurstmeister/kafka:2.12-2.3.0             "start-kafka.sh"         3 hours ago   Up 3 hours   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                              canal_kafka_1
78e49092fc37   wurstmeister/zookeeper                    "/bin/sh -c '/usr/sb…"   3 hours ago   Up 3 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp                  canal_zookeeper_1
7bcb0ad7bf89   daocloud.io/library/elasticsearch:6.8.6   "/usr/local/bin/dock…"   3 hours ago   Up 3 hours   0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp   elasticsearch
12328f934a1b   canal/canal-server:v1.1.4                 "/alidata/bin/main.s…"   3 hours ago   Up 3 hours   9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp, :::11111->11111/tcp          canal-server

在这里插入图片描述

es: http://192.168.122.128:9200/
kibana:http://192.168.159.142:5601/app/kibana#/home?_g=()
zk:
在这里插入图片描述

3.6. 异常解决

max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
https://blog.csdn.net/weixin_40816738/article/details/126333689?

四、微服务项目实战
4.1. 项目依赖

备注:es版本服务端和客户端要一致

 <!-- springBoot集成kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency>

在这里插入图片描述

4.2. yml配置
# kafka
spring:kafka:# kafka服务器地址(可以多个)bootstrap-servers: 192.168.159.142:9092consumer:# 指定一个默认的组名group-id: kafka2# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: earliest# key/value的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: falseproducer:# key/value的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288data:elasticsearch:repositories:enabled: truecluster-name: docker-clustercluster-nodes: 192.168.159.142:9300
server:port: 8085
4.3. 索引对象

商品信息表(commodity_info表)索引对象,企业中真实场景应该先筛选搜索频次较高的信息然后,编写索引结构存储es

package com.mayikt.canal.commodity.es.bean;import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;/*** @author gblfy* @date 2022-8-19* 商品信息表(commodity_info表)索引对象*/
@Data
//通过这个注解可以声明一个文档,指定其所在的索引库和type
@Document(indexName = "commodityinfo")
public class CommodityInfo implements Serializable {@Idprivate Long id;private Long commodityId;//商品ID@Field(type = FieldType.Text)private String name;//商品名称@Field(type = FieldType.Text)private String mainImage;//大图片@Field(type = FieldType.Text)private String subImage;//小图片@Field(type = FieldType.Text)private String detail;//详情@Field(type = FieldType.Text)private String attributeList;//属性列表@Field(type = FieldType.Double)private BigDecimal price;//价格@Field(type = FieldType.Integer)private Long stock;//库存@Field(type = FieldType.Integer)private Integer status;//状态@Field(type = FieldType.Date)private Date createTime;//创建时间private Date updateTime;//更新时间
}
4.4. 监听对象
package com.mayikt.canal.commodity.consumer;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mayikt.canal.commodity.constant.MayiktConstant;
import com.mayikt.canal.commodity.es.bean.CommodityInfo;
import com.mayikt.canal.commodity.es.repository.CommodityInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author gblfy* @date 2022-8-19* 监听mysql数据库*/
@Component
@Slf4j
public class CanalConsumer {@Autowiredprivate CommodityInfoRepository commodityInfoRepository;/*** 消费者监听mayikt-topic** @param consumer*/@KafkaListener(topics = "mayikt-20212-topic")public void receive01(ConsumerRecord<?, ?> consumer) {log.info("分组1的消费者1>topic名称:{},,key:{},分区位置:{},offset{},数据:{}<",consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());String json = (String) consumer.value();JSONObject jsonObject = JSONObject.parseObject(json);//监听数据 变化类型UPDATE INSERT DELETE 根据type调用es不同apiString type = jsonObject.getString("type");String pkNames = jsonObject.getJSONArray("pkNames").getString(0);JSONArray data = jsonObject.getJSONArray("data");//监听表名称String table = jsonObject.getString("table");//监听数据库String database = jsonObject.getString("database");for (int i = 0; i < data.size(); i++) {JSONObject dataObject = data.getJSONObject(i);//商品信息表(commodity_info表)索引对象 这是一个通用方法,根据不同数据库的不同表走不通分支进行数据同步esCommodityInfo productEntity = dataObject.toJavaObject(CommodityInfo.class);switch (type) {case MayiktConstant.CANAL_UPDATE:case MayiktConstant.CANAL_INSERT:commodityInfoRepository.save(productEntity);break;case MayiktConstant.CANAL_DELETE:commodityInfoRepository.delete(productEntity);break;}}}
}
4.5. 表结构
create database `mayikt-integral`;
use `mayikt-integral`;SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for commodity_info
-- ----------------------------
DROP TABLE IF EXISTS `commodity_info`;
CREATE TABLE `commodity_info`  (`id` int NOT NULL,`commodity_id` int NULL DEFAULT NULL COMMENT '商品ID',`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品名称',`subtitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品子标题',`main_image` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '大图片',`sub_image` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '小图片',`detail` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '详情',`attribute_list` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '属性列表',`price` int NULL DEFAULT NULL COMMENT '价格',`stock` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '库存',`status` int NULL DEFAULT NULL COMMENT '状态',`create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',`update_time` datetime NULL DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '商品信息表' ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of commodity_info
-- ----------------------------
INSERT INTO `commodity_info` VALUES (1, 10000, 'meite11111', '2030华为P100全新架构师10G手机', '1', '1', '2030华为P100全新架构10G手机', '[{\"内存\":\"128GB\",\"颜色\":\"红色\",\"国行\"},{\"内存\":\"128GB\",\"颜色\":\"红色\",\"国行\"}]', NULL, NULL, NULL, '2021-04-27 16:22:28', '2021-04-27 16:22:33');SET FOREIGN_KEY_CHECKS = 1;
4.6. 类型常量
package com.mayikt.canal.commodity.constant;/*** @author gblfy* @date 2022-8-19* 类型常量*/
public interface MayiktConstant {String CANAL_UPDATE = "UPDATE";String CANAL_DELETE = "DELETE";String CANAL_INSERT = "INSERT";
}
  • es接口
package com.mayikt.canal.commodity.es.repository;import com.mayikt.canal.commodity.es.bean.CommodityInfo;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;/*** @author gblfy* @date 2022-8-19* es接口*/
@Repository
public interface CommodityInfoRepository extends ElasticsearchRepository<CommodityInfo, Long> {
}
四、测试验证
4.1. 改变数据

在这里插入图片描述

4.2. 监听数据变化

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
json格式化后
在这里插入图片描述

在这里插入图片描述
同步后
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/515197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

免费下载!《阿里工程师的自我修养》公开10位阿里大牛解决问题的思维方式

简介&#xff1a; 今天&#xff0c;阿里技术公布一波阿里P8、P9技术大牛的思维模型&#xff0c;将他们的思维模式呈现出来。你可以在阿里资深专家职业生涯的真切感悟中&#xff0c;找到应对危机的最佳方法。《阿里工程师的自我修养》现已正式公开&#xff0c;可免费下载阅读。 …

云原生时代消息中间件的演进路线

简介&#xff1a; 本文整理自作者于 2020 年云原生微服务大会上的分享《云原生时代的消息中间件演进》&#xff0c;主要探讨了传统的消息中间件如何持续进化为云原生的消息服务。 作者 | 周礼&#xff08;不铭&#xff09; 阿里巴巴集团消息中间件架构师 导读&#xff1a;本文…

四大“化学融合”、两大核心平台能力,华为首次系统解读OneStorage

4月14日&#xff0c;在2021华为全球分析师大会期间&#xff0c;华为举办数据存储专场Session&#xff0c;面向全球分析师全面解读下一代数据存储解决方案OneStorage&#xff0c;引领数据存储产业迈向全场景智能和多云融合。同时&#xff0c;在此期间华为首次向业界系统性地诠释…

科普|不同协议下远程服务器文件上传_下载优劣对比

简介&#xff1a; 作为一个程序员&#xff0c;如果不知道如何进行远程服务器的文件上传与下载&#xff0c;实在是一件尴尬的事情&#xff0c;今天我们聊聊如何实现远程服务器的文件上传与下载。 作为一个程序员&#xff0c;如果不知道如何进行远程服务器的文件上传与下载&#…

Module build failed: Error: Cannot find module ‘gifsicle‘

问题描述&#xff1a; build Cannot find module ‘gifsicle’ 解决方案&#xff1a; 第一步&#xff1a;卸载image-webpack-loader 第1种方式&#xff1a;删除项目中的image-webpack-loader npm uninstall image-webpack-loader第2种方式&#xff1a;删除node_modules中的im…

高德AR驾车导航解决方案

简介&#xff1a; 高德从2018年首创了车载AR导航后&#xff0c;已经先后在后视镜、智能车盒、前装整车厂、后装车机产品、行车记录仪等众多场景落地应用&#xff0c;搭建了非常完整的AR导航生态。 日前&#xff0c;高德地图最新发布了v10.60新版本&#xff0c;上线了手机端的A…

第 11 个“世界备份日”刚过,《Veeam 2021 数据保护报告》为你解读全球数据备份现状

2011 年 3 月 31 日&#xff0c;美国网络社区 Reddit 发起“世界备份日&#xff08;World Backup Day&#xff09;”倡议活动&#xff0c;号召人们做好数据安全备份。于是每年愚人节前一天成为“世界备份日”&#xff0c;口号很有趣 Don’t Be An April Fool,Backup Your Data&…

知乎李大海对话阿里云贾扬清:透视AI应用难题与未来趋势

自AlphaGo接连战胜李世石与柯洁后&#xff0c;越来越多从业者将AI看做科技行业的未来。大大小小的AI公司兴起&#xff0c;国内外巨头公司纷纷加速向AI转型。但经历祛魅后的AI&#xff0c;在过去几年间却并未获得观察者们预想的火箭式爆发。 “AI行业接下来可能有哪些发展&…

当 Kubernetes 遇到机密计算,阿里巴巴如何保护容器内数据的安全?

简介&#xff1a; 8 月 26 日&#xff0c;我们发起了第 6 期 SIG Cloud-Provider-Alibaba 网研会直播。本次直播主要介绍了机密计算的概况&#xff0c; InclavareContainers 开源项目架构、已支持的功能和迭代计划&#xff0c;以及阿里云 ACK-TEE 的发展现状和规划。本文汇集了…

人脸核身基础版 SDK 接入 > 合作方后台上送身份信息

文章目录一、概述二、实现流程2.1. 获取获取 access_token2.2. 获取 SIGN ticket2.3. 生成签名2.4. 上送身份信息2.5. 获取 NONCE ticket三、实战3.1. 获取获取 access_token3.2. 获取 SIGN ticket3.3. 生成签名3.4. 上送身份信息3.5. 获取 NONCE ticket四、开源地址一、概述 …

5G、射频、奥特曼,这仨有联系吗?

作者 | 小枣君来源 | 鲜枣课堂头图 | 下载于ICphoto手机&#xff0c;作为移动互联网时代的标配&#xff0c;已经走进了我们每个人的生活。有了它&#xff0c;我们可以随心所欲地聊天、购物、追剧&#xff0c;享受美好的人生。正因为手机如此重要&#xff0c;所以人们对相关技术…

一种简单快捷的 java 热部署方式

简介&#xff1a; 本文热部署插件(Arthas Hot Swap)是基于 Arthas redefine 命令实现的&#xff0c;使用该插件进行远程热部署无需任何配置&#xff0c;无需使用 debug 端口&#xff0c;只需几个简单动作就能完成。 作者 | 周忠太 阿里巴巴淘系技术部的一个搬砖工 【Arthas 官…

赠书 | IoT 的真正目标是什么

以往在构建物联网局域网系统时&#xff0c;为了方便考虑&#xff0c;在云端进行数据处理和分析已经成了常识。但是这种做法已经无法应对现在的情况。在物联网中边缘计算的必要性想要获取数据就要增加连接的设备数量&#xff0c;提高从传感器采集数据的记录&#xff08;获取&…

AI 腾讯云人脸核身之独立H5接入

文章目录一、概述二、合作方后台上送身份信息~实现流程2.1. 前端入参2.2. 后端固定参数2.3. 获取 Access Token2.4. 获取 SIGN ticket2.5. 生成签名2.6. 合作方后台上送身份信息三、启动H5人脸核身3.1. 获取h5faceId3.2. 获取nonce3.3. 获取nonceTicket3.4. 计算启动签名3.5. 构…

最佳实践:使用阿里云CDN加速OSS访问

简介&#xff1a; 用户直接访问OSS资源&#xff0c;访问速度会受到OSS的下行带宽以及Bucket地域的限制。如果通过CDN来访问OSS资源&#xff0c;带宽上限更高&#xff0c;并且可以将OSS的资源缓存至就近的CDN节点&#xff0c;通过CDN节点进行分发&#xff0c;访问速度更快&#…

IDEA 2022 CPU占用100%的问题及解决方法

禁用下面这三个插件然后重启IDEA即可&#xff1a; Package Checker Package Search Ktor 下面的内容可以不用看了&#xff0c;只要禁用这仨插件就行

无服务计算应用场景探讨及 FaaS 应用实战

简介&#xff1a; 无服务计算本身是一个概念或者理论模型&#xff0c;落地到具体技术上主要有函数即服务&#xff08;FaaS&#xff09;以及后端即服务&#xff08;BaaS&#xff09;两种形式&#xff0c;阿里云提供函数即服务 FaaS 产品。 作者 | 宋文龙&#xff08;闻可&#x…

想学 Python?那这套教程再适合你不过了!!

如果你想问最近这些年什么编程语言最值得学习&#xff0c;我相信很多人都会告诉你是Python&#xff01;所以不仅是开发小白&#xff0c;甚至很多开发老手&#xff0c;也都开始学习Python&#xff0c;作为辅助第二语言来提高自己的职场竞争力。不过结合我最近这些年Python的学习…

2020-09-01

简介&#xff1a; 《5天入门视觉AI》电子书来了&#xff01;身份证识别、电子相册两大实践场景带你快速入门视觉AI应用开发&#xff01; 阿里云“在家实践”全新出击&#xff01; 《5天入门视觉AI》电子书正式上线&#xff01; 视觉AI训练营必备教材&#xff01; 身份证识别、电…

再见 Nacos,我要玩 Service Mesh 了!

作者 | 姜桥出品 | CSDN云计算&#xff08;ID:CSDNcloud&#xff09;前面的文章<<干货|如何步入Service Mesh微服务架构时代>>实战演练了Service Mesh微服务架构的具体玩法&#xff0c;该案例中通过IstioKubernetes的组合&#xff0c;一组以Spring Boot框架开发的服…