工业—使用Flink处理Kafka中的数据_ChangeRecord2

使用 Flink 消费 Kafka ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” value 值为 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli HGETALL key方式获取 warning_last3min_everymin_out值。
注:时间语义使用 Processing Time
  1. Kafka Source

    • 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
    • 数据通过 SimpleStringSchema 反序列化为字符串格式,再由 parseMessage 进行解析和提取。
  2. 流处理与窗口

    • Flink 使用滑动时间窗口 (SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。
    • 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
  3. 窗口函数

    • 在 MaxNumWarnMachineID 中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。
    • apply 方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
  4. Redis Sink

    • 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为 HSET
    • Redis 中的键为 warning_last3min_everymin_out,值为设备 ID。

 

package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定义常量
object Constants {val TOPIC_NAME = "ChangeRecord"val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"val REDIS_HOST = "192.168.222.101"
}// 主程序逻辑
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit = {// 创建流执行环境并配置val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置作业并行度// 构建Kafka数据源val kafkaSource = buildKafkaSource()// 从Kafka读取数据并处理val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态).filter(_._3 == "预警") // 过滤非预警状态的数据.keyBy(_._1) // 按标识符分组.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备// 输出到控制台和RedisdataStream.print("Result =>")dataStream.addSink(buildRedisSink())// 执行Flink作业env.execute("WarningLast3MinEveryMinOut Job")}// 构建Kafka数据源private def buildKafkaSource(): KafkaSource[String] = {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析来自Kafka的消息为元组private def parseMessage(message: String): (String, String, String) = {val fields = message.split(",")("warning_last3min_everymin_out", fields(1), fields(3))}// 构建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {// 统计每个设备ID的预警次数val machineCounts = input.groupBy(_._2).view.mapValues(_.size)// 获取窗口结束时间val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))// 获取预警次数最多的设备IDif (machineCounts.nonEmpty) {val maxMachineId = machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/62953.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

汇编语言学习-二

好吧,已经隔了两天,下完班看了两天,在电脑上装了虚拟机版的MS_DOS,主要是怕折腾坏我的电脑系统; 这个第二天应该是称为第二章更为合适,目前第二章已经看完,基本的命令也是敲了敲; 下面就进行一…

等差数列末项计算

等差数列末项计算 C语言代码C 代码Java代码Python代码 💐The Begin💐点点关注,收藏不迷路💐 给出一个等差数列的前两项a1,a2,求第n项是多少。 输入 一行,包含三个整数a1,a2&#x…

【笔记2-1】ESP32:基于vscode的espidf插件的开发环境搭建

主要参考b站宸芯IOT老师的视频,记录自己的笔记,老师讲的主要是linux环境,但配置过程实在太多问题,就直接用windows环境了,老师也有讲一些windows的操作,只要代码会写,操作都还好,开发…

基于Java Springboot蛋糕订购小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 微信…

使用Postman搞定各种接口token实战

现在许多项目都使用jwt来实现用户登录和数据权限,校验过用户的用户名和密码后,会向用户响应一段经过加密的token,在这段token中可能储存了数据权限等,在后期的访问中,需要携带这段token,后台解析这段token才…

H3C OSPF实验

实验拓扑 实验需求 按照图示配置 IP 地址按照图示分区域配置 OSPF ,实现全网互通为了路由结构稳定,要求路由器使用环回口作为 Router-id,ABR 的环回口宣告进骨干区域 实验解法 一、配置IP地址 [R1]int l0 [R1-LoopBack0]ip add 1.1.1.1 32 […

LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型 Matlab代码注释清晰。 程序设计 完整程序和数据获取方式:私信博主回复LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型(Matlab&#…

Ai编程cursor + sealos + devBox实现登录以及用户管理增删改查(十三)

一、什么是 Sealos? Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式,抛弃了传统的云计算架构,转向以 Kubernetes 为云内核的新架构,使企业能够像使用个人电脑一样简单地使用云。 二、适用场景 业务运…

CSS学习记录02

CSS颜色 指定颜色是通过使用预定义的颜色名称&#xff0c;或RGB&#xff0c;HEX&#xff0c;HSL&#xff0c;RGBA&#xff0c;HSLA值。 CSS颜色名 在CSS中&#xff0c;可以使用颜色名称来指定颜色&#xff1a; CSS背景色 您可以为HTML元素设置背景色&#xff1a; <h1 s…

【VUE3】npm : 无法加载文件 D:\Program\nodejs\node_global\npm.ps1,因为在此系统上禁止运行脚本。

npm : 无法加载文件 D:\Program\nodejs\npm.ps1。未对文件 D:\Program\nodejs\npm.ps1 进行数字签名。无法在当前系统上运行该脚本。有关运行脚本和设置执行策略的详细信息&#xff0c;请参阅 https:/go.microsoft.com/fwlink/?LinkID135170 中的 about_ Execution_Policies。…

级联树结构TreeSelect和上级反查

接口返回结构 前端展示格式 前端组件 <template><div ><el-scrollbar height"70vh"><el-tree :data"deptOptions" :props"{ label: label, children: children }" :expand-on-click-node"false":filter-node-me…

Ansible自动化一键部署单节点集群架构

自动化部署利器&#xff1a;Ansible 一键部署脚本 在现代IT基础设施管理中&#xff0c;Ansible以其简洁、强大的自动化能力脱颖而出。以下是精心打造的Ansible自动化一键部署脚本&#xff0c;旨在简化部署流程&#xff0c;提升效率&#xff0c;确保一致性和可靠性。 通过这个…

基于智能语音交互的智能呼叫中心工作机制

在智能化和信息化不断进步的现代&#xff0c;智能呼叫中心为客户提供高质量、高效率的服务体验&#xff0c;提升众多品牌用户的满意度和忠诚度。作为实现智能呼叫中心的关键技术之一的智能语音交互技术&#xff0c;它通过集成自然语言处理&#xff08;NLP&#xff09;、语音识别…

CLIP模型也能处理点云信息

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

【开源免费】基于Vue和SpringBoot的服装生产管理系统(附论文)

博主说明&#xff1a;本文项目编号 T 066 &#xff0c;文末自助获取源码 \color{red}{T066&#xff0c;文末自助获取源码} T066&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…

R语言机器学习论文(六):总结

文章目录 介绍参考文献介绍 本文采用R语言对来自进行数据描述、数据预处理、特征筛选和模型构建。 最后我们获得了一个能有效区分乳腺组织的随机森林预测模型,它的性能非常好,这意味着它可能拥有非常好的临床价值。 在本文中,我们利用R语言对来自美国加州大学欧文分校的B…

CSP/信奥赛C++语法基础刷题训练(36):洛谷P11229:[CSP-J 2024] 小木棍

CSP/信奥赛C语法基础刷题训练&#xff08;36&#xff09;&#xff1a;洛谷P11229&#xff1a;[CSP-J 2024] 小木棍 题目描述 小 S 喜欢收集小木棍。在收集了 n n n 根长度相等的小木棍之后&#xff0c;他闲来无事&#xff0c;便用它们拼起了数字。用小木棍拼每种数字的方法如…

http(请求方法,状态码,Cookie与)

目录 1.http中常见的Header(KV结构) 2.http请求方法 2.1 请求方法 2.2 telnet 2.3 网页根目录 2.3.1 概念 2.3.2 构建一个首页 2.4 GET与POST方法 2.4.1 提交参数 2.4.2 GET与POST提交参数对比 2.4.3 GET和POST对比 3.状态码 3.1 状态码分类 3.2 3XXX状态码 3.2 …

365天深度学习训练营-第P6周:VGG-16算法-Pytorch实现人脸识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 文为「365天深度学习训练营」内部文章 参考本文所写记录性文章&#xff0c;请在文章开头带上「&#x1f449;声明」 &#x1f37a;要求&#xff1a; 保存训练过…

【Linux】设计文件系统(C实现)

要求&#xff1a; (1)可以实现下列几条命令 dir 列文件目录 create 创建文件 delete 删除文件 read 读文件 write 写文件 (2)列目录时要列出文件名、存取权限&#xff08;八进制&#xff09;、文件长度、时间&#xff08;创建时间&#xff0c;修改时间以及…