2023-大数据应用开发-工业数据实时处理-参考结果

工业数据实时处理-答案

任务一:实时数据采集

1、 在主节点使用Flume采集/data_log目录下实时日志文件中的数据,将数据存入到Kafka的Topic中(Topic名称分别为ChangeRecord、ProduceRecord和EnvironmentData,分区数为4),将Flume采集ChangeRecord主题的配置截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;

flume配置内容

a1.sources = r1 r2 r3
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3
# r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/changerecord/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data_log/.*changerecord.csv
# r2
a1.sources.r2.type = TAILDIR
a1.sources.r2.positionFile = /opt/module/flume/producerecord/taildir_position.json
a1.sources.r2.filegroups = f1
a1.sources.r2.filegroups.f1 = /data_log/.*producerecord.csv
# r3
a1.sources.r3.type = TAILDIR
a1.sources.r3.positionFile = /opt/module/flume/environmentdata/taildir_position.json
a1.sources.r3.filegroups = f1
a1.sources.r3.filegroups.f1 = /data_log/.*environmentdata.csv
# k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ChangeRecord
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# k2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = ProduceRecord
a1.sinks.k2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
# k3
a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k3.kafka.topic = EnvironmentData
a1.sinks.k3.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k3.kafka.flumeBatchSize = 20
a1.sinks.k3.kafka.producer.acks = 1
a1.sinks.k3.kafka.producer.linger.ms = 1# c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000# c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 10000# c3
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 10000a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2a1.sources.r3.channels = c3
a1.sinks.k3.channel = c3

2、 编写新的Flume配置文件,将数据备份到HDFS目录/user/test/flumebackup下,要求所有主题的数据使用同一个Flume配置文件完成,将Flume的配置截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。

flume配置内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.filegroups = f1 f2 f3
#f1
a1.sources.r1.filegroups.f1 = /data_log/.*producerecord.csv
a1.sources.r1.headers.f1.headerKey1 = producerecord#f2
a1.sources.r1.filegroups.f2 = /data_log/.*changerecord.csv
a1.sources.r1.headers.f2.headerKey1 = changerecord
#f3
a1.sources.r1.filegroups.f3 = /data_log/.*environmentdata.csv
a1.sources.r1.headers.f3.headerKey1 = environmentdata
a1.sources.r1.fileHeader = true# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///user/test/flumebackup/%Y%m%d/%H/%{headerKey1}
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

任务二:使用Flink处理Kafka中的数据

编写Scala工程代码,使用Flink消费Kafka中的数据并进行相应的数据统计计算。

ProduceRecord

totalproduce

1、 使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每5分钟生产产品总数,将结果存入Redis中,key值为“totalproduce”,value值为“设备id,最近五分钟生产总数”。使用redis cli以HGETALL key方式获取totalproduce值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔5分钟以上,第一次截图放前面,第二次截图放后面;

注:ProduceRecord主题,生产一个产品产生一条数据;

change_handle_state字段为1代表已经检验,0代表未检验;

时间语义使用Processing Time。

第一次截图

在这里插入图片描述

第二次截图

在这里插入图片描述

Produce5minAgg

2、 使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每5分钟生产产品总数,将结果存入HBase中的gyflinkresult:Produce5minAgg表,rowkey“设备id-系统时间”(如:123-2023-01-01 12:06:06.001),将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔5分钟以上,第一次截图放前面,第二次截图放后面;

注:ProduceRecord主题,每生产一个产品产生一条数据;

change_handle_state字段为1代表已经检验,0代表为检验;

时间语义使用Processing Time。

字段类型中文含义
rowkeyString设备id-系统时间(如:123-2023-01-01 12:06:06.001)
machine_idString设备id
total_produceString最近5分钟生产总数

第一次截图

在这里插入图片描述

第二次截图

在这里插入图片描述

ChangeRecord

warning30sMachine

1、 使用Flink消费Kafka中ChangeRecord主题的数据,当某设备30秒状态连续为“预警”,输出预警信息。当前预警信息输出后,最近30秒不再重复预警(即如果连续1分钟状态都为“预警”只输出两次预警信息),将结果存入Redis中,key值为“warning30sMachine”,value值为“设备id,预警信息”。使用redis cli以HGETALL key方式获取warning30sMachine值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;

注:时间使用change_start_time字段,忽略数据中的change_end_time不参与任何计算。忽略数据迟到问题。

Redis的value示例:115,2022-01-01 09:53:10:设备115 连续30秒为预警状态请尽快处理!

(2022-01-01 09:53:10 为change_start_time字段值,中文内容及格式必须为示例所示内容。)

截图一

在这里插入图片描述

截图二

在这里插入图片描述

warning_last3min_everymin_out

2、 使用Flink消费Kafka中ChangeRecord主题的数据,每隔1分钟输出最近3分钟的预警次数最多的设备,将结果存入Redis中,key值为“warning_last3min_everymin_out”,value值为“窗口结束时间,设备id”(窗口结束时间格式:yyyy-MM-dd HH:mm:ss)。使用redis cli以HGETALL key方式获取warning_last3min_everymin_out值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面。

注:时间语义使用Processing Time。

第一次截图

在这里插入图片描述

第二次截图

在这里插入图片描述

threemin_warning_state_agg

3、 使用Flink消费Kafka中ChangeRecord主题的数据,统计每3分钟各设备状态为“预警”且未处理的数据总数,将结果存入MySQL数据库shtd_industry的threemin_warning_state_agg表中(追加写入,表结构如下)。请将任务启动命令复制粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,启动且数据进入后按照设备id升序排序查询threemin_warning_state_agg表进行截图,第一次截图后等待3分钟再次查询并截图,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。

threemin_warning_state_agg表:

字段类型中文含义
change_machine_idint设备id
totalwarningint未被处理预警的数据总数
window_end_timevarchar窗口结束时间(yyyy-MM-dd HH:mm:ss)

注:时间语义使用Processing Time。

第一次截图

在这里插入图片描述

第二次截图

在这里插入图片描述

change_state_other_to_run_agg

4、 使用Flink消费Kafka中ChangeRecord主题的数据,实时统计每个设备从其他状态转变为“运行”状态的总次数,将结果存入MySQL数据库shtd_industry的change_state_other_to_run_agg表中(表结构如下)。请将任务启动命令复制粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,启动1分钟后根据change_machine_id降序查询change_state_other_to_run_agg表并截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,启动2分钟后根据change_machine_id降序查询change_state_other_to_run_agg表并再次截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;

注:时间语义使用Processing Time。

change_state_other_to_run_agg表:

字段类型中文含义
change_machine_idint设备id
last_machine_statevarchar上一状态。即触发本次统计的最近一次非运行状态
total_change_torunint从其他状态转为运行的总次数
in_timevarcharflink计算完成时间(yyyy-MM-dd HH:mm:ss)

第一次截图

在这里插入图片描述

第二次截图

在这里插入图片描述

EnvironmentData

env_temperature_monitor

1、 使用Flink消费Kafka中EnvironmentData主题的数据,监控各环境检测设备数据,当温度(Temperature字段)持续3分钟高于38度时记录为预警数据。将结果存入Redis中,key值为“env_temperature_monitor”,value值为“设备id-预警信息生成时间,预警信息”(预警信息生成时间格式:yyyy-MM-dd HH:mm:ss)。使用redis cli以HGETALL key方式获取env_temperature_monitor值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需要Flink启动运行6分钟以后再截图;

注:时间语义使用Processing Time。

value示例:114-2022-01-01 14:12:19,设备114连续三分钟温度高于38度请及时处理!

中文内容及格式必须为示例所示内容。

同一设备3分钟只预警一次。

第一次截图

在这里插入图片描述

env_temperature_monitor

2、 使用Flink消费Kafka中EnvironmentData主题的数据,监控各环境检测设备数据,当温度(Temperature字段)持续3分钟高于38度时记录为预警数据。将结果存入HBase中的gyflinkresult:EnvTemperatureMonitor,key值为“env_temperature_monitor”,rowkey“设备id-系统时间”(如:123-2023-01-01 12:06:06.001),将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需要Flink启动运行6分钟以后再截图;

注:时间语义使用Processing Time。

中文内容及格式必须为示例所示内容。

同一设备3分钟只预警一次。

字段类型中文含义
rowkeyString设备id-系统时间(如:123-2023-01-01 12:06:06.001)
machine_idString设备id
out_warning_timeString预警生成时间 预警信息生成时间格式:yyyy-MM-dd HH:mm:ss

截图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72806.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue响应式详解

1. 响应式的定义 我们都知道,vue是基于javascript的,那我们使用一段javascript代码来描述响应式 let a 1,b 1,c; c a b; console.log(c) // 输出 2 // 改变 a的值 a 3; // 重新给c赋值 即把 c a b; 再执行一遍c的值才能变为 4 // c a b; // …

基于SpringBoot的无忌在线考试系统(源码+讲解+调试运行)做毕设课设均可

技术栈 前后端分离 前端使用: Vue Element Plus 后端使用: SpringBoot Mysql8.0 Mybatis-Plus 功能 分为 管理员端 和 老师端 和 学生端 管理员端 登陆页 ​科目管理 查看所有科目 ,增加 ,修改 ,删除科目 , 模糊搜索课程 ​考试管理 查看所有考试 ,增加 ,修改 ,删除考试 题库…

小白学go基础05-变量声明形式

和Python、Ruby等动态脚本语言不同,Go语言沿袭了静态编译型语言的传统:使用变量之前需要先进行变量的声明。 变量声明形式使用决策流程图 这里大致列一下Go语言常见的变量声明形式: var a int32 var s string "hello" var i 13 …

【RabbitMQ】RabbitMQ 服务无法启动。系统出错。发生系统错误 1067。进程意外终止。

问题描述 RabbitMQ 服务无法启动。 rabbitmq-service.bat startRabbitMQ 服务正在启动 . RabbitMQ 服务无法启动。系统出错。发生系统错误 1067。进程意外终止。原因分析 RabbitMQ和Erlang版本不匹配。 解决方案 查询并安装RabbitMQ版本对应Erlang版本 https://www.rabbitm…

如何指定this

<script>/*如何指定this的值可以通过2类方法指定1.调用时指定1.1call方法1.2apply方法2.创建时指定2.1bind方法2.2箭头函数*/// ------1.调用时指定------//1.1call方法:挨个传入参数//1.2apply方法:数组形式传入参数function foo (numA, numB) {console.log(this)consol…

自动化测试基础知识详解

前言 有颜色的标注主要是方便记忆&#xff0c;勾选出个人感觉的重点 块引用&#xff1a;大部分是便于理解的话&#xff0c;稍微看看就行&#xff0c;主要是和正常的文字进行区分的 1、什么是自动化测试 自动化测试是软件测试活动中一个重要分支和组成部分&#xff0c;随着软…

一加11/Ace2/10Pro手机如何实现全局120HZ高刷-游戏超级流畅效果

已经成功root啦。安卓13目前也一样支持LSPosed框架&#xff0c;如果你对LSP框架有需求&#xff0c;也可以使 自测120HZ刷新率诞生以后&#xff0c;很多小伙伴用上了就很难回来啦&#xff0c;一加11/Ace2/10Pro/9pro手 机厂商也对新机做了很多的适配&#xff0c;让我们日常使用起…

nvm管理多个版本的nodejs

1. 已经安装过nodejs在安装nvm的步骤 1.安装nvmhttps://github.com/coreybutler/nvm-windows/releases 2.nvm安装位置 2.nvm管理的nodejs安装位置 4.最终的安装结构 备注&#xff1a;nodejs安装 2.使用nvm安装管理nodejs 2.1配置下载镜像&#xff1a; 找到nvm安装路径…

2023-9-8 满足条件的01序列

题目链接&#xff1a;满足条件的01序列 #include <iostream> #include <algorithm>using namespace std;typedef long long LL;const int mod 1e9 7;int qmi(int a, int k, int p) {int res 1;while(k){if(k & 1) res (LL) res * a % p;a (LL) a * a % p;…

公式trick备忘录

增大不同class feature之间的距离用hinge loss 相关&#xff0c; similarity learning, svm https://www.youtube.com/watch?vQtAYgtBnhws https://www.youtube.com/watch?vbM4_AstaBZo&t286s

java特殊文件 属性文件properties和XML文件

属性文件properties 后缀为.properties的文件&#xff0c;称之为属性文件&#xff0c;它可以很方便的存储一些类似于键值对的数据。经常当做软件的配置文件使用。 首先我们要掌握属性文件的格式&#xff1a; 1.属性文件后缀以.properties结尾 2.属性文件里面的每一行都是一个…

【笔试强训选择题】Day37.习题(错题)解析

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;笔试强训选择题 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01; 文章目录 前言一、Day…

计算机毕设之基于python+django+mysql数据可视化的智慧社区内网平台(包含文档+源码+部署教程)

系统阐述的是一款基于数据可视化的智慧社区内网平台的设计与实现&#xff0c;对于Python、B/S结构、MySql进行了较为深入的学习与应用。主要针对系统的设计&#xff0c;描述&#xff0c;实现和分析与测试方面来表明开发的过程。开发中使用了 django框架和MySql数据库技术搭建系…

大文件上传demo,前端基于Uppy,后端基于koa

前言 文件上传基本上所有的管理系统之类的项目都有这么一个功能。因为使用了Element&#xff0c;可以方便的使用 其提供的Upload组件&#xff0c;对于普通上传来说基本上就够用了。但是有时候会涉及到大文件上传的需求&#xff0c;这时就会面临一些问题&#xff1a;比如文件上…

stable diffusion实践操作-批次出图

系列文章目录 stable diffusion实践操作 文章目录 系列文章目录前言一、批次出图介绍1.1 webUI设置1.2 参数介绍 二、批次出图使用2.1 如何设置2.1 效果展示 总结 前言 本章主要介绍SD批次出图。 想要一次产生多张图片的时候使用。 一、批次出图介绍 1.1 webUI设置 1.2 参数…

IP应用场景查询API:深入了解网络用户行为的利器

前言 随着数字时代的不断发展&#xff0c;互联网已经成为人们生活的重要组成部分。而随着越来越多的业务和社交活动迁移到在线平台上&#xff0c;了解和理解网络用户行为变得至关重要。为了满足这个需求&#xff0c;IP 应用场景查询 API 崭露头角&#xff0c;成为深入了解网络…

RabbitMQ: return机制

1. Return机制 Confirm只能保证消息到达exchange&#xff0c;无法保证消息可以被exchange分发到指定queue。 而且exchange是不能持久化消息的&#xff0c;queue是可以持久化消息。 采用Return机制来监听消息是否从exchange送到了指定的queue中 2.Java的实现方式 1.导入依赖 &l…

SQL函数

函数 字符串函数数值函数日期函数流程函数 字符串函数 常用函数&#xff1a; 函数功能CONCAT(s1, s2, …, sn)字符串拼接&#xff0c;将s1, s2, …, sn拼接成一个字符串LOWER(str)将字符串全部转为小写UPPER(str)将字符串全部转为大写LPAD(str, n, pad)左填充&#xff0c;用…

git ------ IDEA中建立本地/远程仓库及上传

目录 建立本地仓库 1. idea中选择创建本地仓库 选择目标文件 创建远程仓库 1.码云上进行库创建 将本地仓库数据提交到远程仓库 提交代码 推送到远程 建立本地仓库 1. idea中选择创建本地仓库 或 vsm中找下列2 即可 选择目标文件 成功后会出现以下标识 更新 提交 推…

盖子的c++小课堂——第二十二讲:2维dp

前言 大家好&#xff0c;我又来更新了&#xff0c;今天终于有时间了aaaaaaaa 破500粉了&#xff0c;我太高兴了哈哈哈哈哈哈&#xff08;别看IP地址&#xff0c;我去北京旅游回来了&#xff0c;他没改回来&#xff09;&#xff0c;然后我马上就成为创作者一年了&#xff0c;希…