Table of Contents
- 1. Principles and implementation
- 1.1. Solution design flowchart
- 1.2. How it works
- 2. Enable binlog mode in MySQL
- 2.1. Configure my.ini
- 2.2. Restart the MySQL service
- 2.3. Verify binlog mode
- 2.4. Create the canal account
- 2.5. Verify the account
- 3. docker-compose environment setup
- 3.1. Environment overview
- 3.2. Write docker-compose.yml
- 3.3. Install docker-compose
- 3.4. Build the environment
- 3.5. Verify the environment
- 3.6. Troubleshooting
- 4. Microservice project in practice
- 4.1. Project dependencies
- 4.2. yml configuration
- 4.3. Index object
- 4.4. Listener
- 4.5. Table structure
- 4.6. Type constants
- 5. Testing and verification
- 5.1. Change the data
- 5.2. Observe the data change
1. Principles and implementation
1.1. Solution design flowchart
1.2. How it works
2. Enable binlog mode in MySQL
2.1. Configure my.ini
Locate the my.ini configuration file.
Add the following content:
# Enable binlog
log-bin=mysql-bin
# Use ROW mode
binlog-format=ROW
# Required for MySQL replication; must not duplicate canal's slaveId
server_id=1
2.2. Restart the MySQL service
Right-click the MySQL service in the Windows Services panel and restart it, or restart it from the command line as shown below.
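A minimal command-line alternative, run from an elevated command prompt. The service name MySQL80 is an assumption (it depends on how MySQL was installed):

:: the service name MySQL80 is an assumption; check it in services.msc
net stop MySQL80
net start MySQL80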
2.3. Verify binlog mode
If log_bin shows ON, binlog is enabled.
show variables like 'log_%';
2.4. Create the canal account
mysql -uroot -p123456
# Create the account (user: canal, password: canal)
CREATE USER canal IDENTIFIED BY 'canal';
# Grant privileges
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'canal'@'%';
# Flush and apply the privileges
flush privileges;
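Optionally, double-check that the grants took effect before moving on:

SHOW GRANTS FOR 'canal'@'%';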
2.5. Verify the account
Create a new MySQL connection with user canal, password canal, and the host set to the Windows machine's IP address.
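For example, from any machine that can reach the Windows host (192.168.43.122 is the MySQL address used throughout this article; substitute your own IP):

mysql -h 192.168.43.122 -ucanal -pcanal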
3. docker-compose environment setup
3.1. Environment overview
Middleware | Version | IP address | Port |
---|---|---|---|
mysql | 8.0.26 | 192.168.43.122 | 3306 |
canal-server | 1.1.4 | 192.168.159.142 | 11111 |
zookeeper | latest | 192.168.159.142 | 2181 |
kafka | 2.12-2.3.0 | 192.168.159.142 | 9092 |
elasticsearch | 6.8.6 | 192.168.159.142 | 9200/9300 |
kibana | 6.5.4 | 192.168.159.142 | 5601 |
3.2. Write docker-compose.yml
MySQL runs locally on Windows.
IP | Description | Notes |
---|---|---|
192.168.43.122 | MySQL connection address | or open only the required middleware ports |
192.168.159.142 | Docker host address | disable the firewall or open only the required middleware ports |
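For example, on a CentOS-style Docker host you can either stop firewalld (test environments only) or open just the middleware ports. A sketch; adjust the port list to your setup:

# option 1: disable the firewall (test environments only)
systemctl stop firewalld
# option 2: open only the required middleware ports
firewall-cmd --permanent --add-port=11111/tcp --add-port=2181/tcp --add-port=9092/tcp --add-port=9200/tcp --add-port=9300/tcp --add-port=5601/tcp
firewall-cmd --reload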
version: '2'
services:
  canal-server:
    image: canal/canal-server:v1.1.4
    container_name: canal-server
    ports:
      - 11111:11111
    environment:
      - canal.instance.mysql.slaveId=12
      - canal.auto.scan=false
      - canal.instance.master.address=192.168.43.122:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.mq.topic=mayikt-20212-topic
      - canal.serverMode=kafka
      - canal.mq.servers=192.168.159.142:9092
    volumes:
      - /app/canal-server/conf/:/admin/canal-server/conf/
      - /app/canal-server/logs/:/admin/canal-server/logs/
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: always
  kafka:
    image: wurstmeister/kafka:2.12-2.3.0
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.159.142:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    restart: always
  elasticsearch:
    image: daocloud.io/library/elasticsearch:6.8.6
    restart: always
    container_name: elasticsearch
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
  kibana:
    image: daocloud.io/library/kibana:6.5.4
    restart: always
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_URL=http://192.168.159.142:9200
    depends_on:
      - elasticsearch
3.3. Install docker-compose
Install the docker-compose plugin:
https://blog.csdn.net/weixin_40816738/article/details/126422834
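If the linked post is unavailable, a common way to install the standalone docker-compose binary looks roughly like this (version 1.29.2 is only an example; pick the release you need):

sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version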
3.4. Build the environment
mkdir /app/canal
Upload docker-compose.yml to the /app/canal directory.
Start the build:
docker-compose up -d
3.5. Verify the environment
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cb47055de10b daocloud.io/library/kibana:6.5.4 "/usr/local/bin/kiba…" 3 hours ago Up 3 hours 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana
07cdb595dc8d wurstmeister/kafka:2.12-2.3.0 "start-kafka.sh" 3 hours ago Up 3 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp canal_kafka_1
78e49092fc37 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 3 hours ago Up 3 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp canal_zookeeper_1
7bcb0ad7bf89 daocloud.io/library/elasticsearch:6.8.6 "/usr/local/bin/dock…" 3 hours ago Up 3 hours 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch
12328f934a1b canal/canal-server:v1.1.4 "/alidata/bin/main.s…" 3 hours ago Up 3 hours 9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp, :::11111->11111/tcp canal-server
es: http://192.168.159.142:9200/
kibana:http://192.168.159.142:5601/app/kibana#/home?_g=()
zk:
3.6. Troubleshooting
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
https://blog.csdn.net/weixin_40816738/article/details/126333689?
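The standard fix is to raise vm.max_map_count on the Docker host, for example:

# apply immediately
sysctl -w vm.max_map_count=262144
# persist across reboots
echo "vm.max_map_count=262144" >> /etc/sysctl.conf
sysctl -p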
4. Microservice project in practice
4.1. Project dependencies
Note: the Elasticsearch server and client versions must be the same.
<!-- Spring Boot integration with Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
4.2. yml configuration
# kafka
spring:
  kafka:
    # Kafka server address (multiple addresses allowed)
    bootstrap-servers: 192.168.159.142:9092
    consumer:
      # Default consumer group id
      group-id: kafka2
      # earliest: if a committed offset exists for the partition, consume from it; otherwise consume from the beginning
      # latest: if a committed offset exists for the partition, consume from it; otherwise consume only newly produced data
      # none: if committed offsets exist for all partitions, consume from them; if any partition lacks one, throw an exception
      auto-offset-reset: earliest
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      # key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # batch size
      batch-size: 65536
      # buffer memory
      buffer-memory: 524288
  data:
    elasticsearch:
      repositories:
        enabled: true
      cluster-name: docker-cluster
      cluster-nodes: 192.168.159.142:9300
server:
  port: 8085
4.3. Index object
Index object for the commodity info table (commodity_info). In a real enterprise scenario you would first pick out the most frequently searched fields, then design the index structure and store it in ES.
package com.mayikt.canal.commodity.es.bean;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

/**
 * @author gblfy
 * @date 2022-8-19
 * Index object for the commodity info table (commodity_info)
 */
@Data
// Declares a document and specifies the index (and type) it is stored in
@Document(indexName = "commodityinfo")
public class CommodityInfo implements Serializable {

    @Id
    private Long id;
    private Long commodityId;        // commodity ID
    @Field(type = FieldType.Text)
    private String name;             // commodity name
    @Field(type = FieldType.Text)
    private String mainImage;        // main image
    @Field(type = FieldType.Text)
    private String subImage;         // sub image
    @Field(type = FieldType.Text)
    private String detail;           // details
    @Field(type = FieldType.Text)
    private String attributeList;    // attribute list
    @Field(type = FieldType.Double)
    private BigDecimal price;        // price
    @Field(type = FieldType.Integer)
    private Long stock;              // stock
    @Field(type = FieldType.Integer)
    private Integer status;          // status
    @Field(type = FieldType.Date)
    private Date createTime;         // creation time
    private Date updateTime;         // update time
}
4.4. Listener
package com.mayikt.canal.commodity.consumer;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mayikt.canal.commodity.constant.MayiktConstant;
import com.mayikt.canal.commodity.es.bean.CommodityInfo;
import com.mayikt.canal.commodity.es.repository.CommodityInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author gblfy
 * @date 2022-8-19
 * Listens for MySQL database changes
 */
@Component
@Slf4j
public class CanalConsumer {

    @Autowired
    private CommodityInfoRepository commodityInfoRepository;

    /**
     * Consumer listening on the mayikt-20212-topic topic
     *
     * @param consumer
     */
    @KafkaListener(topics = "mayikt-20212-topic")
    public void receive01(ConsumerRecord<?, ?> consumer) {
        log.info("group 1, consumer 1 > topic:{}, key:{}, partition:{}, offset:{}, value:{} <",
                consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());
        String json = (String) consumer.value();
        JSONObject jsonObject = JSONObject.parseObject(json);
        // Change type of the captured data: UPDATE, INSERT or DELETE; a different ES API is called depending on the type
        String type = jsonObject.getString("type");
        String pkNames = jsonObject.getJSONArray("pkNames").getString(0);
        JSONArray data = jsonObject.getJSONArray("data");
        // Name of the table that changed
        String table = jsonObject.getString("table");
        // Database that changed
        String database = jsonObject.getString("database");
        for (int i = 0; i < data.size(); i++) {
            JSONObject dataObject = data.getJSONObject(i);
            // Index object for the commodity info table (commodity_info). This is a generic pattern:
            // branch on database/table here to sync different tables to ES.
            CommodityInfo productEntity = dataObject.toJavaObject(CommodityInfo.class);
            switch (type) {
                case MayiktConstant.CANAL_UPDATE:
                case MayiktConstant.CANAL_INSERT:
                    commodityInfoRepository.save(productEntity);
                    break;
                case MayiktConstant.CANAL_DELETE:
                    commodityInfoRepository.delete(productEntity);
                    break;
            }
        }
    }
}
4.5. Table structure
create database `mayikt-integral`;
use `mayikt-integral`;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for commodity_info
-- ----------------------------
DROP TABLE IF EXISTS `commodity_info`;
CREATE TABLE `commodity_info` (
  `id` int NOT NULL,
  `commodity_id` int NULL DEFAULT NULL COMMENT '商品ID',
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品名称',
  `subtitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品子标题',
  `main_image` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '大图片',
  `sub_image` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '小图片',
  `detail` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '详情',
  `attribute_list` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '属性列表',
  `price` int NULL DEFAULT NULL COMMENT '价格',
  `stock` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '库存',
  `status` int NULL DEFAULT NULL COMMENT '状态',
  `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime NULL DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '商品信息表' ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of commodity_info
-- ----------------------------
INSERT INTO `commodity_info` VALUES (1, 10000, 'meite11111', '2030华为P100全新架构师10G手机', '1', '1', '2030华为P100全新架构10G手机', '[{\"内存\":\"128GB\",\"颜色\":\"红色\",\"国行\"},{\"内存\":\"128GB\",\"颜色\":\"红色\",\"国行\"}]', NULL, NULL, NULL, '2021-04-27 16:22:28', '2021-04-27 16:22:33');

SET FOREIGN_KEY_CHECKS = 1;
4.6. Type constants
package com.mayikt.canal.commodity.constant;

/**
 * @author gblfy
 * @date 2022-8-19
 * Type constants
 */
public interface MayiktConstant {
    String CANAL_UPDATE = "UPDATE";
    String CANAL_DELETE = "DELETE";
    String CANAL_INSERT = "INSERT";
}
- ES repository interface
package com.mayikt.canal.commodity.es.repository;

import com.mayikt.canal.commodity.es.bean.CommodityInfo;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * @author gblfy
 * @date 2022-8-19
 * ES repository interface
 */
@Repository
public interface CommodityInfoRepository extends ElasticsearchRepository<CommodityInfo, Long> {
}
5. Testing and verification
5.1. Change the data
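As a concrete example, update the test row inserted in section 4.5; any INSERT/UPDATE/DELETE on commodity_info will be captured by canal-server and pushed to the Kafka topic. The new name value here is arbitrary:

use `mayikt-integral`;
UPDATE `commodity_info` SET `name` = 'meite-2022-es', `update_time` = NOW() WHERE `id` = 1;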
5.2. Observe the data change
The consumed message, formatted as JSON:
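The exact payload depends on your change; below is an abridged, illustrative example of the flat-message format canal pushes to Kafka, showing only the fields that CanalConsumer reads (the field values are made up for illustration):

{
  "database": "mayikt-integral",
  "table": "commodity_info",
  "pkNames": ["id"],
  "type": "UPDATE",
  "data": [
    { "id": "1", "commodity_id": "10000", "name": "meite-2022-es" }
  ]
}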
After synchronization:
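To confirm the document landed in Elasticsearch, query the commodityinfo index directly (the index name comes from the @Document annotation above), or browse it in Kibana:

curl "http://192.168.159.142:9200/commodityinfo/_search?pretty"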