Flink构造宽表实时入库案例介绍

1. 安装包准备

Flink 1.15.4 安装包

Flink cdc的mysql连接器

Flink sql的sdb连接器

MySQL驱动

SDB驱动

Flink jdbc的mysql连接器

 

2. 入库流程图

3. Flink安装部署

  1. 上传Flink压缩包到服务器,并解压

tar -zxvf  flink-1.14.5-bin-scala_2.11.tgz  -C /opt/

  1. 复制依赖至Flink中

cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib

  1. 修改flink-conf.yaml文件

vi conf/flink-conf.yaml

### 配置Master的机器名(IP地址)
jobmanager.rpc.address: sdb1
### 配置每个taskmanager 生成的临时文件夹
io.tmp.dirs: /opt/flink-1.14.5/tmp

  1. 修改master文件

vi conf/masters

#作为master的ip和端口号
upgrade1:8081

  1. 修改worker文件

vi conf/workers

#集群主机名
upgrade1
upgrade2
upgrade3

  1. 拷贝到集群其他机器

scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/

  1. 启动flink集群

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh

  1. 启动flink-SQL

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh

4. 实时入库

编写造数程序进行造数

4.1 环境准备

4.1.1 开启mysql的binlog

  1. 创建binlog文件夹

[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog

  1. 开启binlog

vim /opt/sequoiasql/mysql/database/3306/auto.cnf

>>配置以下内容:
log_bin=/opt/sequoiasql/mysql/database/3306/binlog
binlog_format=ROW
expire_logs_days=1
server_id=1

配置完成之后,重启mysql

[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst

4.1.2 创建mysql表

创建库

create database sbtest;
use sbtest;

创建表

CREATE TABLE sbtest1 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest2 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name2 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest3 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

创建flink入库表

CREATE TABLE sbtest4 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

4.1.3 创建flink映射表

需要用到flink-sql-connector-mysql-cdc-2.2.1.jar

CREATE TABLE sbtest1_mysql (
    id INT,
    uuid INT,
    name1 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest1'
);

CREATE TABLE sbtest2_mysql (
    id INT,
    uuid INT,
    name2 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest2'
);

CREATE TABLE sbtest3_mysql (
    id INT,
    uuid INT,
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest3'
);

创建flink -->  mysql入库映射表

需要用到flink-connector-jdbc_2.11-1.14.6.jar

CREATE TABLE sbtest4_mysql (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'sbtest4'
);

创建flink -->  mysql入库映射表

需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar

CREATE TABLE sbtest_sdb (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sequoiadb',
    'bulksize' = '1',
    'hosts' = '192.168.223.135:11810',
    'collectionspace' = 'sbtest',
    'collection' = 'sbtest4'
);

4.2 MySQL实时入库

4.2.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.2.2 mysql实时入库

insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

查看可以成功入库

4.3 SDB实时入库

4.3.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.3.2 sdb实时入库

insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

显示已经成功入库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/613660.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

显示器新赛道Type-C接口

如果把主机比作大脑,那显示器就是眼睛,没有眼睛,大脑再强大也发挥不出效果,所以显示器作为电脑最重要的输出设备,有着举足轻重的地位,可以说在生活中处处都有显示器的影子。其实显示器的历史也是科技发展史…

python第三节:Str字符串类型(3)

str.index(sub[, start[, end]]) 类似于 find(),但在找不到子字符串时会引发 ValueError。 例子: str1 my name is jack!print(str1.index(i))print(str1.index(b)) 结果: Traceback (most recent call last): File "D:/pythonPro…

涛思数据获评北京市“专精特新”中小企业

众所周知,“专精特新”企业是国家引导中小企业增强自主创新能力和核心竞争力,不断提高中小企业发展质量和水平而实施的重大工程,旨在支持企业走专精特新发展之路,更好地促进企业高质量发展,也成为各领域产业链供应的关…

YOLOv8 Ultralytics:使用Ultralytics框架进行定向边界框对象检测

YOLOv8 Ultralytics:使用Ultralytics框架进行定向边界框对象检测 前言相关介绍前提条件实验环境安装环境项目地址LinuxWindows 使用Ultralytics框架进行定向边界框对象检测参考文献 前言 由于本人水平有限,难免出现错漏,敬请批评改正。更多精…

mysql 索引优化查询

MySQL的索引可以提高数据库查询性能。下面是一些常用的MySQL索引优化技巧: 创建合适的索引:根据查询条件选择合适的列作为索引,并确保这些索引在WHERE子句中被使用到。 示例代码:CREATE INDEX idx_name ON table_name (column_nam…

【设计模式】02-SOLID 设计原则

面向对象编程(OOP)是一种广泛应用的编程范式,它鼓励开发者通过对象来模拟现实世界。为了提高面向对象设计(OOD)的质量和可维护性,Robert C. Martin提出了 SOLID 原则,这五个原则构成了编写良好、…

Linux 基于 rsync 实现集群分发脚本 xsync

一、rsync 简介 rsync(remote synchronize)是 Liunx/Unix 下的一个远程数据同步工具。它可以通过 LAN/WAN 快速同步多台主机间的文件和目录,并适当利用 rsync 算法(差分编码)以减少数据的传输。 rsync 算法并不是每一次…

Redis 常见数据结构以及使用场景分析

Java面试题目录 Redis 常见数据类型以及使用场景分析 Redis中有string、list、hash、set、sorted set、bitmap这6种数据类型。 string可以用来做缓存,分布式锁,计数器等。 list可以实现消息队列,分页查询等。 hash适合存储对象结构。 set 可…

QT问题 ui提升部件时No such file or directory

问题: qt使用ui对部件提升在编译时找不到对应的头文件 出错原因: 因为将部件提升为自定义部件后,在编译时会去默认的路径下去找头文件,而自定义的头文件并不在默认路径文件下,而是在当前目录下,所以这个时候需要自己指定出自…

回归预测 | Matlab实现RIME-HKELM霜冰算法优化混合核极限学习机多变量回归预测

回归预测 | Matlab实现RIME-HKELM霜冰算法优化混合核极限学习机多变量回归预测 目录 回归预测 | Matlab实现RIME-HKELM霜冰算法优化混合核极限学习机多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现RIME-HKELM霜冰算法优化混合核极限学习机多变…

Redis群集-主从、哨兵、集群

redis群集有三种模式,分别是主从同步/复制、哨兵模式、Cluster ●主从复制:主从复制是高可用Redis的基础,哨兵和集群都是在主从复制基础上实现高可用的。主从复制主要实现了数据的多机备份,以及对于读操作的负载均衡和简单的故障恢…

priority_queue比较规则

priority_queue比较规则 std::priority_queue实际上就是一个堆&#xff0c;可用于堆排序。 std::priority_queue是C STL中的一个容器适配器&#xff0c;它提供常数时间查找最大元素的功能。默认情况下&#xff0c;它使用元素的<运算符进行排序&#xff08;升序&#xff09…

关于java的面向对象

关于java的面向对象 我们在前面的文章中&#xff0c;学习到了java的基础知识。类&#xff0c;变量&#xff0c;方法&#xff0c;数组等&#xff0c;我们本篇文章来了解一下&#xff0c;java的面向对象&#xff0c;也是java比较核心的存在&#x1f60a; 1、面向过程的思想&…

Redis 发布订阅

目录 1.Redis Unsubscribe 命令 - 指退订给定的频道。简介语法可用版本: > 2.0.0返回值: 这个命令在不同的客户端中有不同的表现。 示例 2.Redis Subscribe 命令 - 订阅给定的一个或多个频道的信息。简介语法可用版本: > 2.0.0返回值: 接收到的信息 示例 3.Redis Pubsub …

【技能---labelme软件的安装及其使用--ubuntu】

文章目录 概要Labelme 是什么&#xff1f;Labelme 能干啥&#xff1f; Ubuntu20.04安装Labelme1.Anaconda的安装2.Labelme的安装3.Labelme的使用 概要 图像检测需要自己的数据集&#xff0c;为此需要对一些数据进行数据标注&#xff0c;这里提供了一种图像的常用标注工具——la…

13. C++ linux命令,查看端口占用,cpu负载,内存占用,如何发送信号给一个进程

linux命令&#xff1a; 查看端口占用&#xff1a;losf -i:端口号 、netstat -tunlp cpu负载: top、uptime 内存占用: free -m、vmstat -s、top 查看磁盘: df、du Linux命令netstat用过吗&#xff1f; Linux如何查看哪些进程占用的内存最多: ps aux tracetoute命令了解吗…

ChatSDK 全双工语音识别库

ChatSDK :是对AIUI的语音SDK封装,套餐费用最低在6万/年iflylib :是对原始msc的语音SDK封装&#xff0c;相对AIUI便宜很多baidulib :是对百度语音SDK封装,百度号称永久免费AIUITools :AIUI网络测试工具-折线图动态测试可持续观测 共同特点&#xff1a;实现了全双工语音识别iat、…

Camunda Spin

Spin 常用于在脚本中解析json或者xml使用&#xff0c;S(variable) 表示构造成Spin对象&#xff0c;通过prop(“属性名”)获取属性值&#xff0c;通过stringValue()、numberValue()、boolValue() 等对类型转换。 repositoryService.createDeployment().name("消息事件流程&…

web第一次作业

题1&#xff1a; <form action"#" method"post"><table><tr><td>用户名&#xff1a;</td><td><input type"text" name"UserName" maxlength"20" size"15"></td>…

NumPy 数据操作实用指南:从基础到高效(上)

简介&#xff1a; 本文介绍了使用 NumPy 进行数据操作的基本步骤&#xff0c;包括导入库、创建数组、基本操作等。通过实例演示了如何利用 NumPy 进行数组的创建、索引、切片、变形、级联和切割等操作&#xff0c;以及如何应用这些功能在图像处理中进行实际应用。 numpy get st…