canal server初始化源码分析

CanalLauncher类是canal server端启动的入口类,跟随代码进行深入。

在开始之前,我们可以先了解下,

canal 配置方式
  1. ManagerCanalInstanceGenerator: 基于manager管理的配置方式,实时感知配置并进行server重启
  2. SpringCanalInstanceGenerator:基于本地spring xml的配置方式,对于多instance的时候,不便于扩展,一个instance一个xml配置
canal 配置文件
  • canal.properties  (系统根配置文件)
  • instance.properties  (instance级别的配置文件,每个instance一份)
入口代码
   public static void main(String[] args) {try {logger.info("## set default uncaught exception handler");// 设置全局的异常捕获setGlobalUncaughtExceptionHandler();// 支持rocketmq client 配置日志路径System.setProperty("rocketmq.client.logUseSlf4j","true");// 加载canal 配置logger.info("## load canal configurations");String conf = System.getProperty("canal.conf", "classpath:canal.properties");Properties properties = new Properties();if (conf.startsWith(CLASSPATH_URL_PREFIX)) {conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));} else {properties.load(new FileInputStream(conf));}// 初始化启动类final CanalStarter canalStater = new CanalStarter(properties);// canal.admin.manager 配置地址String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);if (StringUtils.isNotEmpty(managerAddress)) {// 获取admin 用户名和密码String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);if (StringUtils.isEmpty(passwd)) {throw new IllegalArgumentException("canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");}// 设置默认端口号String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");// 自否自动注册boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,CanalConstants.CANAL_ADMIN_AUTO_REGISTER));//集群地址String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);//注册名称String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);if (StringUtils.isEmpty(name)) {// 以本地机器为默认名字name = AddressUtils.getHostName();}//注册到zk 或者admin中server IP 不配置就是本地IPString registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);if (StringUtils.isEmpty(registerIp)) {registerIp = AddressUtils.getHostIp();}// 初始化配置客户端final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,user,passwd,registerIp,Integer.parseInt(adminPort),autoRegister,autoCluster,name);// 通过http方式加载远程配置 通过admin配置的信息PlainCanal canalConfig = configClient.findServer(null);if (canalConfig == null) {throw new IllegalArgumentException("managerAddress:" + managerAddress+ " can't not found config for [" + registerIp + ":" + adminPort+ "]");}Properties managerProperties = canalConfig.getProperties();// merge local  本地配置优先级更高managerProperties.putAll(properties);// auto.scan.interval  instance自动扫描的间隔时间,单位秒int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,CanalConstants.CANAL_AUTO_SCAN_INTERVAL,"5"));executor.scheduleWithFixedDelay(new Runnable() {private PlainCanal lastCanalConfig;// 定时异步线程去加载远程配置public void run() {try {if (lastCanalConfig == null) {lastCanalConfig = configClient.findServer(null);} else {//通过md5的方式验证重新验证配置是否有变更,有变更返回最新数据,并重新加载PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());if (newCanalConfig != null) {// 远程配置canal.properties修改重新加载整个应用canalStater.stop();Properties managerProperties = newCanalConfig.getProperties();// merge localmanagerProperties.putAll(properties);canalStater.setProperties(managerProperties);canalStater.start();lastCanalConfig = newCanalConfig;}}} catch (Throwable e) {logger.error("scan failed", e);}}}, 0, scanIntervalInSecond, TimeUnit.SECONDS);canalStater.setProperties(managerProperties);} else {canalStater.setProperties(properties);}canalStater.start();runningLatch.await();executor.shutdownNow();} catch (Throwable e) {logger.error("## Something goes wrong when starting up the canal Server:", e);}}

上面代码中,对于配置的加载有两部分,当配置了canal.admin.manager 后台管理地址,会先去验证配置的用户名和密码,去获取相应的server配置(通过http的方式进行获取)

    public PlainCanal findServer(String md5) {if (StringUtils.isEmpty(md5)) {md5 = "";}String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5+ "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);return queryConfig(url);}

当通过admin的方式进行加载配置,会每隔默认5s通过http的方式获取admin端的配置,会对整体配置进行md5值进行比较,有变更会对server进行重启。

跟随CanalStarter.start() 方法进入到启动方法,在CanalLauncher中已加载到server相关的配置。

   1.  根据配置canal.serverMode获取服务端模式,有tcp、相关的mq( kafka, rocketMQ, rabbitMQ, pulsarMQ)方式,除了tcp模式,需要初始化mq相关配置

        // 服务端模式 可通过配置进行变更 canal.serverModeString serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);if (!"tcp".equalsIgnoreCase(serverMode)) {ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);// /plugin 和 /canal/plugin"canalMQProducer = loader.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);if (canalMQProducer != null) {canalMQProducer =  new ProxyCanalMQProducer(canalMQProducer);// 初始化mq信息 由canal.serverMode 决定使用哪种mqcanalMQProducer.init(properties);}}if (canalMQProducer != null) {MQProperties mqProperties = canalMQProducer.getMqProperties();// disable nettySystem.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");if (mqProperties.isFlatMessage()) {// 设置为raw避免ByteString->Entry的二次解析System.setProperty("canal.instance.memory.rawEntry", "false");}}

2. 初始化canal调度控制器CanalController,并进行start,CanalController主要是初始化各个instance配置

3.  非tcp的方式,进行启动mq。

        if (canalMQProducer != null) {canalMQStarter = new CanalMQStarter(canalMQProducer);// 当前server上部署的instance列表String destinations = CanalController.getDestinations(properties);// 启动mqcanalMQStarter.start(destinations);controller.setCanalMQStarter(canalMQStarter);}

4. 初始化CanalAdminController:提供canal admin的管理操作

        // start canalAdminString port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);if (canalAdmin == null && StringUtils.isNotEmpty(port)) {String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);CanalAdminController canalAdmin = new CanalAdminController(this);canalAdmin.setUser(user);canalAdmin.setPasswd(passwd);String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",port,user,passwd,ip);CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();canalAdminWithNetty.setCanalAdmin(canalAdmin);canalAdminWithNetty.setPort(Integer.parseInt(port));canalAdminWithNetty.setIp(ip);canalAdminWithNetty.start();this.canalAdmin = canalAdminWithNetty;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631322.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s---ingress对外服务(七层)

ingress 概念 k8s的对外服务&#xff0c;ingress service作用现在两个方面&#xff1a; 1、集群内部&#xff1a;不断跟踪的变化&#xff0c;更新endpoint中的pod对象&#xff0c;基于pod的ip地址不断变化的一种服务发现机制。 2、集群外部&#xff1a;类似于负载均衡器&a…

elasticsearch[二]-DSL查询语法:全文检索、精准查询(term/range)、地理坐标查询(矩阵、范围)、复合查询(相关性算法)、布尔查询

ES-DSL查询语法&#xff08;全文检索、精准查询、地理坐标查询&#xff09; 1.DSL查询文档 elasticsearch 的查询依然是基于 JSON 风格的 DSL 来实现的。 1.1.DSL 查询分类 Elasticsearch 提供了基于 JSON 的 DSL&#xff08;Domain Specific Language&#xff09;来定义查…

虚拟环境中pip install不生效的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

宏集干货丨探索物联网HMI的端口转发和NAT功能

来源&#xff1a;宏集科技 工业物联网 宏集干货丨探索物联网HMI的端口转发和NAT功能 原文链接&#xff1a;https://mp.weixin.qq.com/s/zF2OqkiGnIME6sov55cGTQ 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; #工业自动化 #工业物联网 #HMI 前 言 端口转发和NAT功…

vue 里 props 类型为 Object 时设置 default: () => {} 返回的是 undefined 而不是 {}?

问题 今天遇到个小坑&#xff0c;就是 vue 里使用 props 传参类型为 Object 的时候设置 default: () > {} 报错&#xff0c;具体代码如下 <template><div class"pre-archive-info"><template v-if"infoData.kaimo ! null">{{ infoD…

ubuntu系统(10):使用samba共享linux主机中文件

目录 一、samba安装步骤 1、Linux主机端操作 &#xff08;1&#xff09;安装sabma &#xff08;2&#xff09;修改samba配置文件 &#xff08;3&#xff09;为user_name用户设置samba访问的密码 &#xff08;4&#xff09;重启samba服务 2、Windows端 二、使用 1、代码…

Linux系统文件类型简介

Linux中的文件类型 在Linux系统中&#xff0c;每个文件都有一个文件类型&#xff0c;用于表示文件的种类。常见的文件类型包括: -&#xff1a; 普通文件&#xff1b; d&#xff1a; 目录文件&#xff1b; b&#xff1a; 块设备文件&#xff1b; c&#xff1a; 字符设备文件&a…

掌上单片机实验室 – 低分辨率编码器测速方式完善(24)

一、背景 本以为“掌上单片机实验室”这一主题已告一段落&#xff0c;可最近在测试一批新做的“轮式驱动单元”时&#xff0c;发现原来的测速算法存在问题。 起因是&#xff1a;由于轮式驱动单元的连线较长&#xff0c;PCB体积也小&#xff0c;导致脉冲信号有干扰&#xff0c;加…

使用 Postman 发送 get 请求的简易教程

在API开发与测试的场景中&#xff0c;Postman 是一种普遍应用的工具&#xff0c;它极大地简化了发送和接收HTTP请求的流程。要发出GET请求&#xff0c;用户只需设定正确的参数并点击发送即可。 如何使用 Postman 发送一个GET请求 创建一个新请求并将类型设为 GET 首先&#…

CAN\CANFD数据记录仪汽车电子售后神器

随着汽车工业的快速发展&#xff0c;CAN总线已成为汽车电子控制网络的标准。因此&#xff0c;对CAN总线数据的记录和分析变得尤为重要。 CAN数据记录仪在汽车电子售后领域的应用主要包括以下几个方面&#xff1a; 故障诊断和排查&#xff1a;通过实时记录总线上的数据&#xf…

TCP的三次握手,四次挥手

三次握手 第一次握手&#xff1a;客户端发送SYN报文&#xff0c;井发送seq为x序列号给服务端&#xff0c;等待服务端的确认第二次握手&#xff1a;服务端发送SYNACK报文&#xff0c;并发送seq为Y的序列号&#xff0c;在确认序列号为x1第三次握手&#xff1a;客户端发送ACK报文&…

画图案例分享

案例 1 from scipy.misc import derivative from scipy.integrate import quad import matplotlib.pyplot as plt import numpy as np import pandas as pd from scipy.stats import norm import warningsplt.style.use(ggplot) np.random.seed(37) warnings.filterwarnings(i…

VMware workstation安装SUSE Linux Enterprise Server 12 SP5虚拟机并配置网络

VMware workstation安装SUSE Linux Enterprise Server 12 SP5虚拟机并配置网络 SUSE Linux Enterprise Server是企业级Linux系统&#xff0c;适合企业应用。该文档适用于在VMware workstation平台安装SUSE Linux Enterprise Server虚拟机。 1.安装准备 1.1安装平台 Windows…

如何用GPT进行论文润色与改写?

详情点击链接&#xff1a;如何用GPT/GPT4进行论文润色与改写&#xff1f;一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析&#xff0c;AI画图&#xff0c;图像识别&#xff0c;文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Gemini以及大模型Claude2二…

前端(html+css+javascript)作业--展现家乡的网页

期末期间&#xff0c;老师布置了前端作业&#xff0c;现在放到这里&#xff0c;给各位同志参考。 桂平市是广西壮族自治区的一个美丽的城市&#xff0c;拥有丰富的历史文化和自然景观&#xff0c;属于贵港市管辖&#xff0c;那为什么是看起来是市级而不是县级&#xff0c;其实他…

史上最全在IDEA中部署并使用Tomcat,图文并茂一看包会!

前言 之前自己迷茫过怎样在idea中使用Tomcat&#xff0c;因此查了很多资料&#xff0c;在这做个总结。 一、建立过程 1.新建一个JAVA文件 2.添加框架 在项目的创建区内&#xff0c;右击项目名&#xff0c;在弹窗中选择“添加框架支持”&#xff1b;选择“Web应用程序4.0”&…

Eclipse闪退 打开eclipse闪退 打开eclipse图标一闪而过 eclipse闪退 eclipse打不开

Eclipse闪退 打开eclipse闪退 打开eclipse图标一闪而过 eclipse闪退 eclipse打不开 问题描述切换为命令行启动 查看异常日志 问题描述 双击图标&#xff0c;窗口一闪而过&#xff0c;马上关闭了 切换为命令行启动 查看异常日志 进入Eclipse安装目录&#xff0c;运行终端启动…

指向未来: 量子纠缠的本质是一个指针

指向未来: 量子纠缠的本质是一个指针 概述基本概念理解量子纠缠PythonJavaC 理解波粒二象性PythonJavaC 理解量子隧穿理解宇宙常量PythonJavaC 概述 量子纠缠 (Quantum Entanglement) 是量子系统重两个或多个粒子间的一种特殊连接, 这种连接使得即使相隔很远, 这些粒子的状态也…

1. SpringBoot3 基础

文章目录 1. SpringBoot 概述2. SpringBoot 入门3. SpringBoot 配置文件3.1 SpringBoot 配置文件基本使用3.2 yml 配置文件 4. SpringBoot 整合 Mybatis5. Bean 管理5.1 Bean 扫描5.2 Bean 注册5.3 注册条件 6. 组合注解7. 自动配置原理8. 自定义 Starter 1. SpringBoot 概述 …

力扣hot100 完全平方数 完全背包 滚动数组 四平方和定理

Problem: 279. 完全平方数 文章目录 思路&#x1f496; 完全背包&#x1f496; 滚动数组优化&#x1f496; 四平方和定理 思路 &#x1f468;‍&#x1f3eb; 三叶神解 &#x1f468;‍&#x1f3eb; 数学解法 &#x1f496; 完全背包 ⏰ 时间复杂度: O ( n 2 n ) O(n^2 …