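CanalLauncher is the entry class that starts the canal server, so we follow its code to dig deeper.
Before we start, a few basics are worth knowing.
canal configuration modes
- ManagerCanalInstanceGenerator: configuration managed through the manager; changes are detected in real time and the server is restarted
- SpringCanalInstanceGenerator: configuration based on local Spring XML; it does not scale well with multiple instances, because every instance needs its own XML file
canal configuration files
- canal.properties (the root system configuration file)
- instance.properties (the instance-level configuration file, one per instance)
Entry code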
    public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            // install the global uncaught exception handler
            setGlobalUncaughtExceptionHandler();
            // allow the rocketmq client log path to be configured
            System.setProperty("rocketmq.client.logUseSlf4j", "true");
            // load the canal configuration
            logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
            // initialize the starter
            final CanalStarter canalStater = new CanalStarter(properties);
            // address configured in canal.admin.manager
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                // read the admin user name and password
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                if (StringUtils.isEmpty(passwd)) {
                    throw new IllegalArgumentException("canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
                }
                // admin port, defaulting to 11110
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                // whether to register automatically
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                // cluster address
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                // registration name
                String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
                if (StringUtils.isEmpty(name)) {
                    // default to the local host name
                    name = AddressUtils.getHostName();
                }
                // server IP registered to zk or admin; falls back to the local IP when not configured
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                // initialize the config client
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster,
                    name);
                // load the remote configuration maintained in admin, over HTTP
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local: local settings take precedence
                managerProperties.putAll(properties);
                // auto.scan.interval: interval between instance auto-scans, in seconds
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;

                    // a scheduled background thread reloads the remote configuration
                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                // use the md5 to check whether the configuration changed;
                                // on a change the latest data is returned and reloaded
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // the remote canal.properties changed: reload the whole application
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();
                                    lastCanalConfig = newCanalConfig;
                                }
                            }
                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }
                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }
            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }
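Two details of the entry code are easy to miss: the classpath: prefix decides where the file is read from, and local settings are layered on top of the remote ones. The following is a minimal standalone sketch of both ideas, not canal's own code. The class ConfLoadSketch, the method names, and the key demo.remoteOnly are made up for illustration, and unlike the launcher it copies into a new Properties object instead of merging in place.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class ConfLoadSketch {

    static final String CLASSPATH_URL_PREFIX = "classpath:";

    /** Loads a properties file from the classpath or the file system, depending on the prefix. */
    static Properties load(String conf) throws IOException {
        Properties properties = new Properties();
        if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
            String resource = conf.substring(CLASSPATH_URL_PREFIX.length());
            try (InputStream in = ConfLoadSketch.class.getClassLoader().getResourceAsStream(resource)) {
                if (in == null) {
                    // getResourceAsStream returns null when the resource is missing
                    throw new FileNotFoundException("classpath resource not found: " + resource);
                }
                properties.load(in);
            }
        } else {
            try (InputStream in = new FileInputStream(conf)) {
                properties.load(in);
            }
        }
        return properties;
    }

    /** Copies the remote settings first, then the local ones, so local values win on conflicts. */
    static Properties mergeLocalOverRemote(Properties remote, Properties local) {
        Properties merged = new Properties();
        merged.putAll(remote);
        merged.putAll(local);
        return merged;
    }

    public static void main(String[] args) {
        Properties remote = new Properties();
        remote.setProperty("canal.serverMode", "kafka");
        remote.setProperty("demo.remoteOnly", "kept"); // hypothetical key, for illustration only

        Properties local = new Properties();
        local.setProperty("canal.serverMode", "tcp");

        Properties merged = mergeLocalOverRemote(remote, local);
        System.out.println("canal.serverMode = " + merged.getProperty("canal.serverMode"));
        System.out.println("demo.remoteOnly = " + merged.getProperty("demo.remoteOnly"));
    }
}
```

The order of the two putAll calls is what gives the local file the higher priority: whichever map is applied last overwrites keys that exist in both.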
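In the code above, configuration loading has two branches. If canal.admin.manager (the admin console address) is not set, the local canal.properties is used as is. If it is set, the launcher first checks the configured user name and password (an empty canal.admin.passwd is rejected) and then fetches the matching server configuration from admin over HTTP: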
    public PlainCanal findServer(String md5) {
        if (StringUtils.isEmpty(md5)) {
            md5 = "";
        }
        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
                     + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster)
                     + "&name=" + StringUtils.stripToEmpty(name);
        return queryConfig(url);
    }
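To make the query parameters concrete, here is a small standalone sketch that rebuilds the same URL string with plain Java. The class PollingUrlSketch, the admin address, the IP, and the server name are made-up sample values, and stripToEmpty is a simplified stand-in for the commons-lang helper.

```java
class PollingUrlSketch {

    /** Simplified stand-in for StringUtils.stripToEmpty: null becomes "", otherwise trim. */
    static String stripToEmpty(String s) {
        return s == null ? "" : s.trim();
    }

    /** Concatenates the polling URL the same way findServer does. */
    static String pollingUrl(String configURL, String localIp, int adminPort, String md5,
                             boolean autoRegister, String autoCluster, String name) {
        if (md5 == null || md5.isEmpty()) {
            md5 = "";
        }
        return configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
               + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + stripToEmpty(autoCluster)
               + "&name=" + stripToEmpty(name);
    }

    public static void main(String[] args) {
        // First call: no md5 yet, so the md5 parameter is empty
        System.out.println(pollingUrl("http://127.0.0.1:8089", "192.168.1.10", 11110, null, true, null, "canal-1"));
        // Later calls pass the md5 of the last configuration that was received
        System.out.println(pollingUrl("http://127.0.0.1:8089", "192.168.1.10", 11110,
            "d41d8cd98f00b204e9800998ecf8427e", true, null, "canal-1"));
    }
}
```

Note that the values are concatenated without URL encoding, so this only works for names and cluster values that are already URL-safe.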
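When configuration is loaded through admin, the server polls admin over HTTP at a fixed interval (5 seconds by default). The md5 of the whole configuration is compared on each poll, and the server is restarted when it has changed.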
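The polling logic can be sketched in isolation as follows. This is a simplified model rather than canal's implementation: Config stands in for PlainCanal, FakeAdmin is a hypothetical in-memory replacement for the HTTP call, and pollOnce corresponds to one run of the scheduled task.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Function;

class ConfigPollerSketch {

    /** Hypothetical stand-in for PlainCanal: the raw config text plus its md5. */
    static final class Config {
        final String content;
        final String md5;

        Config(String content) {
            this.content = content;
            this.md5 = md5Hex(content);
        }
    }

    /** Hypothetical in-memory admin: returns null when the caller's md5 is still current. */
    static final class FakeAdmin {
        volatile String content;

        FakeAdmin(String content) {
            this.content = content;
        }

        Config findServer(String md5) {
            Config current = new Config(content);
            return current.md5.equals(md5) ? null : current;
        }
    }

    private final Function<String, Config> findServer;
    private final Runnable restart;
    private Config last;

    ConfigPollerSketch(Function<String, Config> findServer, Runnable restart) {
        this.findServer = findServer;
        this.restart = restart;
    }

    /** Runs one polling round and reports whether a restart was triggered. */
    boolean pollOnce() {
        if (last == null) {
            // First round: only record the baseline, no restart
            last = findServer.apply(null);
            return false;
        }
        Config latest = findServer.apply(last.md5);
        if (latest == null) {
            return false; // md5 unchanged
        }
        restart.run();
        last = latest;
        return true;
    }

    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin("canal.serverMode=tcp");
        ConfigPollerSketch poller = new ConfigPollerSketch(admin::findServer,
            () -> System.out.println("restarting server"));
        System.out.println("restart triggered: " + poller.pollOnce()); // baseline round
        System.out.println("restart triggered: " + poller.pollOnce()); // nothing changed
        admin.content = "canal.serverMode=kafka";                      // simulate an edit in admin
        System.out.println("restart triggered: " + poller.pollOnce()); // change detected
    }
}
```

Sending the last md5 with each request lets the remote side answer "nothing new" cheaply, so the expensive stop and start only happens when the configuration really differs.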
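Following CanalStarter.start() takes us into the startup method. By this point CanalLauncher has already loaded the server configuration.
1. Read the server mode from canal.serverMode. The options are tcp and the MQ modes (kafka, rocketMQ, rabbitMQ, pulsarMQ). Every mode except tcp requires the MQ configuration to be initialized.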
    // server mode, configurable through canal.serverMode
    String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
    if (!"tcp".equalsIgnoreCase(serverMode)) {
        ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
        // /plugin and /canal/plugin
        canalMQProducer = loader.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
        if (canalMQProducer != null) {
            canalMQProducer = new ProxyCanalMQProducer(canalMQProducer);
            // initialize the MQ settings; canal.serverMode decides which MQ is used
            canalMQProducer.init(properties);
        }
    }
    if (canalMQProducer != null) {
        MQProperties mqProperties = canalMQProducer.getMqProperties();
        // disable netty
        System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
        if (mqProperties.isFlatMessage()) {
            // set rawEntry to false to avoid parsing ByteString -> Entry a second time
            System.setProperty("canal.instance.memory.rawEntry", "false");
        }
    }
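The mode selection in step 1 boils down to "tcp means no producer, anything else is looked up by its lower-cased name". Below is a rough standalone sketch of that decision. The Producer interface and the REGISTRY map are hypothetical stand-ins for CanalMQProducer and the SPI extension loader, and falling back to tcp when the property is missing is an assumption of this sketch, not something the quoted code does.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

class ServerModeSketch {

    /** Hypothetical stand-in for CanalMQProducer. */
    interface Producer {
        String type();
    }

    /** Hypothetical registry playing the role of the SPI extension loader. */
    private static final Map<String, Supplier<Producer>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("kafka", () -> () -> "kafka");
        REGISTRY.put("rocketmq", () -> () -> "rocketmq");
    }

    /** Returns a producer for MQ modes, or empty for tcp and for unknown modes. */
    static Optional<Producer> selectProducer(Properties properties) {
        // Assumption of this sketch: an unset mode is treated as tcp
        String serverMode = properties.getProperty("canal.serverMode", "tcp");
        if ("tcp".equalsIgnoreCase(serverMode)) {
            return Optional.empty();
        }
        // The lookup key is the lower-cased mode, so "Kafka" and "kafka" resolve identically
        Supplier<Producer> factory = REGISTRY.get(serverMode.toLowerCase(Locale.ROOT));
        return factory == null ? Optional.empty() : Optional.of(factory.get());
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("canal.serverMode", "Kafka");
        String selected = selectProducer(properties).map(Producer::type).orElse("none (tcp)");
        System.out.println("selected producer: " + selected);
    }
}
```

In both the sketch and the quoted code, a mode name with no matching producer leaves the producer unset, so the MQ-specific setup that follows is skipped.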
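2. Initialize the canal scheduling controller, CanalController, and start it. CanalController is mainly responsible for initializing the configuration of each instance.
3. For non-tcp modes, start the MQ.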
    if (canalMQProducer != null) {
        canalMQStarter = new CanalMQStarter(canalMQProducer);
        // list of instances deployed on this server
        String destinations = CanalController.getDestinations(properties);
        // start the MQ
        canalMQStarter.start(destinations);
        controller.setCanalMQStarter(canalMQStarter);
    }
4. Initialize CanalAdminController, which provides the management operations used by canal admin.
    // start canalAdmin
    String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
    if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
        String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
        String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
        CanalAdminController canalAdmin = new CanalAdminController(this);
        canalAdmin.setUser(user);
        canalAdmin.setPasswd(passwd);
        String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
        logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
            port,
            user,
            passwd,
            ip);
        CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
        canalAdminWithNetty.setCanalAdmin(canalAdmin);
        canalAdminWithNetty.setPort(Integer.parseInt(port));
        canalAdminWithNetty.setIp(ip);
        canalAdminWithNetty.start();
        this.canalAdmin = canalAdminWithNetty;
    }