文章目录
- 1、参考
- 2、flink 常见部署模式组合
- 3、Standalone 安装
- 3.1 单节点安装
- 3.2 问题1
- 3.3 修改ui 端口
- 3.4 使用ip访问
- 4 flink sql postgres --->mysql
- 4.1 配置postgres 12
- 4.2 新建用户并赋权
- 4.3. 发布表
- 4.4 Flink sql
- 4.5 Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory'
- 4.6 Caused by: java.io.StreamCorruptedException: unexpected block data
- 4.7 FLink:Missing required options are: slot.name
- 4.8 ERROR: relation "pg_publication" does not exist
- 4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources
1、参考
Flink -3- 一文详解安装部署以及使用和调优(standalone 模式 | yarn 模式)
flink-cdc
2、flink 常见部署模式组合
3、Standalone 安装
3.1 单节点安装
flink 下载地址:https://flink.apache.org/downloads/
下载 flink 安装包:flink-1.18.1-bin-scala_2.12.tgz
安装在基础环境 192.168.1.51
cd /home/moduletar -xzf flink-1.18.1-bin-scala_2.12.tgzmv flink-1.18.1 flink
3.2 问题1
The file .flink-runtime.version.properties has not been generated correctly. You MUST run ‘mvn generate-sources’ in the flink-runtime module
解决:把jdk 升级1.8.421 就可以了
3.3 修改ui 端口
conf/flink-conf.yaml
rest.port: 8086
3.4 使用ip访问
4 flink sql postgres —>mysql
4.1 配置postgres 12
vi /var/lib/postgresql/data/postgresql.conf
vi /var/lib/postgresql/data/postgresql.conf# 更改wal日志方式为logical(方式有:minimal、replica 、logical )
wal_level = logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s
重启 postgres
4.2 新建用户并赋权
先创建数据库和表:
-- 创建数据库 test_db
CREATE DATABASE test_db;-- 连接到新创建的数据库 test_db
\c test_db-- 创建 t_user 表CREATE TABLE "public"."t_user" ("id" int8 NOT NULL,"name" varchar(255),"age" int2,PRIMARY KEY ("id")
);
新建用户并且给用户权限:
-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';-- 给用户复制流权限
ALTER ROLE test1 replication;-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
4.3. 发布表
-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;-- 查询哪些表已经发布
select * from pg_publication_tables;
更改表的复制标识包含更新和删除的值:
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';
4.4 Flink sql
Flink sql 客户端开启: ./sql-client.sh
CREATE TABLE `table_source_pg` (id BIGINT,name STRING,age INT) WITH ('connector' = 'postgres-cdc','hostname' = '192.168.1.115','port' = '5432','username' = 'test1','password' = 'xxxxxx','database-name' = 'test_db','schema-name' = 'public','table-name' = 't_user','decoding.plugin.name' = 'pgoutput','slot.name'= 'flink'
);CREATE TABLE `table_sink_mysql` (id BIGINT,name STRING,age INT,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.1.51:3306/test','username' = 'root','password' = 'xxxxxx','table-name' = 't_user_copy'
);INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`);
4.5 Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’
Flink SQL> INSERT INTO table_sink_mysql
(id
, name
, age
) (SELECT id
, name
, age
FROM table_source_pg
);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.
解决: 在flink -->lib 目录下增加如下jar
-rw-r--r--. 1 root root 23715175 10月 15 19:04 flink-sql-connector-mysql-cdc-3.0.1.jar
-rw-r--r--. 1 root root 19379756 10月 15 17:01 flink-sql-connector-postgres-cdc-3.0.1.jar
-rw-r--r--. 1 root root 385471 10月 15 19:27 flink-connector-jdbc-3.2.0.jar
-rw-r--r--. 1 root root 2480823 10月 15 19:33 mysql-connector-j-8.0.32.jar
4.6 Caused by: java.io.StreamCorruptedException: unexpected block data
解决方案:在flink的flink-conf.yaml文件中添加classloader.resolve-order: parent-first 改成parent-first,重启集群即可
4.7 FLink:Missing required options are: slot.name
4.8 ERROR: relation “pg_publication” does not exist
这个问题在postgres 12上不存在,是在9.6中存在的
4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources
1、修改如下参数:
jobmanager.memory.process.size: 2600mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mtaskmanager.numberOfTaskSlots: 50