我这用的是 社区版的 单机, rocky liunx 安装oceanbase
注意事项:
logproxy 是 CDC 模式 , springboot 可以直接订阅
canal 是 binlog模式, canal 订阅 logproxy, springboot 订阅 canal
logproxy 也可以转 binlog
大数据量跟高频率推荐 binlog
1: oceanbase 下载地址:
OceanBase分布式数据库-海量数据 笔笔算数
2: 安装 obd web 界面安装
文档地址:
OceanBase分布式数据库-海量数据 笔笔算数
tar -xzf oceanbase-all-in-one-*.tar.gz
cd oceanbase-all-in-one/bin/
./install.sh
source ~/.oceanbase-all-in-one/bin/env.sh
obd web
obd 集群命令
https://www.oceanbase.com/docs/community-obd-cn-10000000001690699# 查看集群列表
obd cluster list# 查看集群状态,以部署名为 demo为例
obd cluster display demo# 停止运行中的集群,以部署名为 demo为例
obd cluster stop demo# 销毁已部署的集群,以部署名为 demo 为例
obd cluster destroy demo启动 ob demo = 名称
obd cluster restart demo
3: 下载 logproxy
Releases · oceanbase/oblogproxy · GitHub
4: 安装 logproxy
文档地址OceanBase分布式数据库-海量数据 笔笔算数
下载完成后,通过如下命令进行安装:
rpm -i oblogproxy-{version}.{arch}.rpm
项目安装默认为 /usr/local/oblogproxy
。
5: 配置和启动 logproxy
注意这里配置的用户, sys下新建租户, 有 且必须具备 sys 租户下 OceanBase 数据库的读权限。就是sys下新建租户, 授权OceanBase 的读权限给这个新建的租户
先配置执行 sys 新建的 用户名密码
sh run.sh config_sys sys passwd
cd /usr/local/oblogproxy/
./run.sh start / stop
6: 代码实现:
OceanBase分布式数据库-海量数据 笔笔算数
<dependency>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logproxy-client</artifactId>
<version>1.1.0</version>
</dependency>
ObReaderConfig config = new ObReaderConfig();
// 设置OceanBase root server 地址列表,格式为(可以支持多个,用';'分隔):ip1:rpc_port1:sql_port1;ip2:rpc_port2:sql_port2
config.setRsList("xxx.xxx.xxx.1:2882:2881;xxx.xxx.xxx.2:2882:2881");
// 设置用户名和密码(非系统租户)
config.setUsername("r***"); 这里可以直接用数据库的租户和密码
config.setPassword("****");
// 设置启动位点(UNIX时间戳,单位s), 0表示从当前时间启动。
config.setStartTimestamp(0L);
// 设置订阅表白名单,格式为:tenant.db.table, '*'表示通配.
config.setTableWhiteList("sys.*.*"); sys是租户,一般会自己新建一个租户存放数据库如 demo.*.*// 指定oblogproxy服务地址,创建实例.
LogProxyClient client = new LogProxyClient("xxx.xxx.xxx.1", 2983, config);
// 添加 RecordListener
client.addListener(new RecordListener() {@Overridepublic void notify(LogMessage message){// 处理消息if(StringUtils.isBlank(message.getTableName())){return;}if(!tableName.contains(message.getTableName())){return;}// 处理消息switch (message.getOpt()) {case INSERT:insertSql(message.getFieldList(), message.getTableName());break;case UPDATE:updateSql(message.getFieldList(), message.getTableName());break;case DELETE:deleteSql(message.getFieldList(), message.getTableName());break;default:break;}}@Overridepublic void onException(LogProxyClientException e) {// 处理错误if (e.needStop()) {// 不可恢复异常,需要停止Clientclient.stop();}}
});// 启动
client.start();
client.join();//新增的SQLprivate void insertSql(List<DataMessage.Record.Field> list, String tableName) {if (CollectionUtils.isEmpty(list)) {return;}list 就是行数据, 可以查看对象看看具体的信息getFieldname() 字段名getValue().toString(CommonConstant.UTF8MB4) 字段值}
7: 程序启动后 可以观察 程序的打印:
fail auth 就是 logproxy 的 sh run.sh config_sys sys passwd 这个命令配置的 用户名密码不对
如下表示连接和 订阅成功
logproxy 日志打印:
cd /usr/local/oblogproxy/
tail -f log/logproxy.log
只要
1: sh run.sh config_sys sys passwd 这里是 sys下新建的租户 用户名和密码正确
2:config.setUsername("r***"); 这里可以直接用数据库的租户和密码
config.setPassword("****");
就没问题了
sys 系统租户,
ocp 管理用的
demo 自己新建的, 用来创建数据库 的 , (新建数据库,用户名,密码)
config.setUsername("r***") 就是这个数据库的用户名和密码