flink实现复杂kafka数据读取

接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

常见的文章中,kafka数据结构相对简单,本文根据实际项目数据,说明怎样读取解析复杂kafka数据。并将解析的数据输出到控制台。

1.模拟数据

1.1 模拟数据

{"reportFormat": "2","reportVersion": 1,"reports": [{"filename": "1733277155032RvReport","c": {"objStationInfo": {"sStationName": "LLP入口","ucStationDir": 1,"sStationID": 500001},"objVehicle": {"sUUID": "fdabd178-a169-11eb-9483-b95959072a9d","w64Timestamp": "1733881971628","objRfidInfo": {"sReaderID": "10","objTagData": {"sTID": "1234567891","sEPC": "1234567890"}},"ucReportType": "8","ucVehicleType": "1"}}}]
}

1.2 添加到kafka

使用kafka工具,kafkatool2,具体操作如下:
连接到kafka:
在这里插入图片描述
连接成功:
在这里插入图片描述
添加数据:
在这里插入图片描述
在这里插入图片描述
添加成功:
在这里插入图片描述

2.代码实现

2.1 EnvUtil实现

EnvUtil用于创建flink的运行环境。

package com.zl.utils;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}

2.2 FlinkSourceUtil实现

FlinkSourceUtil用于连接kafka。

package com.zl.kafka.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;/*** @desc:*/
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {private String uniqueId;//flink生成的唯一键private long reportTime;// 过车时间private String dt;                           // 分区字段private String dh;                           // 小时private String reportFormat;private int reportVersion;private String filename;public String sStationName;    // 采集点名称public String ucStationDir;     // 采集点方向编号public String sStationID;      // 采集点编号private String sUUID;private long w64Timestamp;     //事件时间(毫秒级别)private String sReaderID;//射频设备(模块)代码private String sTIDR;private String sEPCR;private int ucReportType;//8->视频 2->射频 138,202->视频+射频private int ucVehicleType;public void parseTableColunm() {this.reportTime = this.w64Timestamp;this.uniqueId = this.sUUID;}
}

2.3 RvTable实现

RvTable解析数据最后存储的model。

package com.zl.kafka.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;/*** @desc:*/
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {private String uniqueId;//flink生成的唯一键private long reportTime;// 过车时间private String dt;                           // 分区字段private String dh;                           // 小时private String reportFormat;private int reportVersion;private String filename;public String sStationName;    // 采集点名称public String ucStationDir;     // 采集点方向编号public String sStationID;      // 采集点编号private String sUUID;private long w64Timestamp;     //事件时间(毫秒级别)private String sReaderID;//射频设备(模块)代码private String sTIDR;private String sEPCR;private int ucReportType;//8->视频 2->射频 138,202->视频+射频private int ucVehicleType;public void parseTableColunm() {this.reportTime = this.w64Timestamp;this.uniqueId = this.sUUID;}}

2.4 核心逻辑实现

package com.zl.kafka;import com.alibaba.fastjson.JSON;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.util.Date;public class KafkaExample {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");/// 读取kafka数据SingleOutputStreamOperator<String> rvSourceStream = env.addSource(FlinkSourceUtil.getKafkaSource("rvGroup","rv-test","10.86.97.21:9092","earliest"))// earliest/latest.setParallelism(1).uid("getRV").name("getRV");// 解析转换数据格式SingleOutputStreamOperator<String> rvParseStream = null;try {rvParseStream = rvSourceStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) {if (StringUtils.isEmpty(value)) {return;}parseRVData(value, out);}}).setParallelism(1).uid("rvParse").name("rvParse");} catch (Exception e) {e.printStackTrace();}rvParseStream.print();env.execute("rvParseJob");}// mainpublic static void parseRVData(String jsonStr, Collector<String> out) {try {if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {return;}JSONObject in = JSONObject.parseObject(jsonStr);// =====报告头信息 =====String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));JSONArray reports = in.getJSONArray("reports");if (reports != null) {for (int i = 0; i < reports.size(); i++) {RvTable rvTable = new RvTable();JSONObject record = reports.getJSONObject(i);if (record != null) {String filename = stringDefaultIfEmpty(record.getString("filename"));JSONObject c = record.getJSONObject("c");if (c != null) {// ===== 采集点信息 =====JSONObject objStationInfo = c.getJSONObject("objStationInfo");if(objStationInfo != null) {rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));}JSONObject objVehicle = c.getJSONObject("objVehicle");if (objVehicle != null) {// ===== 车辆报告信息 =====rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));rvTable.setW64Timestamp(objVehicle.getLong("w64Timestamp"));rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));// ===== 车辆报告信息/射频车辆信息 =====JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");if (objRfidInfo != null) {rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");if (objTagData != null) {rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));}}// ===== 自加特殊处理字段 =====long timestamp = rvTable.getW64Timestamp();SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");Date date = new Date(timestamp);String[] s = simpleDateFormat.format(date).split(" ");rvTable.setDt(s[0]);rvTable.setDh(s[1]);out.collect(JSONObject.toJSONString(rvTable));}// if (objVehicle != null)}// if (c != null)}// if (record != null)}// for 循环}} catch (Exception e) {e.printStackTrace();// 此处把解析后的数据存储到数据库……}}// parseRVDatapublic static boolean isJSON(String str) {boolean result;try {JSON.parse(str);result = true;} catch (Exception e) {result = false;}return result;}public static int intDefaultIfEmpty(Integer num) {if (num == null) {num = 0;return num;}return num;}public static String stringDefaultIfEmpty(String str) {return StringUtils.defaultIfEmpty(str, "ENULL");}public static Long longDefaultIfEmpty(Long num) {if (num == null) {num = 0l;return num;}return num;}public static Double doubleDefaultIfEmpty(Double num) {if (num == null) {num = 0.0;return num;}return num;}
}

2.5 pom.xml

注意修改此处:
在这里插入图片描述

3.运行效果

3.1 运行日志

在这里插入图片描述

3.2 web UI

访问:http://IP:1000/
在这里插入图片描述
在这里插入图片描述

4.部署

相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcKafka"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.kafka.KafkaExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5. 完整代码

完整代码见:https://gitee.com/core815/flink-cdc-mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/64808.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

越疆科技营收增速放缓:毛利率未恢复,持续亏损下销售费用偏高

《港湾商业观察》施子夫 12月13日&#xff0c;深圳市越疆科技股份有限公司&#xff08;以下简称&#xff0c;越疆科技&#xff0c;02432.HK&#xff09;发布全球发售公告&#xff0c;公司计划全球发售4000万股股份&#xff0c;其中3800万股国际发售&#xff0c;200万股香港公开…

datasets 笔记:加载数据集(基本操作)

参考了huggingface的教程 1 了解数据集基本信息&#xff08; load_dataset_builder&#xff09; 在下载数据集之前&#xff0c;通常先快速了解数据集的基本信息会很有帮助。数据集的信息存储在 DatasetInfo 中&#xff0c;可能包括数据集描述、特征和数据集大小等信息。&…

uniapp video组件无法播放视频解决方案

前言 一般正常的视频使用video组件就能播放。但视频源存在问题&#xff0c;在浏览器能正常播放 在Hbuilderx内置浏览器 在真机无法播放 使用v-html的方式 <template> <uni-popup class"videoPop" type"center" ref"videoPop">&…

Springboot 学习 之 logback-spring.xml 日志压缩 .tmp 临时文件问题

文章目录 前言功能简述1. 自定义日志文件名2. 归档规则 && 压缩2.1. 归档配置2.2. 归档压缩2.3. 日志格式 && 编码 现象原因解决办法 前言 在 Springboot 应用中&#xff0c;默认使用 logback-spring.xml 配置日志相关 功能简述 1. 自定义日志文件名 <fi…

Java程序设计2(六)

第五章&#xff1a;IO流 &#xff08;java.io包中&#xff09; 一、理解 1. 简单而言&#xff1a;流就是内存与存储设备之间传输数据的通道、管道。 2. 分类&#xff1a; (1) 按方向(以JVM虚拟机为参照物)【重点】 输入流&#xff1a;将中的内容读入到中。 输出流&#xff1a…

Java图片拼接

最近遇到一个挺离谱的功能&#xff0c;某个表单只让上传一张图&#xff0c;多图上传会使导出失败。跟开发沟通后表示&#xff0c;这个问题处理不了。我... 遂自己思考&#xff0c;能否以曲线救国的方式拯救一下&#xff0c;即不伤及代码之根本&#xff0c;又能解决燃眉之急。灵…

工程经济学(尊享版)

工程经济学是一门应用性的经济学科 也是一门介于自然科学与社会科学的之间的边缘学科。它是根据现代科学技术和社会经济发展的需要&#xff0c;在自然科学和社会科学的发展过程中相互渗透、相互促进&#xff0c;逐渐形成和发展起来的&#xff0c;是工程技术学科和经济学科交叉的…

爬虫基础学习

爬虫概念与工作原理 爬虫是什么&#xff1a;爬虫&#xff08;Web Scraping&#xff09;是自动化地访问网站并提取数据的技术。它模拟用户浏览器的行为&#xff0c;通过HTTP请求访问网页&#xff0c;解析HTML文档并提取有用信息。 爬虫的基本工作流程&#xff1a; 发送HTTP请求…

.NET重点

B/S C/S什么语言 B/S&#xff1a; 浏览器端&#xff1a;JavaScript&#xff0c;HTML&#xff0c;CSS 服务器端&#xff1a;ASP&#xff08;.NET&#xff09;PHP/JSP 优势&#xff1a;维护方便&#xff0c;易于升级和扩展 劣势&#xff1a;服务器负担沉重 C/S java/.NET/…

STM32HAL I2C函数

8.5 使用IIC协议读写EEPROM 硬件方式实现 &#xff08;HAL库&#xff09; **HAL_I2C_Mem_Write() :这种方法可以写1个或者多个字节 ** /*** brief 以阻塞模式向指定的内存地址写入数据* param hi2c 指向 I2C_HandleTypeDef 结构体的指针&#xff0c;包含指定 I2C 的配置信息…

智能工厂的设计软件 三种处理单元(NPU/GPU/CPU)及其在深度学习框架中的作用 之5(腾讯云AI代码助手 之3)

前情提要 前面讨论了智能工厂的设计软件 中三种处理单元&#xff08;NPU/GPU/CPU&#xff09;及其在深度学习框架中的作用是协作完成一个深度学习任务。 最后通过明确深度学习本身的目的是建构一个公理化系统--作为 自然语言形式化 建模约束&#xff08;为人类编辑 &#xff0…

Linux——卷

Linux——卷 介绍 最近做的项目&#xff0c;涉及到对系统的一些维护&#xff0c;有些盘没有使用&#xff0c;需要创建逻辑盘并挂载到指定目录下。有些软件需要依赖空的逻辑盘&#xff08;LVM&#xff09;。 先简单介绍一下卷的一些概念&#xff0c;有分区、物理存储介质、物…

M3D: 基于多模态大模型的新型3D医学影像分析框架,将3D医学图像分析从“看图片“提升到“理解空间“的层次,支持检索、报告生成、问答、定位和分割等8类任务

M3D: 基于多模态大模型的新型3D医学影像分析框架&#xff0c;将3D医学图像分析从“看图片“提升到“理解空间“的层次&#xff0c;支持检索、报告生成、问答、定位和分割等8类任务 论文大纲理解1. 确认目标2. 分析过程&#xff08;目标-手段分析&#xff09;核心问题拆解 3. 实…

clickhouse-副本和分片

1、副本 1.1、概述 集群是副本和分片的基础&#xff0c;它将ClickHouse的服务拓扑由单节点延伸到多个节点&#xff0c;但它并不像Hadoop生态的某些系统那样&#xff0c;要求所有节点组成一个单一的大集群。ClickHouse的集群配置非常灵活&#xff0c;用户既可以将所有节点组成…

Redis 集群实操:强大的数据“分身术”

目录 Redis Cluster集群模式 1、介绍 2、架构设计 3、集群模式实操 4、故障转移 5、常用命令 Redis Cluster集群模式 1、介绍 redis3.0版本推出的Redis Cluster 集群模式&#xff0c;每个节点都可以保存数据和整个集群状态&#xff0c;每个节点都和其他所有节点连接。Cl…

C# 从控制台应用程序入门

总目录 前言 从创建并运行第一个控制台应用程序&#xff0c;快速入门C#。 一、新建一个控制台应用程序 控制台应用程序是C# 入门时&#xff0c;学习基础语法的最佳应用程序。 打开VS2022&#xff0c;选择【创建新项目】 搜索【控制台】&#xff0c;选择控制台应用(.NET Framew…

WPF制作图片闪烁的自定义控件

1.定义自定义控件 BlinkingImage.cs: using System; using System.Windows; using System.Windows.Controls; using System.Windows.Media.Animation; using System.Windows.Media.Imaging;namespace YourNamespace {public class BlinkingImage : Control{public static rea…

猫咪睡眠:萌态背后的奥秘与启示

猫咪的睡眠&#xff0c;犹如一本充满趣味与奥秘的小书&#xff0c;每一页都写满了它们独特的习性与本能。 猫咪堪称 “睡眠大师”&#xff0c;睡眠时间之长令人咋舌&#xff0c;一天中大约有 12 - 16 个小时在梦乡中度过&#xff0c;幼猫和老年猫甚至能睡更久。它们似乎深谙放…

QML Text组件

文章目录 前言主体基本文本显示字体和样式富文本支持长文本的处理文本样式与效果超链接Label 元素总结 前言 在 QML 中&#xff0c;Text 和 Label 是常用的文本显示元素&#xff0c;它们在显示文本方面具有相似性&#xff0c;但在功能和定制性上也存在一些差异。Text 元素用于…

基于前端技术UniApp和后端技术Node.js的电影购票系统

文章目录 摘要Abstruct第一章 绪论1.1 研究背景与意义1.2 国内外研究现状 第二章 需求分析2.1 功能需求分析2.2 非功能性需求分析 第二章系统设计3.1 系统架构设计3.1.1 总体架构3.1.2 技术选型 3.2 功能架构 第四章 系统实现4.1 用户端系统实现4.1.1 用户认证模块实现4.1.2 电…