flink cdc oceanbase(binlog模式)

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[{"component": "oceanbase-ce","access_url": "10.86.97.168:2881","user": "root","password": "pwd","connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"},{"component": "obproxy-ce","access_url": "10.86.97.168:2883","user": "root@proxysys","password": "Y6.B4s)pt","connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"},{"component": "ocp-express","access_url": "10.86.97.168:8180","user": "admin","password": "DSxF-{odkdX-bmL6fjrF2{3mLL","connect_url": "http://10.86.97.168:8180"}
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE = '2G',MAX_CPU = 1, MIN_CPU = 1,LOG_DISK_SIZE = '6G',MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` varchar(10) NOT NULL ,`uuid` varchar(30) DEFAULT NULL,`report_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。SHOW MASTER STATUS;SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.97.168").port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList("flink_test").tableList(String.join(",", SYNC_TABLES)).username("root@flink_test_tenant").password("")// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");// 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66291.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++文件流 例题

问题&#xff1a; 设计一个留言类&#xff0c;实现以下的功能&#xff1a; 1) 程序第一次运行时&#xff0c;建立一个 message.txt 文本文件&#xff0c;并把用户输入的信息存入该文件&#xff1b; 2) 以后每次运行时&#xff0c;都先读取该文件的内容并显示给用户&#xff…

exoplayer的使用-7,手势优化

之前的手势,虽然勉强实现了效果,但有一些问题. 手按着,会有抖动,这样就不断触发进度亮度这些. 第一次按下,然后拖动,产生的变化太大,严重不符合预期. touch在一个方法里面,从代码的角度看,不便于维护. Gesture手势类都给我们处理好了,双击,点击,滚动这些.所以打算用这个优化…

遮挡半透明效果

1、遮挡半透明效果是什么 在游戏开发中&#xff0c;遮挡半透明效果就是物体被挡住的部分&#xff0c;也能呈现出一种半透明效果而被看到&#xff08;具体效果可以自定义&#xff09;比如 当角色在建筑物之间穿行时&#xff0c;被遮挡部分能够呈现出半透明效果而被我们看到。遮…

大模型高效推理综述

大模型高效推理综述 1 Introduction2 Preliminaries2.1 transformer架构的LLM2.2 大模型推理过程2.3 推理效率分析 3 TAXONOMY(分类)4.数据级别优化4.1输入压缩4.1.1 提示词裁剪&#xff08;prompt pruning&#xff09;4.1.2 提示词总结&#xff08;prompt summary&#xff09;…

计算机网络--UDP和TCP课后习题

【5-05】 试举例说明有些应用程序愿意采用不可靠的UDP, 而不愿意采用可靠的TCP。 解答&#xff1a; 这可能有以下几种情况。 首先&#xff0c;在互联网上传输实时数据的分组时&#xff0c;有可能会出现差错甚至丢失。如果利用 TCP 协议对这些出错或丢失的分组进行重传&…

http://noi.openjudge.cn/_3.9数据结构之C++STL_1806:词典

题目 1806:词典 总时间限制: 3000ms 内存限制: 65536kB 描述 你旅游到了一个国外的城市。那里的人们说的外国语言你不能理解。不过幸运的是&#xff0c;你有一本词典可以帮助你。 输入 首先输入一个词典&#xff0c;词典中包含不超过100000个词条&#xff0c;每个词条占据一行…

深入探索:将 Elasticsearch 与 Ruby 工具结合使用

深入探索&#xff1a;将 Elasticsearch 与 Ruby 工具结合使用 一、背景介绍 1. Elasticsearch 与 Ruby 的结合背景 在现代软件开发中&#xff0c;Elasticsearch 作为一个基于 Lucene 的搜索引擎&#xff0c;以其分布式、可扩展、实时搜索等特点而广受欢迎。Ruby&#xff0c;…

模型分割的联邦微调与专家 MOE结合

基于模型分割的联邦微调 在基于模型分割的联邦微调中,要实现模型分割且不影响大模型整体效果,可从以下方面着手: 依据功能和数据特性分割:分析模型的功能结构以及不同部分对数据的依赖程度。例如,在自然语言处理的大模型中,可将词嵌入层、语法分析层、语义理解层等按功能…

MyBatis 与 MyBatis-Plus 的区别

MyBatis 和 MyBatis-Plus 都是用于简化 Java 应用程序与数据库交互的持久层框架&#xff0c;但它们在功能、易用性和性能优化方面存在显著差异。下面将详细介绍两者之间的区别&#xff0c;并通过具体的代码示例进行对比。 概述 MyBatis&#xff1a;作为一款经典的持久层框架&a…

Go语言的基础知识

1, Go 语言介绍 Go 即 Golang,是 Google公司2009年11月正式对外公开的一门编程语言。 根据 Go 语言开发者自述&#xff0c;近10多年&#xff0c;从单机时代的C语言到现在互联网时代的Java,都没有令人满意的开发语言&#xff0c;而C往往给人的感觉是&#xff0c;花了100%的经历…

【UE5 C++课程系列笔记】20——共享指针的简单使用

目录 概念 创建共享指针示例 重设共享指针 共享指针内容转移 共享指针和共享引用的转换 判断共享指针的相等性 共享指针访问成员函数 自定义删除器 概念 共享指针&#xff08;主要以 TSharedPtr 为例&#xff09;&#xff0c;TSharedPtr 基于引用计数机制来工作&#x…

flux中的缓存

1. cache&#xff0c;onBackpressureBuffer。都是缓存。cache可以将hot流的数据缓存起来。onBackpressureBuffer也是缓存&#xff0c;但是当下游消费者的处理速度比上游生产者慢时&#xff0c;上游生产的数据会被暂时存储在缓冲区中&#xff0c;防止丢失。 2. Flux.range 默认…

Ubuntu网络连接问题(笔记本更换wifi后,虚拟机连不上网络)

1、笔记本更换wifi后&#xff0c;虚拟机的IP地址变了&#xff0c;然后就连不上网络了&#xff08;主机笔记本连接wifi正常上网&#xff09; 2、修改子网地址&#xff08;按照ubutun的ip设置子网掩码&#xff09; 3、Ubuntu已经显示网络连接正常了&#xff0c;但是就是无法上网&…

如何在 Ubuntu 22.04 上安装 Cassandra NoSQL 数据库教程

简介 本教程将向你介绍如何在 Ubuntu 22.04 上安装 Cassandra NoSQL 数据库。 Apache Cassandra 是一个分布式的 NoSQL 数据库&#xff0c;旨在处理跨多个普通服务器的大量数据&#xff0c;并提供高可用性&#xff0c;没有单点故障。Apache Cassandra 是一个高度可扩展的分布…

【开发工具】好用的进程管理工具supervisor

supervisor配置与使用 概述配置文件详解其他高级用法相关文献 概述 Supervisor是一个用Python编写的进程管理工具&#xff0c;主要用于在类Unix系统中管理和监控长时间运行的进程。以下是对它的详细介绍&#xff1a; 一、功能特点 进程监控 Supervisor可以自动启动、停止和重启…

C++编程等级认证学习计划

C编程等级认证学习计划 计划目标 在30天内系统学习并掌握C编程等级认证&#xff08;一至八级&#xff09;的知识点&#xff0c;为参加认证考试做好充分准备。 前期准备 学习资料收集 准备涵盖C编程一至八级知识点的专业教材&#xff0c;如《C Primer》等。收集相关的在线教…

Spring MVC实战指南:构建高效Web应用的架构与技巧(三)

响应数据和结果视图(7种) 返回值分类 创建web.xml&#xff08;spring、过滤器解决乱码、配置控制器dispatcherServlet、加载springmvc.xml文件、配置启动加载&#xff09;创建springmvc.xml文件 <!--配置了内容&#xff0c;启动Tomcat服务器的时候&#xff0c;就会被加载--…

Postgresql中clog与xid对应关系计算方法(速查表)

知道xid计算clog文件名 CREATE or REPLACE PROCEDURE get_clog_name(xid bigint) as $$ DECLAREpageno bigint;segno bigint; BEGIN-- 页面号&#xff1a;一个页面8K&#xff0c;一个字节8位能存4个事务的状态。pageno : xid / (8192 * 4);-- 段号&#xff1a;一个段&#xf…

oscp备考 oscp系列——Kioptix Level 1靶场 古老的 Apache Vuln

目录 前言 1. 主机发现 2. 端口扫描 3. 指纹识别 4. 目录扫描 5. 漏洞搜索和利用 前言 oscp备考&#xff0c;oscp系列——Kioptix Level 1靶场 Kioptix Level 1难度为简单靶场&#xff0c;主要考察 nmap的使用已经是否会看输出&#xff0c;以及是否会通过应用查找对应漏…

Linux下编译安装PETSc

本文记录在Linux下编译安装PETSc的流程。 零、环境 操作系统Ubuntu 22.04.4 LTSVS Code1.92.1Git2.34.1GCC11.4.0CMake3.22.1oneAPI2024.2.1 一、安装依赖 1.1 安装oneAPI 参见&#xff1a;Get the Intel oneAPI Base Toolkit , Get the Intel oneAPI HPC Toolkit 1.2 安…