canal: 连接kafka (docker)

一、确保mysql binlog开启并使用ROW作为日志格式
docker 启动mysql 5.7配置文件
my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server-id=1

在这里插入图片描述

在这里插入图片描述
一定要确保上述两个值一个为ROW,一个为ON

二、下载canal的run.sh

https://github.com/alibaba/canal/blob/master/docker/run.sh

三、启动canal容器,其中配置了mysql的信息和kafka的信息:

./run.sh -e canal.auto.scan=false -e canal.destinations=me -e canal.instance.master.address=xx.xx.xx.xx:3306  -e canal.instance.dbUsername=root  -e canal.instance.dbPassword=yourpassword  -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false -e canal.instance.topic=me -e canal.mq.partition=0  -e canal.mq.topic=me

四、通过docker exec -it canal-server /bin/bash进入canal容器,编辑容器中的

/home/admin/canal-server/conf/canal.properties

其中要修改两处,一是工作模式改为kafka(canal.serverMode),二是改MQ的地址(canal.mq.servers),完整配置类似如下:

#################################################
######### 		common argument		############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 		     MQ 		     #############
##################################################
canal.mq.servers = xx.xx.xx.xx:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

五、重启canal容器
docker restart canal-server

这样,对于上面的mysql 数据库中的增删改,在kakfa都可以在me 这个topic 收到数据了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一周学会Django5 Python Web开发-Django5模型定义

锋哥原创的Python Web开发 Django5视频教程: 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计41条视频,包括:2024版 Django5 Python we…

C语言实现:变位词程序拓展问题

开篇 今天的问题,是在之前变位词程序的基础上,进行了一些拓展。问题来源于《编程珠玑》第2章,课后习题1。 问题概要 考虑查找给定输入单词的所有变位词问题,仅给定单词和字典的情况下,如何解决该问题?如果有…

26. BI - PageRank 拓展以及如何利用 networkx 来分析希拉里丑闻

本文为 「茶桁的 AI 秘籍 - BI 篇 第 26 篇」 Hi, 我是茶桁. 上节课咱们讲解了 PageRank 的两种模型, 并分别做了代码上的演示. 这节课, 让我们来看看 PageRank 的影响力及其应用. PageRank 已经超越了原来提出来的模型, 因为 PageRank 的影响力影响到了后续很多的一些模型, …

【疑惑】-谷歌是如何获取数据的

搜索引擎爬虫: 谷歌的搜索引擎通过爬虫程序在互联网上爬取和收集网页信息。这些爬虫会遵循特点的算法和规则,访问内容,并且提取出关键信息 用户的搜索行为: 当用户使用谷歌搜索引擎进行搜索的时候,谷歌会收集分析用户…

【前端学习——js篇】7.函数缓存

具体见:https://github.com/febobo/web-interview 7.函数缓存 函数缓存,就是将函数运算过的结果进行缓存 本质上就是用空间(缓存存储)换时间(计算过程) 常用于缓存数据计算结果和缓存对象。 其实现主要…

Code Review(代码审查)

代码审查是软件开发生命周期的重要组成部分。它能显著提高开发人员的代码质量。 这个过程就像写一本书。作者写好了内容,出版社编辑对其进行了校审,所以没有出现任何错误,例如将“你”与“你的”混淆。这个案例中,代码审查是阅读…

Linux reboot命令教程:如何安全地重启你的Linux系统(附实例详解和注意事项)

Linux reboot命令介绍 reboot命令用于重新启动你的Linux系统。当你的系统内核更新时,除非你正在使用Livepatch或KernelCare,否则你需要重启你的Linux系统。在其他情况下,例如解决硬件问题、安装应用程序等,也可能需要重新启动系统…

我的创作纪念日 ---- 2024/3/26

前言 2024.3.26是我在CSDN成为创作者的第128天,也是我第一次真正在网上创作的第128天 当我还在日常创作时,突然发现我收到了一封信 我想我可以分享一下这段时间的感想以及收获 机缘 在CSDN的这段时间里,我学习到了很多知识,也…

数据结构——链表(单链表)

大家好,又是我(小锋),今天给大家带了一个比较有挑战的章节(链表),但是不用担心,小锋会陪大家一起度过。 顺序表的思考与问题 1. 中间/头部的插入删除,时间复杂度为O(N) …

【python】flask模板渲染引擎Jinja2,通过后端数据渲染前端页面

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…

Spring Cloud 八:微服务架构中的数据管理

Spring Cloud 一:Spring Cloud 简介 Spring Cloud 二:核心组件解析 Spring Cloud 三:API网关深入探索与实战应用 Spring Cloud 四:微服务治理与安全 Spring Cloud 五:Spring Cloud与持续集成/持续部署(CI/C…

Eladmin-jpa基于SpringBoot和Vue的前后端分离后台管理系统​

在当今快速发展的软件开发领域,前后端分离的架构模式已经成为主流。这种架构模式不仅可以提高开发效率,还能使系统更加易于维护和扩展。Eladmin-jpa是一个基于Spring Boot 2.6.4、Spring Boot Jpa、JWT、Spring Security、Redis和Vue的前后端分离的后台管…

JS等比压缩图片方法

AI给出来的答案,AI真的能改变世界,以后程序员这个职业真的有可能不存在了。 function compressImage(image, callback) {// 创建一个 canvas 元素const canvas document.createElement(canvas);canvas.width 48;canvas.height 48;// 获取 canvas 的绘…

[WTL/Win32]_[初级]_[如何设置ListView的列宽不出现水平滚动条]

场景 开发WTL/Win32的程序时,经常会用到表格控件CListViewCtrl。这个控件需要设置列的宽度,当用完100%的宽度来平均分配给列宽时,一加载数据多,就会出现垂直滚动条后,水平滚动条也会同时出现的问题。怎么设置才能让水…

Stable Diffusion 本地部署教程

Stable Diffusion是一种用于构建和部署机器学习模型的开源工具。以下是在本地环境中部署 Stable Diffusion 的基本步骤: 步骤 1: 准备环境 确保你的系统中已经安装了以下软件和工具: Python(建议使用 Python 3.x)pip(Python 包管理工具)Docker(可选,用于容器化部署)…

【研发日记】Matlab/Simulink开箱报告(十)——Signal Routing模块模块

文章目录 前言 Signal Routing模块 虚拟模块和虚拟信号 Mux和Demux Vector Concatenate和Selector Bus Creator和Bus Selector 分析和应用 总结 前言 见《开箱报告,Simulink Toolbox库模块使用指南(五)——S-Fuction模块(C MEX S-Fun…

三、 mariadb数据库用户管理

1)查询有哪些用户 MariaDB> select user,host from mysql.user; ----------------- | user | host | ----------------- | root | 127.0.0.1 | | root | ::1 | | | localhost | | root | localhost | | | oldboy | | root | oldboy | ---------…

单链表专题(上)(顺序表链表线性表)

在开始之前思考一个顺序表的问题 1. 中间/头部的插⼊删除,时间 复杂度为O(N) 2. 增容需要申请新空间,拷⻉数据,释放旧空间。会有不⼩的消耗。 3. 增容⼀般是呈2倍的增⻓,势必会有⼀定的空间浪费。例如当前容量为100,…

类模板分文件编写

问题: 类模板中成员函数创建时机是在调用阶段,导致分文件编写时链接不到 解决: 解决方式1:直接包含.cpp源文件 解决方式2:将声明和实现写到同一个文件中,并更改后缀名为.hpp,hpp是约定的名称…

订单系统-RPC快速入门

RPC快速入门 概述 关于rpc,只需要知道他是一种协议,项目之间能够远程调用函数。 快速入门 我们前边下载好的两个包,在idea中打开之后,我们创建这么几个文件夹。 至于是干什么的,以后细说。创建好之后我们在produc…