flink同步mysql数据表到pg库

在这里插入图片描述

1.关闭防火墙和selinux

systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
vi /etc/selinux/config
修改为disabled

在这里插入图片描述

2.安装java8

yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version

在这里插入图片描述

3.下载和部署postgresql

下载地址:http://www.postgresql.org/ftp/source/

我安装的最新版本,可以看需求安装pg库版本
在这里插入图片描述

上传到服务器

tar -zxvf postgresql-16.2.tar.gz
./configure

如果出现以下报错解决方法:

yum -y install gcc  

在这里插入图片描述

如果出现以下错误解决方法:

yum -y install libicu libicu-devel libunwind readline-devel zlib-devel 

在这里插入图片描述

make
make install
useradd postgres
groupadd postgres
useradd -g postgres postgres
id postgrescd /usr/local/pgsql/
mkdir data
chown postgres:postgres datacd /home/postgres/
ll -al

在这里插入图片描述

vi .bash_profile
添加
export PGHOME=/usr/local/pgsql/
export PGDATA=/usr/local/pgsql/data
PATH=$PATH:$HOME/bin:$PGHOME/bin

在这里插入图片描述

source .bash_profile
su postgres
initdb

在这里插入图片描述

cd /usr/local/pgsql/data/
vi postgresql.conf
修改为listen_addresses = '*'

在这里插入图片描述

启动pg库
pg_ctl -D /usr/local/pgsql/data -l logfile start
service postgresql start

在这里插入图片描述

su – postgres
psql

在这里插入图片描述

4.下载和部署mysql

yum -y install wget
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz

在这里插入图片描述

mv mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz /usr/local/
cd /usr/local/
tar xvJf mysql-8.0.28-linux-glibc2.12-x86_64.tar.xzmv mysql-8.0.28-linux-glibc2.12-x86_64 mysql-8.0
mkdir data
groupadd mysql
chown -R mysql.mysql /usr/local/mysql-8.0
cd mysql-8.0/bin/
mkdir data
./mysqld --user=mysql --basedir=/usr/local/mysql-8.0 --datadir=/usr/local/mysql-8.0/data/ --initialize

在这里插入图片描述

圈起来的部分为后面数据库登陆的初始密码

vi /etc/my.cnf
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4

在这里插入图片描述

cp -a .././support-files/mysql.server /etc/init.d/mysql
chmod +x /etc/init.d/mysql
chkconfig --add mysql
service mysql status
service mysql start
ln -s /usr/local/mysql-8.0/bin/mysql /usr/bin

在这里插入图片描述

登陆

mysql -u root -p

输入向前初始化给的初始密码
在这里插入图片描述

修改mysql密码
在这里插入图片描述

登陆

在这里插入图片描述

5.下载部署flink

flink下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz

在这里插入图片描述

下载flink sql所需驱动

1.	flink-connector-jdbc-3.1.1-1.17.jar 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/  
2.	flink-sql-connector-mysql-cdc-3.0.1.jar 下载地址:https://github.com/ververica/flink-cdc-connectors/releases 
3.	flink-sql-connector-postgres-cdc-3.0.1.jar 下载地址:https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/3.0.1
4.	mysql-connector-java-8.0.28.jar 下载地址:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/    

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

上传flink压缩包并解压

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

进入flink的lib目录上传四个依赖

cd flink-1.18.1/lib/

需要修改一下flink配置文件
不然会出现以下情况
在这里插入图片描述

vi flink-1.18.1/conf/flink-conf.yaml

原本是localhost修改为ip
在这里插入图片描述

./flink-1.18.1/bin/start-cluster.sh

访问 192.168.207.193:8081 (默认是8081端口 可在配置文件里修改)
在这里插入图片描述

6.实时同步mysql数据到postgresql

数据库先创建一个库,在库里创建表再添加数据

create database ljq;use ljqCREATE TABLE players (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);;insert into players (player_id,team_id,player_name,height) values 
(1001,1001,'韦德','1.93'),
(1002,1002,'雷吉','1.91'),
(1003,1003,'安德烈','2.11'),
(1004,1004,'索恩','2.16'),
(1005,1005,'兰斯顿','1.88'),
(1006,1006,'格伦','1.98'),
(1007,1007,'伊斯梅尔','1.83'),
(1008,1008,'扎扎','2.11'),
(1009,1009,'乔恩','2.08');select * from players;

在这里插入图片描述
在这里插入图片描述

在pg库创建一个库,在库里创建一个表不插入数据

create database ljq;
\c ljqCREATE TABLE players3 (
player_id INT NOT NULL,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);

在这里插入图片描述

启动flink-sql

./flink-1.18.1/bin/sql-client.sh embedded

根据需要同步的数据创建源表

CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.207.193',
'port' = '3306',
'username' = 'root',
'password' = 'linux123',
'database-name' = 'ljq',
'table-name' = 'players',
'server-time-zone' = 'Asia/Shanghai'
);

在这里插入图片描述

查看表时出现报错! 在这里插入图片描述

解决方法:修改为允许所有ip访问

use mysql;
select user,host from user;
update user set host = '%' where user = 'root';
flush privileges;
select user,host from user;

在这里插入图片描述
在这里插入图片描述

重新查看表

select * from nbaplayers;·

在这里插入图片描述

创建结果表

CREATE TABLE nba (
player_id INT,
team_id INT,
player_name VARCHAR,
height NUMERIC(3,2),
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.207.193:5432/ljq??currentSchema=public&reWriteBatchedInserts=true',
'username' = 'postgres',
'password' = 'linux123',
'table-name' = 'players3'
);

执行从源表插入结果表操作,生成同步作业

INSERT INTO nba
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;

Web端查看
在这里插入图片描述

查看是否同步数据到pg库的players3表
在这里插入图片描述

测试一下增删改是否同步

1.增

mysql执行

insert into players (player_id,team_id,player_name,height) values (10001,10002,'韦德2','1.95');

在这里插入图片描述

pg库查看
在这里插入图片描述

2.删
mysql执行

delete from players where player_id = 10001;

在这里插入图片描述

pg库查看
在这里插入图片描述

3.改
mysql执行

update players set player_name='高大的姚明' where player_id=1001;

在这里插入图片描述

pg库查看
在这里插入图片描述

注:字符串小数点精确问题

在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886145.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL Server 的结构,现在看也不算差

(1)单机管理架构:由同一台计算机包办数据库系统的所有工作,即数据库服务器端和客户端都在同一台计算机上。 (2)主从式管理架构:在一台主机上安装数据库服务器,另外一些计算机作为客…

HBase理论_HBase架构组件介绍

近来有些空闲时间,正好最近也在开发HBase相关内容,借此整理一下学习和对HBase组件的架构的记录和个人感受,付出了老夫不少心血啊,主要介绍的就是HBase的架构设计以及我的拓展内容。内容如有不当或有其他理解 matirx70163.com HB…

第九部分 :1.STM32之通信接口《精讲》(USART,I2C,SPI,CAN,USB)

本芯片使用的是STM32F103C8T6型号 STM32F103C8T6是STM32F1系列中的一种较常用的低成本ARM Cortex-M3内核MCU,具有丰富的通信接口,包括USART、SPI、I2C等。下面是该芯片上通信接口的管脚分布、每个接口的工作模式、常用应用场景和注意事项。 1. USART (通…

ODOO学习笔记(8):模块化架构的优势

灵活性与可定制性 业务流程适配:企业的业务流程往往因行业、规模和管理方式等因素而各不相同。Odoo的模块化架构允许企业根据自身的具体业务流程,选择和组合不同的模块。例如,一家制造企业可以启用采购、库存、生产和销售模块,并通…

搜维尔科技:Manus VR数据手套集成,遥操作五指灵巧手解决方案

Manus VR数据手套集成,遥操作五指灵巧手解决方案 搜维尔科技:Manus VR数据手套集成,遥操作五指灵巧手解决方案

MATLAB课程:AI工具辅助编程——MATLAB+LLMs

给出一些可能有用的方法辅助大家写代码。 方法一:MATLAB软件LLM (不太懂配置的同学们为了省事可以主要用这个方法) 方法一特别针对本门MATLAB教学课程,给出一种辅助ai工具的操作指南。MATLAB中可以安装MatGPT插件,该插件通过调用ChatGPT的API…

C++二叉平衡搜索树:AVL树的插入、删除与平衡

目录 引言 AVL树的概念 AVL树节点的定义 AVL树的插入 AVL树的基本结构 AVL树的插入 第一步:按搜索树的规则进行插入 第二步:更新平衡因子 1、父节点的平衡因子为 parent->bf 0 2、更新完 parent 的 bf,如果 parent->bf 1…

机器学习(1)

一、机器学习 机器学习(Machine Learning, ML)是人工智能(Artificial Intelligence, AI)的一个分支,它致力于开发能够从数据中学习并改进性能的算法和模型。机器学习的核心思想是通过数据和经验自动优化算法&#xff…

【vue3中el-table表格高度自适应】

分享一种开发中遇到的比较方便的表格高度自适应方案 template代码 <div class"page"><div class"table_wrap"><el-table></el-table></div> </div>css代码 将el-table设置为绝对定位&#xff0c;相对于父元素定位且…

【Kafka】集成案例:与Spark大数据组件的协同应用

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《大数据前沿&#xff1a;技术与应用并进》&#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、引言 1、什么是kafka 2、Kafka 的主要特性 3、Kafka 的…

【卡尔曼滤波】递归算法Recursive的应用 C语言、Python实现(Kalman Filter)

【卡尔曼滤波】递归算法Recursive的应用 C语言、Python实现&#xff08;Kalman Filter&#xff09; 更新以gitee为准&#xff1a; gitee地址 文章目录 递归算法算术平均的递归算法例子卡尔曼滤波递归Python实现C语言实现与普通卡尔曼滤波的比较附录&#xff1a;压缩字符串、大…

C#界面设计

C#界面设计通常指的是使用C#编程语言及其相关的图形用户界面&#xff08;GUI&#xff09;框架&#xff08;如Windows Forms、WPF&#xff08;Windows Presentation Foundation&#xff09;或Uno Platform等&#xff09;来创建应用程序的用户界面。以下是一些关于C#界面设计的基…

python+pptx:(二)添加图片、表格、形状、模版渲染

目录 图片 表格 合并单元格 填充色、边距 写入数据 形状 模版渲染 上一篇&#xff1a;pythonpptx&#xff1a;&#xff08;一&#xff09;占位符、文本框、段落操作_python输出ppt母版占位符标号-CSDN博客 from pptx import Presentation from pptx.util import Cm, In…

【Windows】CMD命令学习——系统命令

CMD&#xff08;命令提示符&#xff09;是Windows操作系统中的一个命令行解释器&#xff0c;允许用户通过输入命令来执行各种系统操作。 系统命令 systeminfo - 显示计算机的详细配置信息。 tasklist - 显示当前正在运行的进程列表。 taskkill - 终止正在运行的进程。例如&am…

Java的栈与队列以及代码实现

Java栈和队列 栈的概念&#xff08;Stack&#xff09;栈的实现代码队列&#xff08;Queue&#xff09;模拟实现队列(双链表实现)循环队列&#xff08;循环数组实现&#xff09;用队列实现栈用栈来实现队列总结 栈的概念&#xff08;Stack&#xff09; 栈是常见的线性数据结构&…

Node.js is Web Scale

点击“打开/下载题目”进去看看情况&#xff1a; 为了方便查看翻译成中文简体来看&#xff1a; emmm&#xff0c;看不懂什么意思&#xff0c;查看源代码&#xff0c;js表示是一段JavaScript代码&#xff0c;丢给AI分析一下&#xff1a; // server.js const express require(&…

Unity编辑器的高级扩展技术

一、Unity编辑器扩展的基础知识 Unity编辑器提供了丰富的API&#xff0c;允许开发者创建自定义的编辑器窗口、属性和工具。这些API主要分布在UnityEditor命名空间中&#xff0c;主要包括以下几个类别&#xff1a; Editor Windows&#xff1a;自定义窗口&#xff0c;用于提供独…

缓冲区溢出,数据被踩的案例学习

继续在ubuntu上学习GDB&#xff0c;今天要学习的是缓冲区溢出。 程序的地址&#xff1a; GitHub - gedulab/gebypass: bypass password by heap buffer overflow 编译的方法&#xff1a; gcc -g -O2 -o gebypass gebypass.c 照例设置一下科学shangwang代理&#xff1a; e…

数字人直播骗局大曝光!真假源码厂商搭部署的源码有何差异?

随着数字人直播技术的不断发展成熟&#xff0c;它所蕴含着的市场前景和收益潜力开始逐渐显化&#xff0c;使得有意向入局的人数持续增多的同时&#xff0c;也让不少骗子看到了可乘之机&#xff0c;从而炮制出了一个又一个的数字人直播骗局。 其中&#xff0c;最为经典的便是dai…

【AI日记】24.11.14 复习和准备 RAG 项目 | JavaScript RAG Web Apps with LlamaIndex

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】 今天的所有工作都是为了明天要开始的个人项目做准备 核心工作 1 内容&#xff1a;JavaScript RAG Web Apps with LlamaIndex时间&#xff1a;2 小时评估&#xff1a;不错&#xff0c;完成收获&#xff1a;学习…