canal: 连接kafka (docker)

一、确保mysql binlog开启并使用ROW作为日志格式
docker 启动mysql 5.7配置文件
my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server-id=1

在这里插入图片描述

在这里插入图片描述
一定要确保上述两个值一个为ROW,一个为ON

二、下载canal的run.sh

https://github.com/alibaba/canal/blob/master/docker/run.sh

三、启动canal容器,其中配置了mysql的信息和kafka的信息:

./run.sh -e canal.auto.scan=false -e canal.destinations=me -e canal.instance.master.address=xx.xx.xx.xx:3306  -e canal.instance.dbUsername=root  -e canal.instance.dbPassword=yourpassword  -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false -e canal.instance.topic=me -e canal.mq.partition=0  -e canal.mq.topic=me

四、通过docker exec -it canal-server /bin/bash进入canal容器,编辑容器中的

/home/admin/canal-server/conf/canal.properties

其中要修改两处,一是工作模式改为kafka(canal.serverMode),二是改MQ的地址(canal.mq.servers),完整配置类似如下:

#################################################
######### 		common argument		############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 		     MQ 		     #############
##################################################
canal.mq.servers = xx.xx.xx.xx:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

五、重启canal容器
docker restart canal-server

这样,对于上面的mysql 数据库中的增删改,在kakfa都可以在me 这个topic 收到数据了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一周学会Django5 Python Web开发-Django5模型定义

锋哥原创的Python Web开发 Django5视频教程: 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计41条视频,包括:2024版 Django5 Python we…

26. BI - PageRank 拓展以及如何利用 networkx 来分析希拉里丑闻

本文为 「茶桁的 AI 秘籍 - BI 篇 第 26 篇」 Hi, 我是茶桁. 上节课咱们讲解了 PageRank 的两种模型, 并分别做了代码上的演示. 这节课, 让我们来看看 PageRank 的影响力及其应用. PageRank 已经超越了原来提出来的模型, 因为 PageRank 的影响力影响到了后续很多的一些模型, …

我的创作纪念日 ---- 2024/3/26

前言 2024.3.26是我在CSDN成为创作者的第128天,也是我第一次真正在网上创作的第128天 当我还在日常创作时,突然发现我收到了一封信 我想我可以分享一下这段时间的感想以及收获 机缘 在CSDN的这段时间里,我学习到了很多知识,也…

数据结构——链表(单链表)

大家好,又是我(小锋),今天给大家带了一个比较有挑战的章节(链表),但是不用担心,小锋会陪大家一起度过。 顺序表的思考与问题 1. 中间/头部的插入删除,时间复杂度为O(N) …

【python】flask模板渲染引擎Jinja2,通过后端数据渲染前端页面

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…

Spring Cloud 八:微服务架构中的数据管理

Spring Cloud 一:Spring Cloud 简介 Spring Cloud 二:核心组件解析 Spring Cloud 三:API网关深入探索与实战应用 Spring Cloud 四:微服务治理与安全 Spring Cloud 五:Spring Cloud与持续集成/持续部署(CI/C…

Eladmin-jpa基于SpringBoot和Vue的前后端分离后台管理系统​

在当今快速发展的软件开发领域,前后端分离的架构模式已经成为主流。这种架构模式不仅可以提高开发效率,还能使系统更加易于维护和扩展。Eladmin-jpa是一个基于Spring Boot 2.6.4、Spring Boot Jpa、JWT、Spring Security、Redis和Vue的前后端分离的后台管…

JS等比压缩图片方法

AI给出来的答案,AI真的能改变世界,以后程序员这个职业真的有可能不存在了。 function compressImage(image, callback) {// 创建一个 canvas 元素const canvas document.createElement(canvas);canvas.width 48;canvas.height 48;// 获取 canvas 的绘…

[WTL/Win32]_[初级]_[如何设置ListView的列宽不出现水平滚动条]

场景 开发WTL/Win32的程序时,经常会用到表格控件CListViewCtrl。这个控件需要设置列的宽度,当用完100%的宽度来平均分配给列宽时,一加载数据多,就会出现垂直滚动条后,水平滚动条也会同时出现的问题。怎么设置才能让水…

【研发日记】Matlab/Simulink开箱报告(十)——Signal Routing模块模块

文章目录 前言 Signal Routing模块 虚拟模块和虚拟信号 Mux和Demux Vector Concatenate和Selector Bus Creator和Bus Selector 分析和应用 总结 前言 见《开箱报告,Simulink Toolbox库模块使用指南(五)——S-Fuction模块(C MEX S-Fun…

单链表专题(上)(顺序表链表线性表)

在开始之前思考一个顺序表的问题 1. 中间/头部的插⼊删除,时间 复杂度为O(N) 2. 增容需要申请新空间,拷⻉数据,释放旧空间。会有不⼩的消耗。 3. 增容⼀般是呈2倍的增⻓,势必会有⼀定的空间浪费。例如当前容量为100,…

订单系统-RPC快速入门

RPC快速入门 概述 关于rpc,只需要知道他是一种协议,项目之间能够远程调用函数。 快速入门 我们前边下载好的两个包,在idea中打开之后,我们创建这么几个文件夹。 至于是干什么的,以后细说。创建好之后我们在produc…

从零开始搭建游戏服务器 第七节 创建GameServer

目录 前言正文创建GameServer模块修改配置创建NettyClient连接到登录服登录服修改创建协议游戏服注册到登录服 总结 前言 上一节我们使用自定义注解反射简化了协议解包和逻辑处理分发流程。 那么到了这里登录服登录服的架构已经搭建的差不多了,一些比较简单的、并发…

使用 Outline 构建 企业 or 个人 知识库面临的问题

前不久,我写了一篇文章,介绍《如何在本地部署安装 Outline》,我之所以写这篇文章,主要原因是我最近需要做一个项目,使用 Outline 来构建一个公司级知识库。所以我需要在本地先搭建一个,来撰写一些前期的文档…

【环境配置】Ubuntu MySQL 8.0.28 安装并允许外部客户端连接

文章目录 MySQL 安装步骤配置 MySQL Server 允许外部连接 MySQL 安装步骤 步骤一:在 MySQL 官网找到 apt 仓库,下载最新的仓库 点击 Download: 输入如下命令: sudo wget -c https://dev.mysql.com/get/mysql-apt-config_0.8…

2014年认证杯SPSSPRO杯数学建模A题(第一阶段)轮胎的花纹全过程文档及程序

2014年认证杯SPSSPRO杯数学建模 A题 轮胎的花纹 原题再现: 轮胎被广泛使用在多种陆地交通工具上。根据性能的需要,轮胎表面常会加工出不同形状的花纹。在设计轮胎时,往往要针对其使用环境,设计出相应的花纹形状。   第一阶段问…

JVM(三)——字节码技术

三、字节码技术 1、类文件结构 一个简单的 HelloWorld.java package com.mysite.jvm.t5; // HelloWorld 示例 public class HelloWorld {public static void main(String[] args) {System.out.println("hello world");} }执行 javac -parameters -d . HellowWorld.…

Selenium 自动化 —— 实战篇之自动登录163邮箱

Selenium 自动化专栏系列文章 (一)入门和 Hello World 实例(二)使用WebDriverManager自动下载驱动(三)Selenium IDE录制、回放、导出Java源码(四)浏览器窗口操作(五&…

AI时代-普通人的AI绘画工具对比(Midjouney与Stable Diffusion)

AI时代-普通人的AI绘画工具对比(Midjouney与Stable Diffusion) 前言1、基础对比Stable Diffusion(SD)SD界面安装与使用SD Midjouney(MJ) 2、硬件与运行要求对比Stable Diffusion硬件要求内存硬盘显卡 Midjo…

Linux实现m4a格式转换为wav格式

需要在linux上安装ffmpeg 参考博客 Linux上安装ffmpeg修改环境变量【这一点很重要,自己因为没有添加环境变量,捣鼓了很长时间】 将ffmpeg的绝对路径添加到 PATH 环境变量中,以让系统能找到ffmpeg的安装路径。 # /home//project/ffmpeg-6.1-a…