FlinkSql一个简单的测试程序

FlinkSql一个简单的测试程序

以下是一个简单的 Flink SQL 示例,展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。


  1. 定义数据实体 CC :
    - CC 类表示数据流中的元素,包含两个字段: character (字符)和 count (计数)。
    - 提供了无参构造函数和带参构造函数,用于创建 CC 对象。
    // 1. 定义数据实体public static class CC {public String character;public long count;public CC() {}public CC(String character, long count) {this.character = character;this.count = count;}} 

  1. 创建执行环境并模拟数据流:
    - 创建了 Flink 执行环境 StreamExecutionEnvironment 和 StreamTableEnvironment 。
    - 创建了一个包含字符串元素的数据流 inputStream ,其中包括 “hello”, “world” 和 “!!!”。
        // 2. 创建执行环境并模拟数据流StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);DataStream<String> inputStream = env.fromElements("hello","world","!!!").uid("source").name("source");

  1. 对数据流进行 flatMap 操作:
    - 使用 flatMap 对每个输入字符串进行拆分,并将每个字符映射为一个 CC 对象。
        // 3. 对数据流进行flatMap()操作SingleOutputStreamOperator<CC> streamOperator = inputStream.flatMap(new FlatMapFunction<String, CC>() {@Overridepublic void flatMap(String value, Collector<CC> out) throws Exception {for (char c : value.toCharArray()) {out.collect(new CC(c + "",1L));}}});

  1. 将数据流转为 Table :
    - 使用 tableEnv.fromDataStream 将 streamOperator 转换为一个 Table 对象。
        // 4. 将数据流转为TableTable table = tableEnv.fromDataStream(streamOperator);

  1. 使用 Table API 操作数据流:
    - 对 table 进行选择和过滤操作,保留字符不为空的记录。
    - 对过滤后的数据进行分组,并计算每个字符的计数总和,将结果存储在 result 中。
        // 5. 使用tableApi操作数据流,并输出结果Table filter = table.select($("character"), $("count")).filter($("character").isNotEqual(""));Table result = filter.groupBy($("character")).select($("character"), $("count").sum().as("character_count"));tableEnv.toRetractStream(result, Row.class).print();

  1. 使用 Flink SQL 操作数据流:
    - 将 table 注册为临时视图 “CC”。
    - 执行 SQL 查询,对 “CC” 进行分组,计算每个字符的计数总和,并将结果存储在 result2 中。
        // 6. 使用FlinkSql操作数据流,并输出结果tableEnv.createTemporaryView("CC", table);Table result2 = tableEnv.sqlQuery("SELECT `character`, SUM(`count`) FROM CC group by `character`");tableEnv.toRetractStream(result2, Row.class).print();

  1. 执行任务:
    - 使用 env.execute(“Flink Sql Test”) 启动 Flink 作业,处理数据流并输出结果。
        // 7.执行任务env.execute("Flink Sql Test");

  1. 执行结果:
(true,+I[h, 1])
(true,+I[e, 1])
(true,+I[l, 1])
(false,-U[l, 1])
(true,+U[l, 2])
(true,+I[o, 1])
(true,+I[w, 1])
(false,-U[o, 1])
(true,+U[o, 2])
(true,+I[r, 1])
(false,-U[l, 2])
(true,+U[l, 3])
(true,+I[d, 1])
(true,+I[!, 1])
(false,-U[!, 1])
(true,+U[!, 2])
(false,-U[!, 2])
(true,+U[!, 3])Process finished with exit code 0

通过这段代码,您可以了解如何使用 Flink Table API 和 Flink SQL 对数据流进行简单的处理和分析,包括数据拆分、选择、过滤、分组和计算。最后,通过 toRetractStream 方法将结果打印输出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/690724.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SICTF Round#3 wp web

web hacker sql无列名注入&#xff1b; 提示查询username参数&#xff0c;flag在flag表中&#xff1b; 传参测试发现&#xff0c;union select 可用&#xff0c;空格被过滤可以使用/**/代替 &#xff0c;or也被过滤了且无法大小写、双写等绕过&#xff0c;导致无法查询flag表…

微信小程序swiper 视频中间大,两边小,轮播滑到中间视频自动播放组件教程

静态效果&#xff1a; 进入下面小程序可以体验效果&#xff0c;点击底部 看剧 栏目 一、创建小程序组件 二、代码 1、WXML <view class"swiper-wrapper" style"background-image:url(/asset/image/hot-banner.jpg);background-size: 100% 100%;">…

正整数A+B(PTA团体天题练习题)细节题刨析

哎呀&#xff0c;又是看似简单的AB模型&#xff0c;这题确实也是AB&#xff0c;不过这个题让我debug1个多小时才找出来问题所在&#xff0c;服了&#xff0c;真是所谓细节决定成败&#xff0c;这题也挺值得记录下来的&#xff0c;话不多嗦&#xff0c;看题 题的目标很简单&…

RK3568平台开发系列讲解(Linux系统篇)内核中断机制

🚀返回专栏总目录 文章目录 一、注册中断处理程序二、下半部的概念2.1、Tasklet作为下半部2.2、工作队列作为下半部2.3、Softirq作为下半部沉淀、分享、成长,让自己和他人都能有所收获!😄 📢中断是设备中止内核的一种方法,告诉内核发生了有趣或重要的事情。这些在Linu…

docker环境常用容器安装

目录 1.安装partainer 2.安装myql 3.安装redis 4.安装Minio 5.安装zibkin 6.安装nacos 7.安装RabbitMq 8.安装RocketMq 8.1启动service 8.2修改对应配置 8.3启动broker 8.4启动控制台 9.安装sentinel 10.安装elasticsearch 11.安装Kibana 12.安装logstash/file…

《UE5_C++多人TPS完整教程》学习笔记14 ——《P15 创建我们自己的子系统(Creating Our Own Subsystem)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P15 创建我们自己的子系统&#xff08;Creating Our Own Subsystem&#xff09;》 的学习笔记&#xff0c;该系列教学视频为 Udemy 课程 《Unreal Engine 5 C Multiplayer Shooter》 的中文字幕翻译版&#xff0c;UP主&…

three.js 3D可视化地图

threejs地图 可视化地图——three.js实现 this.provinceInfo document.getElementById(provinceInfo); // 渲染器 this.renderer new THREE.WebGLRenderer({antialias: true }); this.renderer.setSize(window.innerWidth, window.innerHeight); this.container.appendChild…

Linux下如何配置环境变量

在Linux下配置环境变量通常有几种方法&#xff0c;具体取决于你希望将环境变量设置为全局还是仅对当前会话有效。以下是一些常见的方法&#xff1a; 永久性全局配置&#xff1a;要使环境变量在所有用户和会话中永久生效&#xff0c;可以编辑 /etc/environment 文件。在文件中添…

Open CASCADE学习|TopoDS_Vertex与gp_Pnt相互转化

目录 gp_Pnt TopoDS_Vertex 关系和转换 使用场景 在Open CASCADE Technology (OCCT)中&#xff0c;TopoDS_Vertex和gp_Pnt是两种不同的数据类型&#xff0c;用于表示三维空间中的点。它们有不同的用途和特性&#xff1a; gp_Pnt gp_Pnt是OCCT几何库&#xff08;Geom&…

php伪协议之phar

一.phar协议 用于将多个 PHP 文件、类、库、资源&#xff08;如图像、样式表&#xff09;等打包成一个单独的文件。这个归档文件可以像其他 PHP 文件一样被包含&#xff08;include&#xff09;或执行。PHAR 归档提供了一种方便的方式来分发和安装 PHP 应用程序和库&#xff0c…

今日Java小练习

题目描述一 计算保存日期的那一年的所有天数 解题思路 分别截取日期的年月日保存为int型定义数组保存1到12月的天数&#xff08;或使用LocalDate的getDayOfMonth方法直接计算&#xff09;判断如果是闰年二月天数加1累加当前月份前所有月份的天数和用之前累加的天数加上当前月的…

ClickHouse学习

ClickHouse是由C编写的列式存储数据库&#xff08;DBMS&#xff09;&#xff0c;主要用来在线分析处理查询&#xff08;OLTP&#xff09;&#xff0c;能够用Sql查询生成的实时数据分析报告。 适用场景 大多数是读请求 、数据总是批量写入 、不更新或少更新数据、每次都是读取大…

机器学习中为什么需要梯度下降

在机器学习中&#xff0c;梯度下降是一种常用的优化算法&#xff0c;用于寻找损失函数的最小值。我们可以用一个简单的爬山场景来类比梯度下降的过程。 假设你被困在山上&#xff0c;需要找到一条通往山下的路。由于你是第一次来到这座山&#xff0c;对地形不熟悉&#xff0c;你…

Python——元组

一、元组特性介绍 元组和列表⼀样&#xff0c;也是⼀种序列类型的数据。 唯⼀的不同是&#xff0c;元组是相对不可变的。 二、⾼效创建元组 In [1]: t1 () # 创建 空 元素的元组In [2]: type(t1) Out[2]: tuple有元素的元组实际上是使⽤英⽂的逗号创建的 In [3]:…

【sgCreateTableColumn】自定义小工具:敏捷开发→自动化生成表格列html代码(表格列生成工具)[基于el-table-column]

源码 <template><!-- 前往https://blog.csdn.net/qq_37860634/article/details/136126479 查看使用说明 --><div :class"$options.name"><div class"sg-head">表格列生成工具</div><div class"sg-container"…

RSA加密,解密,加签及验签

目录 1.说明 2.加密和加签的区别 3.后端加密&#xff0c;解密&#xff0c;加签及验签示例 4.前端加密&#xff0c;解密&#xff0c;加签及验签示例 5.前端加密&#xff0c;后端解密&#xff0c;前端加签&#xff0c;后端验签 6.注意事项 1.说明 RSA算法是一种非对称加密…

【JavaScript】输入输出语法

目录 一、输出语法 二、输入语法 一、输出语法 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>D…

Java解决石子游戏问题

Java解决石子游戏问题 01 题目 Alice 和 Bob 用几堆石子在做游戏。一共有偶数堆石子&#xff0c;排成一行&#xff1b;每堆都有 正 整数颗石子&#xff0c;数目为 piles[i] 。 游戏以谁手中的石子最多来决出胜负。石子的 总数 是 奇数 &#xff0c;所以没有平局。 Alice 和…

Netty学习------2024/02/19

non-blocking io 非阻塞 IO 1. 三大组件 1.1 Channel & Buffer channel 有一点类似于 stream&#xff0c;它就是读写数据的双向通道&#xff0c; 可以从 channel 将数据读入 buffer&#xff0c;也可以将 buffer 的数据写入 channel&#xff0c; 而之前的 stream 要么是输…

【Linux系列】超算作业调度系统批量取消作业介绍

在使用HPC跑模型时常常需要批量取消提交的job&#xff0c;本文将三种常见的作业调度系统的批量取消作业方法进行介绍&#xff0c;方便平时使用。 一、Slurm Slurm取消/删除作业的命令为scancel&#xff0c;其基本的使用方法有&#xff1a; 命令说明scancel < jobid >删…