Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink

目录

1. 准备工作

生成数据

创建数据表

2. 创建数据表

创建数据源表

创建数据目标表

3. 计算

WITH子句


1. 准备工作

生成数据

source kafka json 数据格式 :

topic  case_kafka_mysql:

{"ts": "20201011","id": 8,"price_amt":211}

topic  flink_test_2:

{"id": 8,"coupon_price_amt":100}

注意:针对双流中的每条记录都发触发

topic: case_kafka_mysql

docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql>{"ts": "20201011","id": 8,"price_amt":211}

topic: flink_test_2

docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2>{"id": 8,"coupon_price_amt":100}

创建数据表

mysql 建表语句

CREATE TABLE `sync_test_2` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;CREATE TABLE `sync_test_22` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`coupon_ratio` double DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

2. 创建数据表

创建数据源表
create table flink_test_2_1 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test2-1','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');create table flink_test_2_2 (id BIGINT,coupon_price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'flink_test_2','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test2-2','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');

关键配置的说明:

json.fail-on-missing-field:在json缺失字段时是否报错

json.ignore-parse-errors:在解析json失败时是否报错

一般无法保证json格式,所以以上两个配置是比较重要的。

创建数据目标表
CREATE TABLE sync_test_2 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = 'Admin');CREATE TABLE sync_test_22 (ts string,coupon_ration bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = 'Admin');

3. 计算

一个作业中写入一个Sink或多个Sink

说明 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,sum(coupon_price_amt)/sum(amount) AS coupon_ration
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY ts;;
END;      --写入多个Sink时,必填。

WITH子句

WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式(CTE),可以被视为定义仅针对一个查询存在的临时视图。

改写上述查询:

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
with orders_with_coupon AS (SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id
)INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM orders_with_coupon
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,coupon_price_amt/price_amt AS coupon_ration
FROM orders_with_coupon
GROUP BY ts;;
END;      --写入多个Sink时,必填。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200580.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JSON 语法详解:轻松掌握数据结构(上)

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

postgresql-effective_cache_size参数详解

在 PostgreSQL 中,effective_cache_size 是一个配置参数,用于告诉查询规划器关于系统中可用缓存的估计信息。这个参数并不表示实际的内存量,而是用于告诉 PostgreSQL 查询规划器系统中可用的磁盘缓存和操作系统级别的文件系统缓存的大小。它用…

Lambda表达式用法汇总

Lambda表达式用法汇总 java8 中引入的 Lambda 表达式真的是个好东西,掌握之后,写代码更简洁了,码字效率也提升了不少,这里咱 们一起来看看 Lambada 表达式常见的写法,加深理解。 1、有参无返回值函数式接口 8 种写法…

【代码随想录】算法训练计划39

dp 1、62. 不同路径 题目: 求路径方案多少个 思路: 这道题就有点dp了哈 func uniquePaths(m int, n int) int {//dp,写过,代表的是多少种// 初始化dp : make([][]int, m)for i : range dp {dp[i] make([]int, n)dp[i][0] 1 // 代表到…

用友NC Cloud FileParserServlet反序列化RCE漏洞复现

0x01 产品简介 用友 NC Cloud 是一种商业级的企业资源规划云平台,为企业提供全面的管理解决方案,包括财务管理、采购管理、销售管理、人力资源管理等功能,实现企业的数字化转型和业务流程优化。 0x02 漏洞概述 用友 NC Cloud FileParserServlet接口存在反序列化代码执行漏…

response应用

文章目录 [TOC](文章目录) response说明一、response文件下载二、待补充。。。 response说明 response是指HttpServletResponse,该响应有很多的应用,比如像浏览器输出消息,下载文件,实现验证码等。 一、response文件下载 1.创建一个javaw…

springboot整合swagger

1)简介: 作为后端开放人员,最烦的事就是自己写接口文档和别人没有写接口文档,不管是前端还是后端开发,多多少少都会被接口文档所折磨,前端会抱怨后端没有及时更新接口文档,而后端又会觉得编写接…

Spark-03: Spark SQL 基础编程

目录 1.Spark SQL 简介 2.SparkSession 3.Spark SQL 数据的读写 3.1 读写 TXT 文件 3.2 读写 CSV 文件 3.3 读写 JSON 文件 3.4 读写 Parquet 文件 3.5 读写 ORC 文件 3.6 读写MySQL数据库 4.Spark SQL 语法 4.1 SQL 语法 4.2 DSL 语法 1.Spark SQL 简介 Spark SQL…

备份和恢复Linux服务器上的HTTP配置

备份和恢复Linux服务器上的HTTP配置是一项重要的任务,它可以确保您的服务器在出现故障或配置错误时能够迅速恢复正常运行。下面我们将介绍如何备份和恢复Linux服务器上的HTTP配置。 备份HTTP配置 登录到Linux服务器上,并使用root权限。 备份HTTP配置文…

分部积分法

1.形式:u对v求积分uv-v对u求积分,一前一后,一般把三角函数,反三角函数,In,e的x次方提到d里面 2. 3. 4. 5. 6. 7. 当结果中出现要求的不要慌,不是1直接求,是1重新计算

一体化污水处理设备材质怎么选

在环保意识日益增强的今天,污水处理设备成为城市建设过程中的重要环节。而选择合适的一体化污水处理设备材质,则成为了一项重要的决策。本文将从专业的角度出发,为您解析一体化污水处理设备材质的选取。 首先,一体化污水处理设备材…

postman常用脚本

在参数中动态添加开始时间和结束时间的时间戳 1.先在collection中添加参数,这里的作用域是collection,也可以是其他的任何scope 2.在Pre-request Script 中设定开始时间和结束时间参数,比如昨天和今天的时间戳,下面是js代码 con…

Android Studio Hedgehog | 2023.1.1(刺猬)

Android Gradle 插件和 Android Studio 兼容性 Android Studio 构建系统基于 Gradle,并且 Android Gradle 插件 (AGP) 添加了一些特定于构建 Android 应用程序的功能。下表列出了每个版本的 Android Studio 所需的 AGP 版本。 Android Studio versionRequired AG…

Kubernetes 常用命令

集群信息&#xff1a; 1. 显示 Kubernetes 版本&#xff1a;kubectl version2. 显示集群信息&#xff1a;kubectl cluster-info3. 列出集群中的所有节点&#xff1a;kubectl get nodes4. 查看一个具体的节点详情&#xff1a;kubectl describe node <node-name>5. 列出所…

python学习:opencv+用鼠标画矩形和圆形

目录 步骤 定义数据 新建一个窗口黑色画布 显示黑色画布 添加鼠标回调函数 循环 一直显示图片 一直判断有没有按下字母 m 关闭所有窗口 鼠标回调函数 步骤 当鼠标按下记录坐标并记录鼠标标记位为true&#xff0c;移动的时候就会不断的画矩形或者圆&#xff0c;松下的时候就再…

vue项目通过宝塔部署之后,页面刷新后浏览器404页面

转载&#xff1a;vue项目通过宝塔部署之后&#xff0c;页面刷新后浏览器404页面。

ioc循环依赖怎么解决

在使用IoC&#xff08;Inversion of Control&#xff09;容器时&#xff0c;循环依赖是一个常见的问题。不同的IoC容器提供了不同的解决方案。在Spring框架中&#xff0c;常用的解决循环依赖的注解是 Lazy 和 Autowired。 1.Lazy 注解&#xff1a; 在Spring中&#xff0c;Lazy…

STM32F1中断NVIC

目录 1. 中断系统 2. 中断向量表 3. NVIC基本结构 4. NVIC优先级分组 5. NVIC程序编写 5.1 中断分组 5.2 中断结构体变量 5.3 中断通道选择 5.4 抢占优先级和响应优先级配置 6. 中断程序执行 1. 中断系统 中断&#xff1a;在主程序运行过程中&#xff0…

如何设计自动化测试脚本

企业中如何设计自动化测试脚本呢&#xff1f;今天我们就来为大家分享一些干货。 一、线性设计 线性脚本设计方式是以脚本的方式体现测试用例&#xff0c;是一种非结构化的编码方式&#xff0c;多数采用录制回放的方式&#xff0c;测试工程师通过录制回访的访问对被测系统进行…

基于JSDoc实现TypeScript类型安全的实践报告

在FEDay 2023中我讲了《从JS到TS无缝迁移的实践报告》【视频在这里在这里】&#xff0c;是将一个传统的JS项目&#xff08;mochajs/mocha&#xff09;迁移到TypeScript环境的全程。其中提到了一件事情&#xff0c;就是“可以通过JSDoc/TSDoc来生成.d.ts”&#xff0c;从而实现T…