Flink 1.18安装 及配置 postgres12 同步到mysql5.7(Flink sql 方式)

文章目录

  • 1、参考
  • 2、flink 常见部署模式组合
  • 3、Standalone 安装
    • 3.1 单节点安装
    • 3.2 问题1
    • 3.3 修改ui 端口
    • 3.4 使用ip访问
  • 4 flink sql postgres --->mysql
    • 4.1 配置postgres 12
    • 4.2 新建用户并赋权
    • 4.3. 发布表
    • 4.4 Flink sql
    • 4.5 Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory'
    • 4.6 Caused by: java.io.StreamCorruptedException: unexpected block data
    • 4.7 FLink:Missing required options are: slot.name
    • 4.8 ERROR: relation "pg_publication" does not exist
    • 4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、参考

Flink -3- 一文详解安装部署以及使用和调优(standalone 模式 | yarn 模式)

flink-cdc

2、flink 常见部署模式组合

在这里插入图片描述

3、Standalone 安装

3.1 单节点安装

flink 下载地址:https://flink.apache.org/downloads/

下载 flink 安装包:flink-1.18.1-bin-scala_2.12.tgz

安装在基础环境 192.168.1.51


cd /home/moduletar -xzf flink-1.18.1-bin-scala_2.12.tgzmv flink-1.18.1 flink

3.2 问题1

The file .flink-runtime.version.properties has not been generated correctly. You MUST run ‘mvn generate-sources’ in the flink-runtime module

解决:把jdk 升级1.8.421 就可以了

3.3 修改ui 端口

conf/flink-conf.yaml
rest.port: 8086

3.4 使用ip访问

在这里插入图片描述

4 flink sql postgres —>mysql

4.1 配置postgres 12

vi /var/lib/postgresql/data/postgresql.conf

vi /var/lib/postgresql/data/postgresql.conf# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s

重启 postgres

4.2 新建用户并赋权

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;-- 连接到新创建的数据库 test_db
\c test_db-- 创建 t_user 表CREATE TABLE "public"."t_user" ("id" int8 NOT NULL,"name" varchar(255),"age" int2,PRIMARY KEY ("id")
);

新建用户并且给用户权限:


-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';-- 给用户复制流权限
ALTER ROLE test1 replication;-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

4.3. 发布表


-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;-- 查询哪些表已经发布
select * from pg_publication_tables;

更改表的复制标识包含更新和删除的值:


-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

4.4 Flink sql

Flink sql 客户端开启: ./sql-client.sh

CREATE TABLE `table_source_pg` (id BIGINT,name STRING,age INT) WITH ('connector' = 'postgres-cdc','hostname' = '192.168.1.115','port' = '5432','username' = 'test1','password' = 'xxxxxx','database-name' = 'test_db','schema-name' = 'public','table-name' = 't_user','decoding.plugin.name' = 'pgoutput','slot.name'= 'flink'
);CREATE TABLE `table_sink_mysql` (id BIGINT,name STRING,age INT,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.1.51:3306/test','username' = 'root','password' = 'xxxxxx','table-name' = 't_user_copy'
);INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`);

4.5 Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’

Flink SQL> INSERT INTO table_sink_mysql (id, name, age) (SELECT id, name, age FROM table_source_pg);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

解决: 在flink -->lib 目录下增加如下jar

-rw-r--r--. 1 root root 23715175 10月 15 19:04 flink-sql-connector-mysql-cdc-3.0.1.jar
-rw-r--r--. 1 root root 19379756 10月 15 17:01 flink-sql-connector-postgres-cdc-3.0.1.jar
-rw-r--r--. 1 root root 385471 10月 15 19:27 flink-connector-jdbc-3.2.0.jar
-rw-r--r--. 1 root root 2480823 10月 15 19:33 mysql-connector-j-8.0.32.jar

4.6 Caused by: java.io.StreamCorruptedException: unexpected block data

解决方案:在flink的flink-conf.yaml文件中添加classloader.resolve-order: parent-first 改成parent-first,重启集群即可

4.7 FLink:Missing required options are: slot.name

在这里插入图片描述

4.8 ERROR: relation “pg_publication” does not exist

这个问题在postgres 12上不存在,是在9.6中存在的

4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、修改如下参数:

jobmanager.memory.process.size: 2600mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mtaskmanager.numberOfTaskSlots: 50

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习到底是怎么实现训练模型的(以医学图像分割为例

本文主要讲解的主要不是深度学习训练模型过程中的数学步骤,不是讲: 输入——前向传播——反向传播——输出,特征提取,特征融合等等过程。而是对于小白或者门外汉来说,知道模型怎么处理的,在用些什么东西&am…

推荐几个好用的配色网站

1.ColorSpace 地址:ColorSpace - Color Palettes Generator and Color Gradient Tool Color Space 是款功能强大的渐变色在线生成器,支持单色、双色,甚至三色渐变。 进入首页,输入一个颜色,点击 GENERATE&#xff08…

从一个简单的计算问题,看国内几个大语言模型推理逻辑能力

引言 首先,来看问题: 123456*987654等于多少,给出你计算的过程。 从openai推出chatgpt以来,大模型发展的很快,笔者也经常使用免费的大语言模型辅助进行文档编写和编码工作。大模型推出时间也好久了,笔者想…

autMan框架的定时推送功能学习

一、定时推送功能简介 “定时推送”位于“系统管理”目录 主要有两个使用方向: 一是定时向某人或某群发送信息。 二是定时运行某指令,就是机器人给自己发指令,让自己运行此指令。 二、定时推送设置 定时:cron表达式,…

Java 21新特性概述

Java 21于2023年9月19日发布,这是一个LTS(长期支持)版本,到此为止,目前有Java 8、Java 11、Java 17和Java 21这四个LTS版本。 Java 21此次推出了15个新特性,本节就介绍其中重要的几个特性: JEP…

Ubuntu20.04安装ROS2教程

Ubuntu20.04安装ROS2教程 ROS 2 安装指南支持的ROS 2 版本设置语言环境(Set locale)设置源(Setup Sources)设置密钥安装 ROS 2 包(Install ROS 2 packages)环境设置(Environment setup&#xff…

java--反射(reflection)

一、反射机制 Java Reflection (1)反射机制允许程序在执行期借助 Reflection API 取得任何类的内部信息(比如成员变量、构造器、成员方法等等),并能操作对象的属性及方法。反射在设计模式和框架底层都会用到。&#x…

时间序列预测(九)——门控循环单元网络(GRU)

目录 一、GRU结构 二、GRU核心思想 1、更新门(Update Gate):决定了当前时刻隐藏状态中旧状态和新候选状态的混合比例。 2、重置门(Reset Gate):用于控制前一时刻隐藏状态对当前候选隐藏状态的影响程度。…

Java项目-基于springboot框架的智慧外贸系统项目实战(附源码+文档)

作者:计算机学长阿伟 开发技术:SpringBoot、SSM、Vue、MySQL、ElementUI等,“文末源码”。 开发运行环境 开发语言:Java数据库:MySQL技术:SpringBoot、Vue、Mybaits Plus、ELementUI工具:IDEA/…

小新学习K8s第一天之K8s基础概念

目录 一、Kubernetes(K8s)概述 1.1、什么是K8s 1.2、K8s的作用 1.3、K8s的功能 二、K8s的特性 2.1、弹性伸缩 2.2、自我修复 2.3、服务发现和负载均衡 2.4、自动发布(默认滚动发布模式)和回滚 2.5、集中化配置管理和密钥…

高效改进!防止DataX从HDFS导入关系型数据库丢数据

高效改进!防止DataX从HDFS导入关系型数据库丢数据 针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题,优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片,确保了每个分片数据都得到全面读取和传输&…

Python 实现 excel 数据过滤

一、场景分析 假设有如下一份 excel 数据 shop.xlsx, 写一段 python 程序,实现对于车牌的分组数据过滤。 并以车牌为文件名,把店名输出到 车牌.txt 文件中。 比如 闽A.txt 文件内容为: 小林书店福州店1 小林书店福州店2 二、依赖安装 程序依…

TBWeb正式稳定版V3.4.0+AI+MJ绘画+免授权无后门+详细安装教程

TBWeb正式稳定版V3.4.0AIMJ绘画免授权无后门详细安装教程; 运行环境 Nginx1.22 PHP5.7 MySQL7.4 Redis7.0 Node.js(16.19.1) PM2管理器5.6 TBWeb系统是基于 NineAI 二开的可商业化 TB Web 应用(免授权,无后门&a…

【隐私计算】隐语HEU同态加密算法解读

HEU: 一个高性能的同态加密算法库,提供了多种 PHE 算法, 包括ZPaillier、FPaillier、IPCL、Damgard Jurik、DGK、OU、EC ElGamal 以及基于FPGA和GPU硬件加速版本的Paillier版本。 本文我们会基于GPU运行HEU Docker容器,编译打包GPaillier并测…

算法的学习笔记—两个链表的第一个公共结点(牛客JZ52)

😀前言 在链表问题中,寻找两个链表的第一个公共结点是一个经典问题。这个问题的本质是在两个单链表中找到它们的相交点,或者说它们开始共享相同节点的地方。本文将详细讲解这个问题的解题思路,并提供一种高效的解决方法。 &#x…

蓝牙资讯|iOS 18.1 正式版下周推送,AirPods Pro 2耳机将带来助听器功能

苹果公司宣布将在下周发布 iOS 18.1 正式版,同时确认该更新将为 AirPods Pro 2 耳机带来新增“临床级”助听器功能。在启用功能后,用户首先需要使用 AirPods 和 iPhone 进行简短的听力测试,如果检测到听力损失,系统将创建一项“个…

docker run 命令解析

docker run 命令解析 docker run 命令用于从给定的镜像启动一个新的容器。这个命令可以包含许多选项,下面是一些常用的选项: -d:后台运行容器,并返回容器ID;-i:以交互模式运行容器,通常与 -t …

【C++】string类 (模拟实现详解 下)

我们接着上一篇【C】string类 (模拟实现详解 上)-CSDN博客继续对string模拟实现。从这篇内容开始,string相关函数的实现就要声明和定义分离了。 1.reserve、push_back和append 在string.h的string类里进行函数的声明。 void reserve(size_…

JVM(HotSpot):GC之垃圾回收器的分类

文章目录 前言一、串行二、吞吐量优先三、响应时间优先四、常见垃圾回收器使用组合 前言 上一篇,我们学习了分代回收机制 它的主要内容是对JVM内存的一个划分,以及垃圾回收器工作时,区域运作顺序的一个规定。 所以,它是一个规范。…

Spring Boot论坛网站:开发、部署与管理

3系统分析 3.1可行性分析 通过对本论坛网站实行的目的初步调查和分析,提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本论坛网站采用SSM框架,JAVA作为开发语言,是…