工业—使用Flink处理Kafka中的数据_ChangeRecord2

使用 Flink 消费 Kafka ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” value 值为 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli HGETALL key方式获取 warning_last3min_everymin_out值。
注:时间语义使用 Processing Time
  1. Kafka Source

    • 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
    • 数据通过 SimpleStringSchema 反序列化为字符串格式,再由 parseMessage 进行解析和提取。
  2. 流处理与窗口

    • Flink 使用滑动时间窗口 (SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。
    • 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
  3. 窗口函数

    • 在 MaxNumWarnMachineID 中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。
    • apply 方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
  4. Redis Sink

    • 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为 HSET
    • Redis 中的键为 warning_last3min_everymin_out,值为设备 ID。

 

package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定义常量
object Constants {val TOPIC_NAME = "ChangeRecord"val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"val REDIS_HOST = "192.168.222.101"
}// 主程序逻辑
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit = {// 创建流执行环境并配置val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置作业并行度// 构建Kafka数据源val kafkaSource = buildKafkaSource()// 从Kafka读取数据并处理val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态).filter(_._3 == "预警") // 过滤非预警状态的数据.keyBy(_._1) // 按标识符分组.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备// 输出到控制台和RedisdataStream.print("Result =>")dataStream.addSink(buildRedisSink())// 执行Flink作业env.execute("WarningLast3MinEveryMinOut Job")}// 构建Kafka数据源private def buildKafkaSource(): KafkaSource[String] = {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析来自Kafka的消息为元组private def parseMessage(message: String): (String, String, String) = {val fields = message.split(",")("warning_last3min_everymin_out", fields(1), fields(3))}// 构建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {// 统计每个设备ID的预警次数val machineCounts = input.groupBy(_._2).view.mapValues(_.size)// 获取窗口结束时间val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))// 获取预警次数最多的设备IDif (machineCounts.nonEmpty) {val maxMachineId = machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/62953.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣第90题:带重复元素的子集

力扣第90题:带重复元素的子集 问题描述 给定一个整数数组nums,该数组可能包含重复元素。返回该数组所有可能的子集(幂集),并且子集中的元素需要去重。返回的子集中的每个元素应按照非递减顺序排列。 例如&#xff0…

FFmpeg 4.3 音视频-多路H265监控录放C++开发十九,ffmpeg封装

封装就是将 一个h264,和一个aac文件重新封装成一个mp4文件。 这里我们的h264 和 aac都是来源于另一个mp4文件,也就是说,我们会将 in.mp4文件解封装成一路videoavstream 和 一路 audioavstream,然后 将这两路的 avstream 合并成一…

LVS默认的工作模式支持哪些负载均衡算法?

LVS默认的工作模式支持哪些负载均衡算法? LVS(Linux Virtual Server)默认支持多种负载均衡算法,这些算法在不同的场景下具有各自的优势。以下是 LVS 默认支持的负载均衡算法及其特点: 1. 轮询调度(Round Robin Sched…

汇编语言学习-二

好吧,已经隔了两天,下完班看了两天,在电脑上装了虚拟机版的MS_DOS,主要是怕折腾坏我的电脑系统; 这个第二天应该是称为第二章更为合适,目前第二章已经看完,基本的命令也是敲了敲; 下面就进行一…

等差数列末项计算

等差数列末项计算 C语言代码C 代码Java代码Python代码 💐The Begin💐点点关注,收藏不迷路💐 给出一个等差数列的前两项a1,a2,求第n项是多少。 输入 一行,包含三个整数a1,a2&#x…

【笔记2-1】ESP32:基于vscode的espidf插件的开发环境搭建

主要参考b站宸芯IOT老师的视频,记录自己的笔记,老师讲的主要是linux环境,但配置过程实在太多问题,就直接用windows环境了,老师也有讲一些windows的操作,只要代码会写,操作都还好,开发…

Redis设计与实现第17章 -- 集群 总结2(执行命令 重新分片)

17.3 在集群中执行命令 接收命令的节点会计算出命令要处理的数据库键属于哪个槽,并检查这个槽是否指派给了自己: 如果是的话,直接执行这个命令 否则,节点向客户端返回一个MOVED错误,指引客户端转向redirect至正确的节…

基于Java Springboot蛋糕订购小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 微信…

如何在GitHub上Clone项目:一步步指南

GitHub作为全球最大的代码托管平台,汇聚了无数开发者的智慧结晶。对于初学者和资深开发者来说,学会如何从GitHub上克隆(Clone)项目是一项基本且重要的技能。本文将详细介绍如何在GitHub上克隆项目的步骤,帮助你轻松将他…

使用Postman搞定各种接口token实战

现在许多项目都使用jwt来实现用户登录和数据权限,校验过用户的用户名和密码后,会向用户响应一段经过加密的token,在这段token中可能储存了数据权限等,在后期的访问中,需要携带这段token,后台解析这段token才…

脚本数据库操作 -- 查表、增加字段、备忘录

一、查询数据库中所有表 在MySQL中,您可以使用INFORMATION_SCHEMA数据库来查询数据库中所有表的列表。INFORMATION_SCHEMA是一个特殊的数据库,它包含了关于其他所有数据库的元数据。 以下是查询当前数据库中所有表的SQL语句: SELECT TABLE…

H3C OSPF实验

实验拓扑 实验需求 按照图示配置 IP 地址按照图示分区域配置 OSPF ,实现全网互通为了路由结构稳定,要求路由器使用环回口作为 Router-id,ABR 的环回口宣告进骨干区域 实验解法 一、配置IP地址 [R1]int l0 [R1-LoopBack0]ip add 1.1.1.1 32 […

LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型 Matlab代码注释清晰。 程序设计 完整程序和数据获取方式:私信博主回复LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型(Matlab&#…

flutter 报错 error: unable to find git in your path.

项目issue:WIndows: "Unable to find git in your PATH." if terminal is not in admin mode Issue #123995 flutter/flutter 解决办法, 方法一:每次想要运行flutter的时候以管理员方式运行,比如以管理方式运行vsco…

Ai编程cursor + sealos + devBox实现登录以及用户管理增删改查(十三)

一、什么是 Sealos? Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式,抛弃了传统的云计算架构,转向以 Kubernetes 为云内核的新架构,使企业能够像使用个人电脑一样简单地使用云。 二、适用场景 业务运…

CSS学习记录02

CSS颜色 指定颜色是通过使用预定义的颜色名称&#xff0c;或RGB&#xff0c;HEX&#xff0c;HSL&#xff0c;RGBA&#xff0c;HSLA值。 CSS颜色名 在CSS中&#xff0c;可以使用颜色名称来指定颜色&#xff1a; CSS背景色 您可以为HTML元素设置背景色&#xff1a; <h1 s…

用micropython 操作stm32f4单片机实现串口通讯

from buzzer import Buzzer import pyb import machine # 导入 machine 模块以访问硬件功能 import time # 导入 time 模块以使用与时间相关的函数 from TOFSense import TOFSense_F #导入TOFSense_F板块 import binascii #二进制到 ASCII 的转换&#xff08;编码&#xff09…

postman中获取随机数、唯一ID、时间日期(包括当前日期增减)截取指定位数的字符等

在Postman中&#xff0c;您可以使用内置的动态变量和编写脚本的方式来获取随机数、唯一ID、时间日期以及截取指定位数的字符。以下是具体的操作方法&#xff1a; 一、postman中获取随机数、唯一ID、时间日期&#xff08;包括当前日期增减&#xff09;截取指定位数的字符等 获取…

《智能体雏形开发(高阶实操)》开发计划概述

智能体雏形开发计划 通过本计划,逐步完成一个可以真实运行的智能体雏形。 最终完成一个**“用户日志文件生成日报,日报再进一步汇总成周报”**的任务驱动型智能体雏形 第一阶段:基础准备与环境搭建 1. 学习基础知识 了解智能体的概念、类型和技术框架。学习大模型(如阿里…

【VUE3】npm : 无法加载文件 D:\Program\nodejs\node_global\npm.ps1,因为在此系统上禁止运行脚本。

npm : 无法加载文件 D:\Program\nodejs\npm.ps1。未对文件 D:\Program\nodejs\npm.ps1 进行数字签名。无法在当前系统上运行该脚本。有关运行脚本和设置执行策略的详细信息&#xff0c;请参阅 https:/go.microsoft.com/fwlink/?LinkID135170 中的 about_ Execution_Policies。…