二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)

一、目的

由于部分数据类型频率为1s,从而数据规模特别大,因此完整的JSON放在Hive中解析起来,尤其是在单机环境下,效率特别慢,无法满足业务需求。

而Flume的拦截器并不能很好的转换数据,因为只能采用Java方式,从Kafka的主题A中采集数据,并解析字段,然后写入到放在Kafka主题B中

二 、原始数据格式

JSON格式比较正常,对象中包含数组

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:10:00",
    "data": {
        "cycle": 300,
        "evaluationList": [{
            "laneNo": 1,
            "laneType": null,
            "volume": 3,
            "queueLenMax": 11.43,
            "sampleNum": 0,
            "stopAvg": 0.54,
            "delayAvg": 0.0,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 2,
            "laneType": null,
            "volume": 7,
            "queueLenMax": 23.18,
            "sampleNum": 0,
            "stopAvg": 0.47,
            "delayAvg": 10.57,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 3,
            "laneType": null,
            "volume": 9,
            "queueLenMax": 11.54,
            "sampleNum": 0,
            "stopAvg": 0.18,
            "delayAvg": 9.67,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 4,
            "laneType": null,
            "volume": 6,
            "queueLenMax": 11.36,
            "sampleNum": 0,
            "stopAvg": 0.27,
            "delayAvg": 6.83,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        }]
    }
}

三、Java代码

package com.kgc;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaKafkaEvaluation {// 添加 Kafka Producer 配置private static Properties producerProps() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ACKS_CONFIG, "-1");props.put(ProducerConfig.RETRIES_CONFIG, "3");props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");props.put(ProducerConfig.LINGER_MS_CONFIG, "1");props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");return props;}public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 每一个消费,都要定义不同的Group_IDprop.put(ConsumerConfig.GROUP_ID_CONFIG, "evaluation_group");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Collections.singleton("topic_internal_data_evaluation"));ObjectMapper mapper = new ObjectMapper();// 初始化 Kafka ProducerKafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {try {JsonNode rootNode = mapper.readTree(record.value());System.out.println("原始数据"+rootNode);String device_no = rootNode.get("deviceNo").asText();String source_device_type = rootNode.get("sourceDeviceType").asText();String sn = rootNode.get("sn").asText();String model = rootNode.get("model").asText();String create_time = rootNode.get("createTime").asText();String cycle = rootNode.get("data").get("cycle").asText();JsonNode evaluationList = rootNode.get("data").get("evaluationList");for (JsonNode evaluationItem : evaluationList) {String lane_no = evaluationItem.get("laneNo").asText();String lane_type = evaluationItem.get("laneType").asText();String volume = evaluationItem.get("volume").asText();String queue_len_max = evaluationItem.get("queueLenMax").asText();String sample_num = evaluationItem.get("sampleNum").asText();String stop_avg = evaluationItem.get("stopAvg").asText();String delay_avg = evaluationItem.get("delayAvg").asText();String pass_rate = evaluationItem.get("passRate").asText();String travel_dist = evaluationItem.get("travelDist").asText();String travel_time_avg = evaluationItem.get("travelTimeAvg").asText();String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",device_no, source_device_type, sn, model, create_time, cycle,lane_no, lane_type,volume,queue_len_max,sample_num,stop_avg,delay_avg,pass_rate,travel_dist,travel_time_avg);// 发送数据到 KafkaProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic_db_data_evaluation", record.key(), outputLine);producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {if (e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is: " + metadata.offset());}});}} catch (Exception e) {e.printStackTrace();}}consumer.commitAsync();}}}

1、服务器IP都是   192.168.0.70

2、消费Kafka主题(数据源):topic_internal_data_evaluation

3、生产Kafka主题(目标源):topic_db_data_evaluation

4、注意:字段顺序与ODS层表结构字段顺序一致!!!

四、开启Kafka主题topic_db_data_evaluation消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092  --topic topic_db_data_evaluation  --from-beginning

五、运行测试

1、启动项目

2、消费者输出数据

然后再用Flume采集写入HDFS就行了,不过ODS层表结构需要转变

六、ODS层新表结构

create external table  if not exists  hurys_dc_ods.ods_evaluation(device_no           string        COMMENT '设备编号',source_device_type  string        COMMENT '设备类型',sn                  string        COMMENT '设备序列号 ',model               string        COMMENT '设备型号',create_time         timestamp     COMMENT '创建时间',cycle               int           COMMENT '评价数据周期',lane_no             int           COMMENT '车道编号',lane_type           int           COMMENT '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',volume              int           COMMENT '车道内过停止线流量(辆)',queue_len_max       float         COMMENT '车道内最大排队长度(m)',sample_num          int           COMMENT '评价数据计算样本量',stop_avg            float         COMMENT '车道内平均停车次数(次)',delay_avg           float         COMMENT '车道内平均延误时间(s)',pass_rate           float         COMMENT '车道内一次通过率',travel_dist         float         COMMENT '车道内检测行程距离(m)',travel_time_avg     float         COMMENT '车道内平均行程时间'
)
comment '评价数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by ','
stored as SequenceFile
;

七、Flume采集配置文件

八、运行Flume任务,检查HDFS文件、以及ODS表数据

--刷新表分区
msck repair table ods_evaluation;
--查看表分区
show partitions hurys_dc_ods.ods_evaluation;
--查看表数据
select * from hurys_dc_ods.ods_evaluation
where day='2024-09-03';

搞定,这样就不需要在Hive中解析JSON数据了!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/51899.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javascript数据结构与算法-- 二叉树

javascript数据结构与算法-- 二叉树 树是计算机科学中经常用到的一种数据结构。树是一种非线性的数据结构&#xff0c;以分成的方式存储数据&#xff0c;树被用来存储具有层级关系的数据&#xff0c;比如文件系统的文件&#xff0c;树还被用来存储有序列表。我们要研究的是二叉…

IObit Uninstaller Pro v13.6.0.5 绿色便携免安装版本 下载

功能非常强大好用的软件卸载清理工具 下载地址(资源制作整理不易&#xff0c;下载使用需付费&#xff0c;不能接受请勿浪费时间下载) 链接&#xff1a;https://pan.baidu.com/s/1I7lbixooii9ezSrp3X-y-w?pwd716l 提取码&#xff1a;716l

d3dcompiler_47.dll缺失的可能原因多种多样,那么d3dcompiler_47.dll缺失怎么修复

在数字世界的深处&#xff0c;d3dcompiler_47.dll文件扮演着至关重要的角色&#xff0c;它是Direct3D编译器的一部分&#xff0c;负责处理图形渲染和游戏运行中的关键任务。然而&#xff0c;当用户启动某个程序或游戏时&#xff0c;屏幕上突然弹出的错误提示“d3dcompiler_47.d…

苹果手机勿扰模式怎么关闭?4个方法快速关闭!

我们为了提升做事的效率以及保障休息的质量&#xff0c;在认真工作和学习&#xff0c;或者是晚上休息的时候&#xff0c;通常会打开苹果手机的勿扰模式。但当我们需要恢复苹果手机的消息通知时&#xff0c;苹果手机勿扰模式怎么关闭呢&#xff1f;今天&#xff0c;小编整理了4个…

机械学习—零基础学习日志(概率论总笔记2)

正态分布 高斯分布也叫做正态分布。假定事件A经过n次试验后发生了k次&#xff0c;把k的概率分布图画一下&#xff0c;就得到了一个中间鼓起&#xff0c;像倒扣的钟一样的对称图形。 18世纪&#xff0c;数学家棣莫弗和拉普拉斯把这种中间大&#xff0c;两头小的分布称为正态分布…

厨师帽佩戴识别摄像机

厨师帽佩戴识别摄像机 是一种用于识别厨师是否佩戴帽子的智能设备&#xff0c;其作用在于强制执行食品安全卫生标准&#xff0c;防止头发掉落入食物中。该摄像机利用人工智能和图像识别技术&#xff0c;能够识别厨师是否佩戴厨师帽。当摄像机检测到厨师未佩戴帽子时&#xff0c…

微信小程序中Towxml解析Markdown及html

一、Towxml Towxml 是一个让小程序可以解析Markdown、HTML的解析库。 二、引入 2.1 clone代码 git clone https://github.com/sbfkcel/towxml.git2.2 安装依赖 npm install2.3 打包 npm run build2.4 引入文件 将dist文件复制到微信小程序根目录&#xff0c;改名为towx…

Flutter中的Key

在Flutter 中&#xff0c;Key 是 几乎所有 widget 都具有的属性。为什么 widget 具有 Key 呢&#xff1f;Key的作用是什么&#xff1f; 什么是 Key Key是Widget、Element 和 SemanticNodes 的标识符。 Key 是Widget、Element 和 SemanticNodes的唯一标识。例如对于 Widget 在 …

数据结构之 “单链表“

&#xff08;1&#xff09;在顺表表中&#xff0c;如果是头插/删的时间复杂度是O(1)&#xff1b;尾插/删的时间复杂度是O(N) &#xff08;2&#xff09;增容一般是呈2倍的增长&#xff0c;势必会有一定的空间浪费。比如&#xff1a;申请了50个空间&#xff0c;只用了两个&#…

Type-C接口诱骗取电快充方案

Type-C XSP08Q 快充协议芯片是一种新型电源管理芯片&#xff0c;主要负责控制充电电流和电压等相关参数&#xff0c;从而实现快速充电功能。Type-C XSP08Q快充协议是在Type-C接口基础上&#xff0c;加入了XSP08Q协议芯片的支持&#xff0c;很大程度上提升了充电速度。 正常情况…

Linux——性能调优工具一览

一、CPU 1.调优工具 根据指标找工具 性能指标工具说明 平均负载 uptime、top uptime最简单、top提供了更全的指标 系统整体CPU使用率 vmstat、mpstat、top、sar、/proc/stat top、vmstat、mpstat只可以动态查看&#xff0c;而sar还可以记录历史数据 /proc/stat是其他性…

UE引擎内置插件信息 储存的位置

.uproject。图标文件可以让UE 引擎内置插件&#xff0c;配置更改,比如我希望我的DataSmithImporter插件是启用的。

STM32 ADC采样详解

Content 0x00 前言0x01 ADC配置0x02 滤波处理 0x00 前言 在单片机开发过程中&#xff0c;常常涉及到ADC的使用&#xff0c;市面上大部分便宜的传感器都是采用的ADC来获取其数据&#xff0c;如MQ-2 烟雾传感器、光敏传感器等等。 此类传感器工作原理为根据所采集到的数据变化…

大模型入门 ch01:大模型概述

本文是github上的大模型教程LLMs-from-scratch的学习笔记&#xff0c;教程地址&#xff1a;教程链接 STAGE 1&#xff1a; BUILDING 1. 数据准备与采样 LLM的预测过程&#xff0c;是一个不断预测下一个词&#xff08;准确的说是token&#xff09;的过程&#xff0c;每次根据输…

【C++八股题整理】内存布局、堆和栈、内存泄露、函数调用栈

C八股题整理 内存布局C中的内存分配情况堆和栈的内存有什么区别&#xff1f; 堆堆内存分配慢如何优化&#xff1f;内存池内存溢出和内存泄漏是什么&#xff1f;如何避免&#xff1f;内存碎片是什么&#xff1f;怎么解决&#xff1f; 栈为什么栈的访问效率比堆高&#xff1f;函数…

UI自动化测试 —— web端元素获取元素等待实践!

前言 Web UI自动化测试是一种软件测试方法&#xff0c;通过模拟用户行为&#xff0c;自动执行Web界面的各种操作&#xff0c;并验证操作结果是否符合预期&#xff0c;从而提高测试效率和准确性。 目的&#xff1a; 确保Web应用程序的界面在不同环境(如不同浏览器、操作系统)下…

【前缀和算法】--- 进阶题目赏析

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; 算法Journey 本篇我们来赏析前缀和算法的进阶题目。 &#x1f3e0; 和可被K整除的子数组 &#x1f4cc; 题目解析 和可被k整除的子数组 &#x1f4cc; …

记一次ssh伪终端修改为shell

问题 用户ssh进行连接后&#xff0c;默认为伪终端。 解决办法&#xff0c;可以先拿到终端shell&#xff0c;查看用户是否为/bin/bash&#xff1a; 不是/bin/bash&#xff0c;使用如下命令进行修改&#xff1a; chsh -s /bin/bash rootservice sshd restart

量化投资策略与技术学习PART1.1:量化选股之再谈多因子模型(二)

在上一个多因子模型中&#xff0c;我手动对各个因子进行了回测&#xff0c;但是数据结果并不是十分理想&#xff0c;难道基本面指标真的和股票走势关系不大么&#xff1f; 这里我还是准备再测试一下&#xff0c;策略如下&#xff1a; &#xff08;1&#xff09;首先我获取了一下…

codeforces Round 970 (Div. 3)(A-F)

文章目录 [Codeforces Round 970 (Div. 3)](https://codeforces.com/contest/2008)A-[Sakurakos Exam](https://codeforces.com/contest/2008/problem/A)B-[Square or Not](https://codeforces.com/contest/2008/problem/B)C-[Longest Good Array](https://codeforces.com/cont…