canal server初始化源码分析

CanalLauncher类是canal server端启动的入口类,跟随代码进行深入。

在开始之前,我们可以先了解下,

canal 配置方式
  1. ManagerCanalInstanceGenerator: 基于manager管理的配置方式,实时感知配置并进行server重启
  2. SpringCanalInstanceGenerator:基于本地spring xml的配置方式,对于多instance的时候,不便于扩展,一个instance一个xml配置
canal 配置文件
  • canal.properties  (系统根配置文件)
  • instance.properties  (instance级别的配置文件,每个instance一份)
入口代码
   public static void main(String[] args) {try {logger.info("## set default uncaught exception handler");// 设置全局的异常捕获setGlobalUncaughtExceptionHandler();// 支持rocketmq client 配置日志路径System.setProperty("rocketmq.client.logUseSlf4j","true");// 加载canal 配置logger.info("## load canal configurations");String conf = System.getProperty("canal.conf", "classpath:canal.properties");Properties properties = new Properties();if (conf.startsWith(CLASSPATH_URL_PREFIX)) {conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));} else {properties.load(new FileInputStream(conf));}// 初始化启动类final CanalStarter canalStater = new CanalStarter(properties);// canal.admin.manager 配置地址String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);if (StringUtils.isNotEmpty(managerAddress)) {// 获取admin 用户名和密码String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);if (StringUtils.isEmpty(passwd)) {throw new IllegalArgumentException("canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");}// 设置默认端口号String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");// 自否自动注册boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,CanalConstants.CANAL_ADMIN_AUTO_REGISTER));//集群地址String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);//注册名称String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);if (StringUtils.isEmpty(name)) {// 以本地机器为默认名字name = AddressUtils.getHostName();}//注册到zk 或者admin中server IP 不配置就是本地IPString registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);if (StringUtils.isEmpty(registerIp)) {registerIp = AddressUtils.getHostIp();}// 初始化配置客户端final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,user,passwd,registerIp,Integer.parseInt(adminPort),autoRegister,autoCluster,name);// 通过http方式加载远程配置 通过admin配置的信息PlainCanal canalConfig = configClient.findServer(null);if (canalConfig == null) {throw new IllegalArgumentException("managerAddress:" + managerAddress+ " can't not found config for [" + registerIp + ":" + adminPort+ "]");}Properties managerProperties = canalConfig.getProperties();// merge local  本地配置优先级更高managerProperties.putAll(properties);// auto.scan.interval  instance自动扫描的间隔时间,单位秒int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,CanalConstants.CANAL_AUTO_SCAN_INTERVAL,"5"));executor.scheduleWithFixedDelay(new Runnable() {private PlainCanal lastCanalConfig;// 定时异步线程去加载远程配置public void run() {try {if (lastCanalConfig == null) {lastCanalConfig = configClient.findServer(null);} else {//通过md5的方式验证重新验证配置是否有变更,有变更返回最新数据,并重新加载PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());if (newCanalConfig != null) {// 远程配置canal.properties修改重新加载整个应用canalStater.stop();Properties managerProperties = newCanalConfig.getProperties();// merge localmanagerProperties.putAll(properties);canalStater.setProperties(managerProperties);canalStater.start();lastCanalConfig = newCanalConfig;}}} catch (Throwable e) {logger.error("scan failed", e);}}}, 0, scanIntervalInSecond, TimeUnit.SECONDS);canalStater.setProperties(managerProperties);} else {canalStater.setProperties(properties);}canalStater.start();runningLatch.await();executor.shutdownNow();} catch (Throwable e) {logger.error("## Something goes wrong when starting up the canal Server:", e);}}

上面代码中,对于配置的加载有两部分,当配置了canal.admin.manager 后台管理地址,会先去验证配置的用户名和密码,去获取相应的server配置(通过http的方式进行获取)

    public PlainCanal findServer(String md5) {if (StringUtils.isEmpty(md5)) {md5 = "";}String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5+ "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);return queryConfig(url);}

当通过admin的方式进行加载配置,会每隔默认5s通过http的方式获取admin端的配置,会对整体配置进行md5值进行比较,有变更会对server进行重启。

跟随CanalStarter.start() 方法进入到启动方法,在CanalLauncher中已加载到server相关的配置。

   1.  根据配置canal.serverMode获取服务端模式,有tcp、相关的mq( kafka, rocketMQ, rabbitMQ, pulsarMQ)方式,除了tcp模式,需要初始化mq相关配置

        // 服务端模式 可通过配置进行变更 canal.serverModeString serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);if (!"tcp".equalsIgnoreCase(serverMode)) {ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);// /plugin 和 /canal/plugin"canalMQProducer = loader.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);if (canalMQProducer != null) {canalMQProducer =  new ProxyCanalMQProducer(canalMQProducer);// 初始化mq信息 由canal.serverMode 决定使用哪种mqcanalMQProducer.init(properties);}}if (canalMQProducer != null) {MQProperties mqProperties = canalMQProducer.getMqProperties();// disable nettySystem.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");if (mqProperties.isFlatMessage()) {// 设置为raw避免ByteString->Entry的二次解析System.setProperty("canal.instance.memory.rawEntry", "false");}}

2. 初始化canal调度控制器CanalController,并进行start,CanalController主要是初始化各个instance配置

3.  非tcp的方式,进行启动mq。

        if (canalMQProducer != null) {canalMQStarter = new CanalMQStarter(canalMQProducer);// 当前server上部署的instance列表String destinations = CanalController.getDestinations(properties);// 启动mqcanalMQStarter.start(destinations);controller.setCanalMQStarter(canalMQStarter);}

4. 初始化CanalAdminController:提供canal admin的管理操作

        // start canalAdminString port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);if (canalAdmin == null && StringUtils.isNotEmpty(port)) {String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);CanalAdminController canalAdmin = new CanalAdminController(this);canalAdmin.setUser(user);canalAdmin.setPasswd(passwd);String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",port,user,passwd,ip);CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();canalAdminWithNetty.setCanalAdmin(canalAdmin);canalAdminWithNetty.setPort(Integer.parseInt(port));canalAdminWithNetty.setIp(ip);canalAdminWithNetty.start();this.canalAdmin = canalAdminWithNetty;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631322.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s---ingress对外服务(七层)

ingress 概念 k8s的对外服务&#xff0c;ingress service作用现在两个方面&#xff1a; 1、集群内部&#xff1a;不断跟踪的变化&#xff0c;更新endpoint中的pod对象&#xff0c;基于pod的ip地址不断变化的一种服务发现机制。 2、集群外部&#xff1a;类似于负载均衡器&a…

elasticsearch[二]-DSL查询语法:全文检索、精准查询(term/range)、地理坐标查询(矩阵、范围)、复合查询(相关性算法)、布尔查询

ES-DSL查询语法&#xff08;全文检索、精准查询、地理坐标查询&#xff09; 1.DSL查询文档 elasticsearch 的查询依然是基于 JSON 风格的 DSL 来实现的。 1.1.DSL 查询分类 Elasticsearch 提供了基于 JSON 的 DSL&#xff08;Domain Specific Language&#xff09;来定义查…

Python ddddocr 构建 exe 程序后运行报错:Failed Load model ... common_old.onnx

文章目录 ddddocr版本简单的 demo解决方案个人简介 ddddocr ddddocr是由sml2h3开发的专为验证码厂商进行对自家新版本验证码难易强度进行验证的一个python库&#xff0c;其由作者与kerlomz共同合作完成&#xff0c;通过大批量生成随机数据后进行深度网络训练&#xff0c;本身并…

qwen在vLLM下的长度外推简易方法

目的 在当前的版本vLLM中实现qwen的长度外推。 解决方法 在qwen的config.json中&#xff0c;增加如下内容&#xff1a; {"rope_scaling": { "type": "dynamic", "factor": 4.0} }dynamic:动态NTK factor:缩放因子&#xff0c;外推长…

虚拟环境中pip install不生效的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

R语言【base】:interactive():R语言是否在交互状态下运行?

Package base version 4.2.0 Usage interactive() Details 交互式的 R 会话是指有一个虚拟的操作手与 R 交互&#xff0c;比如 R 可以针对错误的输入提示更正&#xff0c;或者也可以询问接下来如何处理&#xff0c;或者认为这是可以的并且进行下一步。 GUI 控制台将安排在交…

宏集干货丨探索物联网HMI的端口转发和NAT功能

来源&#xff1a;宏集科技 工业物联网 宏集干货丨探索物联网HMI的端口转发和NAT功能 原文链接&#xff1a;https://mp.weixin.qq.com/s/zF2OqkiGnIME6sov55cGTQ 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; #工业自动化 #工业物联网 #HMI 前 言 端口转发和NAT功…

vue 里 props 类型为 Object 时设置 default: () => {} 返回的是 undefined 而不是 {}?

问题 今天遇到个小坑&#xff0c;就是 vue 里使用 props 传参类型为 Object 的时候设置 default: () > {} 报错&#xff0c;具体代码如下 <template><div class"pre-archive-info"><template v-if"infoData.kaimo ! null">{{ infoD…

ubuntu系统(10):使用samba共享linux主机中文件

目录 一、samba安装步骤 1、Linux主机端操作 &#xff08;1&#xff09;安装sabma &#xff08;2&#xff09;修改samba配置文件 &#xff08;3&#xff09;为user_name用户设置samba访问的密码 &#xff08;4&#xff09;重启samba服务 2、Windows端 二、使用 1、代码…

K8s(四)Pod资源——pod生命周期、重启策略、容器钩子与容器探测

目录 Pod生命周期 Pod重启策略 初始化容器 容器钩子 容器探测 启动探测 存活探测 就绪探测 参考资料 Pod 的生命周期 | Kubernetes Init 容器 | Kubernetes Pod的生命周期可以分为以下几个阶段&#xff1a; Pending&#xff08;等待&#xff09;&#xff1a;在这个…

Linux系统文件类型简介

Linux中的文件类型 在Linux系统中&#xff0c;每个文件都有一个文件类型&#xff0c;用于表示文件的种类。常见的文件类型包括: -&#xff1a; 普通文件&#xff1b; d&#xff1a; 目录文件&#xff1b; b&#xff1a; 块设备文件&#xff1b; c&#xff1a; 字符设备文件&a…

掌上单片机实验室 – 低分辨率编码器测速方式完善(24)

一、背景 本以为“掌上单片机实验室”这一主题已告一段落&#xff0c;可最近在测试一批新做的“轮式驱动单元”时&#xff0c;发现原来的测速算法存在问题。 起因是&#xff1a;由于轮式驱动单元的连线较长&#xff0c;PCB体积也小&#xff0c;导致脉冲信号有干扰&#xff0c;加…

2. FPGA的电路结构概述

文章目录 1. 引言2. FPGA的一般结构2.1 概要2.2 FPGA三部分构成间的关系&#xff1a; 3. 小结 1. 引言 结构决定原理。原理未必决定结构。理解FPGA结构&#xff0c;进而能阐明其工作原理很有必要。FPGA产品的风云变换&#xff0c;其基本结构保持相对不变。 2. FPGA的一般结构…

使用 Postman 发送 get 请求的简易教程

在API开发与测试的场景中&#xff0c;Postman 是一种普遍应用的工具&#xff0c;它极大地简化了发送和接收HTTP请求的流程。要发出GET请求&#xff0c;用户只需设定正确的参数并点击发送即可。 如何使用 Postman 发送一个GET请求 创建一个新请求并将类型设为 GET 首先&#…

HCIP-BGP选路

选路规则 华为BGP选路规则 思科BGP选路规则 第0条 下一跳是否可达&#xff0c;如果不可达则不参与选路 BGP 向IBGP对等体发布import引入的IGP路由时&#xff0c; 将下一跳属性改为自身的接口地址&#xff0c;而非IGP中的下一跳地址。 peer next-hop-invariable命令有以下作…

CAN\CANFD数据记录仪汽车电子售后神器

随着汽车工业的快速发展&#xff0c;CAN总线已成为汽车电子控制网络的标准。因此&#xff0c;对CAN总线数据的记录和分析变得尤为重要。 CAN数据记录仪在汽车电子售后领域的应用主要包括以下几个方面&#xff1a; 故障诊断和排查&#xff1a;通过实时记录总线上的数据&#xf…

TCP的三次握手,四次挥手

三次握手 第一次握手&#xff1a;客户端发送SYN报文&#xff0c;井发送seq为x序列号给服务端&#xff0c;等待服务端的确认第二次握手&#xff1a;服务端发送SYNACK报文&#xff0c;并发送seq为Y的序列号&#xff0c;在确认序列号为x1第三次握手&#xff1a;客户端发送ACK报文&…

matlab读取pwm波数据,不用timer的方法,这里可以参考。Matlab/Simulink之STM32开发-编码器测速

这里提供了一个不用timer的方法&#xff0c;可以参考&#xff1a; https://blog.csdn.net/weixin_36967309/article/details/88699830 Matlab/Simulink之STM32开发-编码器测速

gateway和base包+Jdk17和Jdk8版本切换(总结)

gateway 一. Gateway和Base包二.Jdk版本升级启动Idea的问题一. Gateway和Base包 在开发过程中,可能研发团队会自己写好很多的工具包。这里需要注意的是,不能将自己开发的base包引入到gateway中,gateway的作用主要是为了转发控制。 因为在gateway中会有很多单独的过滤器链,…

画图案例分享

案例 1 from scipy.misc import derivative from scipy.integrate import quad import matplotlib.pyplot as plt import numpy as np import pandas as pd from scipy.stats import norm import warningsplt.style.use(ggplot) np.random.seed(37) warnings.filterwarnings(i…