Flink 1.18安装 及配置 postgres12 同步到mysql5.7(Flink sql 方式)

文章目录

  • 1、参考
  • 2、flink 常见部署模式组合
  • 3、Standalone 安装
    • 3.1 单节点安装
    • 3.2 问题1
    • 3.3 修改ui 端口
    • 3.4 使用ip访问
  • 4 flink sql postgres --->mysql
    • 4.1 配置postgres 12
    • 4.2 新建用户并赋权
    • 4.3. 发布表
    • 4.4 Flink sql
    • 4.5 Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory'
    • 4.6 Caused by: java.io.StreamCorruptedException: unexpected block data
    • 4.7 FLink:Missing required options are: slot.name
    • 4.8 ERROR: relation "pg_publication" does not exist
    • 4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、参考

Flink -3- 一文详解安装部署以及使用和调优(standalone 模式 | yarn 模式)

flink-cdc

2、flink 常见部署模式组合

在这里插入图片描述

3、Standalone 安装

3.1 单节点安装

flink 下载地址:https://flink.apache.org/downloads/

下载 flink 安装包:flink-1.18.1-bin-scala_2.12.tgz

安装在基础环境 192.168.1.51


cd /home/moduletar -xzf flink-1.18.1-bin-scala_2.12.tgzmv flink-1.18.1 flink

3.2 问题1

The file .flink-runtime.version.properties has not been generated correctly. You MUST run ‘mvn generate-sources’ in the flink-runtime module

解决:把jdk 升级1.8.421 就可以了

3.3 修改ui 端口

conf/flink-conf.yaml
rest.port: 8086

3.4 使用ip访问

在这里插入图片描述

4 flink sql postgres —>mysql

4.1 配置postgres 12

vi /var/lib/postgresql/data/postgresql.conf

vi /var/lib/postgresql/data/postgresql.conf# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s

重启 postgres

4.2 新建用户并赋权

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;-- 连接到新创建的数据库 test_db
\c test_db-- 创建 t_user 表CREATE TABLE "public"."t_user" ("id" int8 NOT NULL,"name" varchar(255),"age" int2,PRIMARY KEY ("id")
);

新建用户并且给用户权限:


-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';-- 给用户复制流权限
ALTER ROLE test1 replication;-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

4.3. 发布表


-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;-- 查询哪些表已经发布
select * from pg_publication_tables;

更改表的复制标识包含更新和删除的值:


-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

4.4 Flink sql

Flink sql 客户端开启: ./sql-client.sh

CREATE TABLE `table_source_pg` (id BIGINT,name STRING,age INT) WITH ('connector' = 'postgres-cdc','hostname' = '192.168.1.115','port' = '5432','username' = 'test1','password' = 'xxxxxx','database-name' = 'test_db','schema-name' = 'public','table-name' = 't_user','decoding.plugin.name' = 'pgoutput','slot.name'= 'flink'
);CREATE TABLE `table_sink_mysql` (id BIGINT,name STRING,age INT,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.1.51:3306/test','username' = 'root','password' = 'xxxxxx','table-name' = 't_user_copy'
);INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`);

4.5 Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’

Flink SQL> INSERT INTO table_sink_mysql (id, name, age) (SELECT id, name, age FROM table_source_pg);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

解决: 在flink -->lib 目录下增加如下jar

-rw-r--r--. 1 root root 23715175 10月 15 19:04 flink-sql-connector-mysql-cdc-3.0.1.jar
-rw-r--r--. 1 root root 19379756 10月 15 17:01 flink-sql-connector-postgres-cdc-3.0.1.jar
-rw-r--r--. 1 root root 385471 10月 15 19:27 flink-connector-jdbc-3.2.0.jar
-rw-r--r--. 1 root root 2480823 10月 15 19:33 mysql-connector-j-8.0.32.jar

4.6 Caused by: java.io.StreamCorruptedException: unexpected block data

解决方案:在flink的flink-conf.yaml文件中添加classloader.resolve-order: parent-first 改成parent-first,重启集群即可

4.7 FLink:Missing required options are: slot.name

在这里插入图片描述

4.8 ERROR: relation “pg_publication” does not exist

这个问题在postgres 12上不存在,是在9.6中存在的

4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、修改如下参数:

jobmanager.memory.process.size: 2600mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mtaskmanager.numberOfTaskSlots: 50

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习到底是怎么实现训练模型的(以医学图像分割为例

本文主要讲解的主要不是深度学习训练模型过程中的数学步骤,不是讲: 输入——前向传播——反向传播——输出,特征提取,特征融合等等过程。而是对于小白或者门外汉来说,知道模型怎么处理的,在用些什么东西&am…

分享二维码弹窗,实现扫码跳转

1.二维码组件 QRCode CustomDialog export struct ShareDialog {Prop item: QuestionDetail as QuestionDetailcontroller: CustomDialogControllerQRCode(this.item.id).width(160).height(160) } 2.扫码跳转 前置知识: Scan Kit 提供默认界面扫码能力。canIUs…

推荐几个好用的配色网站

1.ColorSpace 地址:ColorSpace - Color Palettes Generator and Color Gradient Tool Color Space 是款功能强大的渐变色在线生成器,支持单色、双色,甚至三色渐变。 进入首页,输入一个颜色,点击 GENERATE&#xff08…

从一个简单的计算问题,看国内几个大语言模型推理逻辑能力

引言 首先,来看问题: 123456*987654等于多少,给出你计算的过程。 从openai推出chatgpt以来,大模型发展的很快,笔者也经常使用免费的大语言模型辅助进行文档编写和编码工作。大模型推出时间也好久了,笔者想…

autMan框架的定时推送功能学习

一、定时推送功能简介 “定时推送”位于“系统管理”目录 主要有两个使用方向: 一是定时向某人或某群发送信息。 二是定时运行某指令,就是机器人给自己发指令,让自己运行此指令。 二、定时推送设置 定时:cron表达式,…

常用分布的数学期望、方差、特征函数

文章目录 相关教程相关文献常用分布的数学期望&方差&特征函数定义事件域概率条件概率随机变量分布函数连续随机变量的概率密度函数数学期望离散随机变量连续随机变量 方差与标准差最大似然估计特征函数 不等式Chebyshev(切比雪夫)不等式 作者&am…

Java 21新特性概述

Java 21于2023年9月19日发布,这是一个LTS(长期支持)版本,到此为止,目前有Java 8、Java 11、Java 17和Java 21这四个LTS版本。 Java 21此次推出了15个新特性,本节就介绍其中重要的几个特性: JEP…

C# 读取本地网络配置信息

C# 读取本地网络配置信息 应用场景示例示例1: 获取所有网络接口的信息示例2: 获取特定网络接口的IP配置信息示例3: 获取DNS服务器地址 在现代软件开发中,处理网络配置信息是一个常见需求。无论是开发桌面、移动还是服务器应用程序,了解如何在C#中读取和管…

Ubuntu20.04安装ROS2教程

Ubuntu20.04安装ROS2教程 ROS 2 安装指南支持的ROS 2 版本设置语言环境(Set locale)设置源(Setup Sources)设置密钥安装 ROS 2 包(Install ROS 2 packages)环境设置(Environment setup&#xff…

自闭症学校:儿童成长的崭新希望

在社会的一隅,存在着这样一群特殊的孩子,他们仿佛沉浸于独属于自己的世界,难以与外界进行正常的交流互动,他们便是自闭症儿童。对于这些孩子及其家庭而言,自闭症学校无疑成为了助力他们成长的全新希望。 自闭症学校为孩…

详细且系统的Spring Boot应用开发

为了帮助大家更好地理解如何使用Spring Boot来构建一个基础的Web应用程序,我将通过一个简单的例子来进行说明。这个例子将是一个基本的RESTful API服务,用于管理图书信息。 项目准备 1. 开发环境设置(这个我之前有发文,这里就不详…

java--反射(reflection)

一、反射机制 Java Reflection (1)反射机制允许程序在执行期借助 Reflection API 取得任何类的内部信息(比如成员变量、构造器、成员方法等等),并能操作对象的属性及方法。反射在设计模式和框架底层都会用到。&#x…

[Linux Codec驱动]音频路由概念

1. 音频路由的基本概念 源(Source):音频信号的发出方,通常是一个音频输入设备,如麦克风、音频播放设备等。接收端(Sink):音频信号的接收方,通常是音频输出设备&#xff…

时间序列预测(九)——门控循环单元网络(GRU)

目录 一、GRU结构 二、GRU核心思想 1、更新门(Update Gate):决定了当前时刻隐藏状态中旧状态和新候选状态的混合比例。 2、重置门(Reset Gate):用于控制前一时刻隐藏状态对当前候选隐藏状态的影响程度。…

爬虫爬取数据时,如何解决由于验证码通常是动态生成的,直接通过URL下载可能会遇到验证码内容不一致的问题?( ̄︶ ̄)↗

在使用Selenium下载图片验证码时,由于验证码通常是动态生成的,直接通过URL下载可能会遇到验证码内容不一致的问题。因此,更可靠的方法是使用Selenium的截图功能,然后裁剪出验证码部分。 再通过第三方服务(如AntiCaptch…

Java项目-基于springboot框架的智慧外贸系统项目实战(附源码+文档)

作者:计算机学长阿伟 开发技术:SpringBoot、SSM、Vue、MySQL、ElementUI等,“文末源码”。 开发运行环境 开发语言:Java数据库:MySQL技术:SpringBoot、Vue、Mybaits Plus、ELementUI工具:IDEA/…

SSL VPN调试思路及配置指南

一、概述 本指南旨在详细阐述外部人员通过SSL VPN访问内部资源的调试过程与配置步骤。SSL VPN被单臂部署在核心交换机上,并通过外网防火墙将SSL VPN的443端口映射至外部网络,以实现安全的远程访问。 二、配置步骤 系统管理 网络设置: 配置接…

Kafka、Kafka Streams、Drools、Redis 和分布式数据库的风控系统程序

由于实时风控系统难度较大,集成框架设计各个单位均有特点,快速建立一个通用性较强,学习、实施和使用成本较低的框架尤其重要。 提供一个简化的 Java 程序示例,演示如何将 Kafka 消息中间件、Kafka Streams 计算引擎、Drools 规则…

小新学习K8s第一天之K8s基础概念

目录 一、Kubernetes(K8s)概述 1.1、什么是K8s 1.2、K8s的作用 1.3、K8s的功能 二、K8s的特性 2.1、弹性伸缩 2.2、自我修复 2.3、服务发现和负载均衡 2.4、自动发布(默认滚动发布模式)和回滚 2.5、集中化配置管理和密钥…

高效改进!防止DataX从HDFS导入关系型数据库丢数据

高效改进!防止DataX从HDFS导入关系型数据库丢数据 针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题,优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片,确保了每个分片数据都得到全面读取和传输&…