flink同步mysql数据表到pg库

在这里插入图片描述

1.关闭防火墙和selinux

systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
vi /etc/selinux/config
修改为disabled

在这里插入图片描述

2.安装java8

yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version

在这里插入图片描述

3.下载和部署postgresql

下载地址:http://www.postgresql.org/ftp/source/

我安装的最新版本,可以看需求安装pg库版本
在这里插入图片描述

上传到服务器

tar -zxvf postgresql-16.2.tar.gz
./configure

如果出现以下报错解决方法:

yum -y install gcc  

在这里插入图片描述

如果出现以下错误解决方法:

yum -y install libicu libicu-devel libunwind readline-devel zlib-devel 

在这里插入图片描述

make
make install
useradd postgres
groupadd postgres
useradd -g postgres postgres
id postgrescd /usr/local/pgsql/
mkdir data
chown postgres:postgres datacd /home/postgres/
ll -al

在这里插入图片描述

vi .bash_profile
添加
export PGHOME=/usr/local/pgsql/
export PGDATA=/usr/local/pgsql/data
PATH=$PATH:$HOME/bin:$PGHOME/bin

在这里插入图片描述

source .bash_profile
su postgres
initdb

在这里插入图片描述

cd /usr/local/pgsql/data/
vi postgresql.conf
修改为listen_addresses = '*'

在这里插入图片描述

启动pg库
pg_ctl -D /usr/local/pgsql/data -l logfile start
service postgresql start

在这里插入图片描述

su – postgres
psql

在这里插入图片描述

4.下载和部署mysql

yum -y install wget
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz

在这里插入图片描述

mv mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz /usr/local/
cd /usr/local/
tar xvJf mysql-8.0.28-linux-glibc2.12-x86_64.tar.xzmv mysql-8.0.28-linux-glibc2.12-x86_64 mysql-8.0
mkdir data
groupadd mysql
chown -R mysql.mysql /usr/local/mysql-8.0
cd mysql-8.0/bin/
mkdir data
./mysqld --user=mysql --basedir=/usr/local/mysql-8.0 --datadir=/usr/local/mysql-8.0/data/ --initialize

在这里插入图片描述

圈起来的部分为后面数据库登陆的初始密码

vi /etc/my.cnf
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4

在这里插入图片描述

cp -a .././support-files/mysql.server /etc/init.d/mysql
chmod +x /etc/init.d/mysql
chkconfig --add mysql
service mysql status
service mysql start
ln -s /usr/local/mysql-8.0/bin/mysql /usr/bin

在这里插入图片描述

登陆

mysql -u root -p

输入向前初始化给的初始密码
在这里插入图片描述

修改mysql密码
在这里插入图片描述

登陆

在这里插入图片描述

5.下载部署flink

flink下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz

在这里插入图片描述

下载flink sql所需驱动

1.	flink-connector-jdbc-3.1.1-1.17.jar 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/  
2.	flink-sql-connector-mysql-cdc-3.0.1.jar 下载地址:https://github.com/ververica/flink-cdc-connectors/releases 
3.	flink-sql-connector-postgres-cdc-3.0.1.jar 下载地址:https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/3.0.1
4.	mysql-connector-java-8.0.28.jar 下载地址:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/    

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

上传flink压缩包并解压

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

进入flink的lib目录上传四个依赖

cd flink-1.18.1/lib/

需要修改一下flink配置文件
不然会出现以下情况
在这里插入图片描述

vi flink-1.18.1/conf/flink-conf.yaml

原本是localhost修改为ip
在这里插入图片描述

./flink-1.18.1/bin/start-cluster.sh

访问 192.168.207.193:8081 (默认是8081端口 可在配置文件里修改)
在这里插入图片描述

6.实时同步mysql数据到postgresql

数据库先创建一个库,在库里创建表再添加数据

create database ljq;use ljqCREATE TABLE players (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);;insert into players (player_id,team_id,player_name,height) values 
(1001,1001,'韦德','1.93'),
(1002,1002,'雷吉','1.91'),
(1003,1003,'安德烈','2.11'),
(1004,1004,'索恩','2.16'),
(1005,1005,'兰斯顿','1.88'),
(1006,1006,'格伦','1.98'),
(1007,1007,'伊斯梅尔','1.83'),
(1008,1008,'扎扎','2.11'),
(1009,1009,'乔恩','2.08');select * from players;

在这里插入图片描述
在这里插入图片描述

在pg库创建一个库,在库里创建一个表不插入数据

create database ljq;
\c ljqCREATE TABLE players3 (
player_id INT NOT NULL,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);

在这里插入图片描述

启动flink-sql

./flink-1.18.1/bin/sql-client.sh embedded

根据需要同步的数据创建源表

CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.207.193',
'port' = '3306',
'username' = 'root',
'password' = 'linux123',
'database-name' = 'ljq',
'table-name' = 'players',
'server-time-zone' = 'Asia/Shanghai'
);

在这里插入图片描述

查看表时出现报错! 在这里插入图片描述

解决方法:修改为允许所有ip访问

use mysql;
select user,host from user;
update user set host = '%' where user = 'root';
flush privileges;
select user,host from user;

在这里插入图片描述
在这里插入图片描述

重新查看表

select * from nbaplayers;·

在这里插入图片描述

创建结果表

CREATE TABLE nba (
player_id INT,
team_id INT,
player_name VARCHAR,
height NUMERIC(3,2),
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.207.193:5432/ljq??currentSchema=public&reWriteBatchedInserts=true',
'username' = 'postgres',
'password' = 'linux123',
'table-name' = 'players3'
);

执行从源表插入结果表操作,生成同步作业

INSERT INTO nba
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;

Web端查看
在这里插入图片描述

查看是否同步数据到pg库的players3表
在这里插入图片描述

测试一下增删改是否同步

1.增

mysql执行

insert into players (player_id,team_id,player_name,height) values (10001,10002,'韦德2','1.95');

在这里插入图片描述

pg库查看
在这里插入图片描述

2.删
mysql执行

delete from players where player_id = 10001;

在这里插入图片描述

pg库查看
在这里插入图片描述

3.改
mysql执行

update players set player_name='高大的姚明' where player_id=1001;

在这里插入图片描述

pg库查看
在这里插入图片描述

注:字符串小数点精确问题

在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886145.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HBase理论_HBase架构组件介绍

近来有些空闲时间,正好最近也在开发HBase相关内容,借此整理一下学习和对HBase组件的架构的记录和个人感受,付出了老夫不少心血啊,主要介绍的就是HBase的架构设计以及我的拓展内容。内容如有不当或有其他理解 matirx70163.com HB…

第九部分 :1.STM32之通信接口《精讲》(USART,I2C,SPI,CAN,USB)

本芯片使用的是STM32F103C8T6型号 STM32F103C8T6是STM32F1系列中的一种较常用的低成本ARM Cortex-M3内核MCU,具有丰富的通信接口,包括USART、SPI、I2C等。下面是该芯片上通信接口的管脚分布、每个接口的工作模式、常用应用场景和注意事项。 1. USART (通…

ODOO学习笔记(8):模块化架构的优势

灵活性与可定制性 业务流程适配:企业的业务流程往往因行业、规模和管理方式等因素而各不相同。Odoo的模块化架构允许企业根据自身的具体业务流程,选择和组合不同的模块。例如,一家制造企业可以启用采购、库存、生产和销售模块,并通…

MATLAB课程:AI工具辅助编程——MATLAB+LLMs

给出一些可能有用的方法辅助大家写代码。 方法一:MATLAB软件LLM (不太懂配置的同学们为了省事可以主要用这个方法) 方法一特别针对本门MATLAB教学课程,给出一种辅助ai工具的操作指南。MATLAB中可以安装MatGPT插件,该插件通过调用ChatGPT的API…

C++二叉平衡搜索树:AVL树的插入、删除与平衡

目录 引言 AVL树的概念 AVL树节点的定义 AVL树的插入 AVL树的基本结构 AVL树的插入 第一步:按搜索树的规则进行插入 第二步:更新平衡因子 1、父节点的平衡因子为 parent->bf 0 2、更新完 parent 的 bf,如果 parent->bf 1…

机器学习(1)

一、机器学习 机器学习(Machine Learning, ML)是人工智能(Artificial Intelligence, AI)的一个分支,它致力于开发能够从数据中学习并改进性能的算法和模型。机器学习的核心思想是通过数据和经验自动优化算法&#xff…

【Kafka】集成案例:与Spark大数据组件的协同应用

🐇明明跟你说过:个人主页 🏅个人专栏:《大数据前沿:技术与应用并进》🏅 🔖行路有良友,便是天堂🔖 目录 一、引言 1、什么是kafka 2、Kafka 的主要特性 3、Kafka 的…

【卡尔曼滤波】递归算法Recursive的应用 C语言、Python实现(Kalman Filter)

【卡尔曼滤波】递归算法Recursive的应用 C语言、Python实现(Kalman Filter) 更新以gitee为准: gitee地址 文章目录 递归算法算术平均的递归算法例子卡尔曼滤波递归Python实现C语言实现与普通卡尔曼滤波的比较附录:压缩字符串、大…

python+pptx:(二)添加图片、表格、形状、模版渲染

目录 图片 表格 合并单元格 填充色、边距 写入数据 形状 模版渲染 上一篇:pythonpptx:(一)占位符、文本框、段落操作_python输出ppt母版占位符标号-CSDN博客 from pptx import Presentation from pptx.util import Cm, In…

【Windows】CMD命令学习——系统命令

CMD(命令提示符)是Windows操作系统中的一个命令行解释器,允许用户通过输入命令来执行各种系统操作。 系统命令 systeminfo - 显示计算机的详细配置信息。 tasklist - 显示当前正在运行的进程列表。 taskkill - 终止正在运行的进程。例如&am…

Java的栈与队列以及代码实现

Java栈和队列 栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组实现)用队列实现栈用栈来实现队列总结 栈的概念(Stack) 栈是常见的线性数据结构&…

Node.js is Web Scale

点击“打开/下载题目”进去看看情况: 为了方便查看翻译成中文简体来看: emmm,看不懂什么意思,查看源代码,js表示是一段JavaScript代码,丢给AI分析一下: // server.js const express require(&…

缓冲区溢出,数据被踩的案例学习

继续在ubuntu上学习GDB,今天要学习的是缓冲区溢出。 程序的地址: GitHub - gedulab/gebypass: bypass password by heap buffer overflow 编译的方法: gcc -g -O2 -o gebypass gebypass.c 照例设置一下科学shangwang代理: e…

数字人直播骗局大曝光!真假源码厂商搭部署的源码有何差异?

随着数字人直播技术的不断发展成熟,它所蕴含着的市场前景和收益潜力开始逐渐显化,使得有意向入局的人数持续增多的同时,也让不少骗子看到了可乘之机,从而炮制出了一个又一个的数字人直播骗局。 其中,最为经典的便是dai…

#渗透测试#SRC漏洞挖掘#云技术基础03之容器相关

目录 一、Podman相关 (一)Podman简介 (二)Pod相关操作 二、容器相关 (一)容器概念 (二)容器的历史发展 (三)Capabilities相关 三、Kubernetes&#x…

前端搭建低代码平台,微前端如何选型?

目录 背景 一、微前端是什么? 二、三大特性 三、现有微前端解决方案 1、iframe 2、Web Components 3、ESM 4、EMP 5、Fronts 6、无界(文档) 7、qiankun 四、我们选择的方案 引入qiankun并使用(src外层作为主应用) 主应…

Windows VSCode .NET CORE WebAPI Debug配置

1.安装C#插件 全名C# for Visual Studio Code,选择微软的 2. 安装C# Dev Kit插件 全名C# Dev Kit for Visual Studio Code,同样是选择微软的 3.安装Debugger for Unity 4.配置launch.json 文件 {"version": "0.2.0","config…

Linux——GPIO输入输出裸机实验

学习了正点原子Linux环境下的GPIO的输入输出的裸机实验学习,现在进行一下小结: 启动文件start.S的编写 .global _start .global _bss_start _bss_start:.word __bss_start.global _bss_end _bss_end:.word __bss_end_start:/*设置处理器进入SVC模式*/m…

Cyberchef配合Wireshark提取并解析TCP/FTP流量数据包中的文件

前一篇文章中讲述了如何使用cyberchef提取HTTP/TLS数据包中的文件,详见《Cyberchef配合Wireshark提取并解析HTTP/TLS流量数据包中的文件》,链接这里,本文讲述下如何使用cyberchef提取FTP/TCP数据包中的文件。 FTP 是最为常见的文件传输协议,和HTTP协议不同的是FTP协议传输…

51c大模型~合集42

我自己的原文哦~ https://blog.51cto.com/whaosoft/11859244 #猎户座 「草莓」即将上线,OpenAI新旗舰大模型曝光,代号「猎户座」 ChatGPT 要进化了? 本月初,OpenAI 创始人、CEO 山姆・奥特曼突然在 X 上发了一张照片&#xff0…