二百六十、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(复杂JSON)

一、目的

由于部分数据类型频率为1s,从而数据规模特别大,因此完整的JSON放在Hive中解析起来,尤其是在单机环境下,效率特别慢,无法满足业务需求。

而Flume的拦截器并不能很好的转换数据,因为只能采用Java方式,从Kafka的主题A中采集数据,并解析字段,然后写入到放在Kafka主题B中

二 、原始数据格式

JSON格式比较复杂,对象中包含数组,数组中包含对象

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:40:00",
    "data": {
        "cycle": 300,
        "sectionList": [{
            "sectionNo": 1,
            "coilList": [{
                "laneNo": 1,
                "laneType": null,
                "coilNo": 1,
                "volumeSum": 3,
                "volumePerson": 0,
                "volumeCarNon": 0,
                "volumeCarSmall": 3,
                "volumeCarMiddle": 0,
                "volumeCarBig": 0,
                "speedAvg": 24.15,
                "timeOccupancy": 0.98,
                "averageHeadway": 162.36,
                "averageGap": 161.63,
                "speed85": 38.0
            },
            {
                "laneNo": 8,
                "laneType": null,
                "coilNo": 8,
                "volumeSum": 1,
                "volumePerson": 0,
                "volumeCarNon": 0,
                "volumeCarSmall": 1,
                "volumeCarMiddle": 0,
                "volumeCarBig": 0,
                "speedAvg": 49.43,
                "timeOccupancy": 0.3,
                "averageHeadway": 115.3,
                "averageGap": 115.1,
                "speed85": 49.43
            }]
        }]
    }
}

三、Java代码

package com.kgc;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaKafkaStatistics {// 添加 Kafka Producer 配置private static Properties producerProps() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ACKS_CONFIG, "-1");props.put(ProducerConfig.RETRIES_CONFIG, "3");props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");props.put(ProducerConfig.LINGER_MS_CONFIG, "1");props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");return props;}public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 每一个消费,都要定义不同的Group_IDprop.put(ConsumerConfig.GROUP_ID_CONFIG, "statistics_group");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Collections.singleton("topic_internal_data_statistics"));ObjectMapper mapper = new ObjectMapper();// 初始化 Kafka ProducerKafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {try {JsonNode rootNode = mapper.readTree(record.value());System.out.println("原始数据"+rootNode);String device_no = rootNode.get("deviceNo").asText();String source_device_type = rootNode.get("sourceDeviceType").asText();String sn = rootNode.get("sn").asText();String model = rootNode.get("model").asText();String create_time = rootNode.get("createTime").asText();JsonNode dataNode = rootNode.get("data");String  cycle = dataNode.get("cycle").asText();for (JsonNode sectionStatus : dataNode.get("sectionList")) {String section_no = sectionStatus.get("sectionNo").asText();JsonNode coilList = sectionStatus.get("coilList");for (JsonNode coilItem : coilList) {String lane_no = coilItem.get("laneNo").asText();String lane_type = coilItem.get("laneType").asText();String coil_no = coilItem.get("coilNo").asText();String volume_sum = coilItem.get("volumeSum").asText();String volume_person = coilItem.get("volumePerson").asText();String volume_car_non = coilItem.get("volumeCarNon").asText();String volume_car_small = coilItem.get("volumeCarSmall").asText();String volume_car_middle = coilItem.get("volumeCarMiddle").asText();String volume_car_big = coilItem.get("volumeCarBig").asText();String speed_avg = coilItem.get("speedAvg").asText();String speed_85 = coilItem.get("speed85").asText();String time_occupancy = coilItem.get("timeOccupancy").asText();String average_headway = coilItem.get("averageHeadway").asText();String average_gap = coilItem.get("averageGap").asText();String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",device_no, source_device_type, sn, model, create_time, cycle, lane_no, lane_type, section_no, coil_no, volume_sum, volume_person,volume_car_non, volume_car_small, volume_car_middle, volume_car_big, speed_avg, speed_85, time_occupancy, average_headway, average_gap);// 发送数据到 KafkaProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic_db_data_statistics", record.key(), outputLine);producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {if (e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is: " + metadata.offset());}});}}} catch (Exception e) {e.printStackTrace();}}consumer.commitAsync();}}}

剩下的几步参考二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)这篇博客

四、开启Kafka主题消费者

五、运行测试

六、ODS层新表结构

七、Flume采集配置文件

八、运行Flume任务,检查HDFS文件、以及ODS表数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/51948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SEO之网站结构优化(十四-内部链接及权重分配3)

初创企业搭建网站的朋友看1号文章&#xff1b;想学习云计算&#xff0c;怎么入门看2号文章谢谢支持&#xff1a; 1、我给不会敲代码又想搭建网站的人建议 2、“新手上云”能够为你开启探索云世界的第一步 博客&#xff1a;阿幸SEO~探索搜索排名之道 7、锚文字分布及变化 前面…

新手c语言讲解及题目分享(十四)--函数专项练习(一)

目录 前言 一.函数的定义 1.函数定义包括的内容&#xff1a; Ⅰ.指定函数类别 Ⅱ.指定函数类型 Ⅲ.指定函数名 Ⅳ.指定函数的参数名称和类型 Ⅴ.指定函数的函数体 2.函数定义的一般形式&#xff1a; Ⅰ.有参函数的定义形式&#xff1a; Ⅱ.无参函数的定义形式&#x…

C语言从头学55——学习头文件errno.h、float.h

1、头文件 errno.h 中的变量 errno 的使用 在 errno.h 定义了一个 int 类型的变量 errno&#xff08;错误码&#xff09;&#xff0c;如果发现这个变量出现非零值&#xff0c;表示已经执行的函数发生了错误。这个变量一般多用于检查数学函数运算过程中发生的错误。 …

部署 Web 项目到 Linux,可以使他人也访问项目的方法

目录 一、环境配置 二、建构项目并打包 三、上传Jar包到服务器, 并运行 3.1 上传Jar包 3.2 运行 jar 包 3.3 开放端口号 四、其他问题 4.1 运行异常问题 4.2 杀掉进程 五、总结 一、环境配置 如果本地项目是SpringBoot项目&#xff0c;使用的数据库是MySQL&#xff…

ES6 类-总结

我们现在用一段代码&#xff0c; 在注释中总结所有关于JavaScript类的所有用法 class Student extends Person {//这里的Student是子类&#xff0c;Person是父类&#xff0c;extends是实现类之间的继承&#xff0c;它可以自动设置原型university 家里蹲大学; //公共字段(类似…

APP 数据抓取 - Charles 抓包工具的使用(Charles 端口配置、CA 证书配置、Charles Android 模拟器配置)

前言说明 此文章是我在学习 Charles APP 抓包时编写&#xff0c;内容都是亲测有效&#xff0c;文章内容也有参考其他人&#xff0c;参考文章如下&#xff1a; Android 手机使用 charles 抓 https 请求&#xff08;保姆级教程&#xff09;网易 mumu 模拟器安装下载 charles 的…

计算机网络(八股文)

这里写目录标题 计算机网络一、网络分层模型1. TCP/IP四层架构和OSI七层架构⭐️⭐️⭐️⭐️⭐️2. 为什么网络要分层&#xff1f;⭐️⭐️⭐️3. 各层都有那些协议&#xff1f;⭐️⭐️⭐️⭐️ 二、HTTP【重要】1. http状态码&#xff1f;⭐️⭐️⭐️2. 从输入URL到页面展示…

XSLT 实例:掌握 XML 转换的艺术

XSLT 实例&#xff1a;掌握 XML 转换的艺术 引言 XSLT&#xff08;可扩展样式表语言转换&#xff09;是一种强大的工具&#xff0c;用于将 XML&#xff08;可扩展标记语言&#xff09;文档转换为其他格式&#xff0c;如 HTML、PDF 或纯文本。在本文中&#xff0c;我们将通过一…

从Vuex 到 Pinia,Vue 状态管理的进化

Vue.js,一个轻量级且易于上手的 JavaScript 框架,已经在全球范围内获得了广泛的应用。 Vue.js 的状态管理库 Vuex,也为开发者提供了一个统一的状态管理方案。然而,随著 Vue.js 的发展和进化,我们看到了一个新的状态管理库的诞生 — Pinia。在这篇文章中,我们将探讨 Vuex…

2024年9月3日嵌入式学习

数据结构 1定义 一组用来保存一种或者多种特定关系的数据的集合&#xff08;组织和存储数据&#xff09; 程序的设计&#xff1a;将现实中大量而复杂的问题以特定的数据类型和特定的存储结构存储在内存中&#xff0c; 并在此基础上实现某个特定的功能的操作&am…

Springboot集成WebSocket客户端,发送消息并监测心跳

jar包&#xff08;主要jar包&#xff09; <dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.5.7</version></dependency>服务类 import cn.hutool.json.JSONUtil; impor…

「Python程序设计」条件控制:if-elif-else语句

我们在进行程序设计的过程中&#xff0c;基本上遵循的过程是&#xff0c;找出变量和常量&#xff0c;通过python编程语言&#xff0c;设置变量和常量&#xff0c;以及考虑是否需要赋予初始值。 设计变量和常量&#xff0c;其实就是为了模拟和计算我们的现实世界中&#xff0c;…

学习笔记--Docker

安装 1.卸载旧版 首先如果系统中已经存在旧的Docker&#xff0c;则先卸载&#xff1a; yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine 2.配置Docker的yum库 首先要安…

深入理解 JavaScript DOM 操作

一、DOM 操作分类 &#xff08;一&#xff09;元素查找 根据 ID 值查找&#xff1a;getElementById()&#xff0c;返回符合条件的第一个对象。 var aa document.getElementById("aa");console.log(aa);根据类名查找&#xff1a;getElementsByClassName()&#xff…

IntelliJ IDEA 自定义字体大小

常用编程软件自定义字体大全首页 文章目录 前言具体操作1. 打开设置对话框2. 设置编辑器字体3. 设置编译软件整体字体 前言 IntelliJ IDEA 自定义字体大小&#xff0c;统一设置为 JetBrains Mono 具体操作 【File】>【Settings...】>【Editor】>【Font】 统一设置…

C++:list篇

前言: 观看C的list前需要对链表有一些了解&#xff0c;如C语言的链表结构。本片仅介绍list容器中常用的接口函数概念以及使用。 list的概念&#xff1a; 简而言之&#xff0c;C的list是一个双向带哨兵位的链表容器模板 list的构造&#xff1a; 1.list():默认构造 2.li…

spring之异常和测试相关注解

原文地址 ControllerAdvice和ExceptionHandler 通常组合使用&#xff0c;用于处理全局异常&#xff0c;示例代码如下&#xff1a; ControllerAdvice Configuration Slf4j public class GlobalExceptionConfig {private static final Integer GLOBAL_ERROR_CODE 500;Excepti…

认识git和git的基本使用,本地仓库,远程仓库和克隆远程仓库

本地仓库 #安装git https://git-scm.com/download/win #git是什么&#xff1f;有什么用&#xff1f; git相当于一个版本控制系统&#xff0c;版本控制是一种记录一个或若干文件内容变化&#xff0c;以便将来查阅特定版本修订情况的系统。 作用: 记录&#xff08;项目&#…

[Qt5] 使用QtConcurrent::run在异步线程中执行耗时函数

&#x1f4e2;博客主页&#xff1a;https://loewen.blog.csdn.net&#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;本文由 丶布布原创&#xff0c;首发于 CSDN&#xff0c;转载注明出处&#x1f649;&#x1f4e2;现…

Java-树形图工具类TreeUtil

TreeUtil 工具类,包括列表转树形结构、遍历、查找和删除节点等功能。 import java.util.*;public class TreeUtil {/*** 将列表转换为树形结构。** @param target 扁平化的节点列表* @param getId 获取节点ID的函数* @param getParentId 获取节点父ID的函数* @…