实战Flink Java api消费kafka实时数据落盘HDFS

文章目录

  • 1 需求分析
  • 2 实验过程
    • 2.1 启动服务程序
    • 2.2 启动kafka生产
  • 3 Java API 开发
    • 3.1 依赖
    • 3.2 代码部分
  • 4 实验验证
    • STEP1
    • STEP2
    • STEP3
  • 5 时间窗口

1 需求分析

在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 实验过程

2.1 启动服务程序

为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。

确保 Kafka Server 在运行,因为 Flink 的 Kafka Consumer 需要连接到 Kafka Broker。

启动 Flink 的 JobManager 和 TaskManager,这是执行 Flink 任务的核心组件。

确保这些组件都在运行,以便 Flink 作业能够正常消费 Kafka 中的数据并将其写入 HDFS。

  • 具体的启动命令在此不再赘述。

2.2 启动kafka生产

  • 当前kafka没有在守护进程后台运行;
  • 创建主题,启动该主题的生产者,在kafka的bin目录下执行;
  • 此时可以生产数据,从该窗口键入任意数据进行发送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

在这里插入图片描述

3 Java API 开发

3.1 依赖

此为项目的所有依赖,包括flink、spark、hbase、ck等,实际本需求无需全部依赖,均可在阿里云或者maven开源镜像站下载。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-test</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.6</flink.version><hbase.version>2.4.0</hbase.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.6</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version><exclusions><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>log4j</artifactId><groupId>log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase.version}</version><exclusions><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@hadoop10:/opt/app</url></configuration></plugin></plugins></build></project>
  • 依赖参考
    在这里插入图片描述

3.2 代码部分

  • 请注意kafka和hdfs的部分需要配置服务器地址,域名映射。
  • 此代码的功能是消费topic1主题,将数据直接写入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "test");// 使用FlinkKafkaConsumer作为数据源DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));String outputPath = "hdfs://hadoop10:8020/out240102";// 使用StreamingFileSink将数据写入HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).build();// 添加Sink,将Kafka数据直接写入HDFSds1.addSink(sink);ds1.print();env.execute("Flink Kafka HDFS");}
}

4 实验验证

STEP1

运行idea代码,程序开始执行,控制台除了日志外为空。下图是已经接收到生产者的数据后,消费在控制台的截图。

在这里插入图片描述

STEP2

启动生产者,将数据写入,数据无格式限制,随意填写。此时发送的数据,是可以在STEP1中的控制台中看到屏幕打印结果的。
在这里插入图片描述

STEP3

在HDFS中查看对应的目录,可以看到数据已经写入完成。
我这里生成了多个inprogress文件,是因为我测试了多次,断码运行了多次。ide打印在屏幕后,到hdfs落盘写入,中间有一定时间,需要等待,在HDFS中刷新数据,可以看到文件大小从0到被写入数据的过程。
在这里插入图片描述

5 时间窗口

  • 使用另一种思路实现,以时间窗口的形式,将数据实时写入HDFS,实验方法同上。截图为发送数据消费,并且在HDFS中查看到数据。
    在这里插入图片描述

在这里插入图片描述

package day2;import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "test");// 使用FlinkKafkaConsumer作为数据源DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));String outputPath = "hdfs://hadoop10:8020/out240102";// 使用StreamingFileSink将数据写入HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).build();// 在一个时间窗口内将数据写入HDFSds1.process(new CustomProcessFunction())  // 使用自定义 ProcessFunction.addSink(sink);// 执行程序env.execute("Flink Kafka HDFS");}
}
package day2;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class CustomProcessFunction extends ProcessFunction<String, String> {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {// 在这里可以添加具体的逻辑,例如将数据写入HDFSSystem.out.println(value);  // 打印结果到屏幕out.collect(value);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/605141.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java火车查询管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web火车查询管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql…

深度学习工具-Jupyter Notebook使用

在本地编辑和运行代码 运行命令jupyter notebook。如果浏览器未自动打开&#xff0c;请打开http://localhost:8888 你可以通过单击网页上显示的文件夹来访问notebook文件。它们通常有后缀“.ipynb”。为了简洁起见&#xff0c;我们创建了一个临时的“test.ipynb”文件。单击后…

用python写个根据水库大坝安全监测excel数据自动生成word水库大坝安全监测报告

要实现这个功能&#xff0c;你需要使用Python中的一些库&#xff0c;如pandas用于处理Excel数据&#xff0c;python-docx用于生成Word文档。 以下是一个简单的示例&#xff0c;展示如何从Excel数据中读取数据&#xff0c;并使用python-docx生成Word报告。 首先&#xff0c;确保…

MySQL第五战:常见面试题(下)

前言&#xff1a; 在当今的IT世界&#xff0c;数据库是任何应用程序的核心。而MySQL&#xff0c;作为最流行的开源关系数据库管理系统&#xff0c;已经成为许多开发者和企业的首选。无论是初创公司还是大型企业&#xff0c;都依赖于MySQL来存储、管理和检索数据。 随着技术的…

Kubernetes 学习总结(44)—— Kubernetes 1.29 中的删除、弃用和主要更改

Kubernetes API 删除和弃用流程 Kubernetes 项目对功能有详细记录的弃用政策。此策略规定&#xff0c;只有当同一 API 的更新、稳定版本可用时&#xff0c;才可以弃用稳定的 API&#xff0c;并且每个稳定性级别的 API 都有最短生命周期。已弃用的 API 是已标记为在未来 Kuber…

计算机网络(超级详细笔记)

使用教材计算机网络&#xff08;第8版&#xff09;&#xff08;谢希仁&#xff09; 第一章&#xff1a;概述 第二章&#xff1a;物理层 第三章&#xff1a;数据链路层 第四章&#xff1a;网络层 第五章&#xff1a;运输层 第六章&#xff1a;应用层 目…

Linux学习第50天:Linux块设备驱动实验(二):Linux三大驱动之一

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 三、使用请求队列实验 1.实验程序编写 使用开发板上的一段RAM来模拟一段块设备&#xff0c;也就是ramdisk. 机械硬盘 34 #define RAMDISK_SIZE (2 * 1024 * 10…

使用即时设计绘制原型设计方便吗?和Axure RP相比怎么样?

对于原型设计&#xff0c;APP 和 Web 都是一样的&#xff0c;因为产品原型是用来确定需求的工具。我们使用这种工具的目的是为了快速迭代&#xff0c;从而深入挖掘和筛选产品的需求。 绘制原型&#xff0c;最重要的原则是&#xff1a;快速、清晰&#xff01; Axure 工具的优缺…

智能合约安全之Solidity重入攻击漏洞的深入理解

漏洞原理 以太坊智能合约的特点之一是能够调用和使用其他外部合约的代码。这些合约通常会操作以太币,经常将以太发送到各种外部用户地址。这种调用外部合约或向外部地址发送以太币的操作,需要合约提交外部调用。这些外部调用可能被攻击者劫持,比如,通过一个回退函数,强迫…

2023APMCM亚太数学建模C题 - 中国新能源汽车的发展趋势(3)

六、问题三的模型建立和求解 6.1问题分析 问题3.收集数据&#xff0c;建立数学模型分析新能源电动汽车对全球传统能源汽车行业的影响。 本题要求建立模型分析新能源电动汽车对全球传统能源汽车行业的影响。由于数据集可能略大&#xff0c;而在处理复杂问题、大量特征和大规模…

Python YAML数据驱动:实现自动化测试的利器

引言&#xff1a; 在软件开发过程中&#xff0c;自动化测试是保证软件质量的重要环节。而数据驱动测试作为一种常见的测试方法&#xff0c;通过使用不同的测试数据来验证软件的功能和性能。本文将介绍如何使用Python中的YAML库来实现数据驱动测试&#xff0c;以及如何利用YAML数…

spatialRF

官网&#xff1a;Easy Spatial Modeling with Random Forest • spatialRF (blasbenito.github.io) spatialRF是一种在考虑空间自相关的前提下&#xff0c;利用随机森林对空间数据进行回归并解释的R包。 数据要求 参数命名 data&#xff1a;训练集&#xff0c;data frame。 …

基于SpringBoot的房屋租赁管理系统

文章目录 项目介绍主要功能截图&#xff1a;部分代码展示设计总结项目获取方式 &#x1f345; 作者主页&#xff1a;超级无敌暴龙战士塔塔开 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题库【关注我&#xff0c;都给你】 &…

WorkPlus Meet打造高质量的视频会议体验,助力实时远程协作

在全球化的商业环境中&#xff0c;远程协作和在线会议成为了企业高效工作的关键。作为一款高质量的视频会议软件&#xff0c;WorkPlus Meet以其卓越的性能和创新的功能&#xff0c;成为企业实时远程协作的首选。 WorkPlus Meet打造了高质量的视频会议体验&#xff0c;为企业提供…

书生·浦语大模型趣味 Demo笔记及作业

文章目录 笔记作业基础作业&#xff1a;进阶作业&#xff1a; 笔记 书生浦语大模型InternLM-Chat-7B 智能对话 Demo&#xff1a;https://blog.csdn.net/m0_49289284/article/details/135412067书生浦语大模型Lagent 智能体工具调用 Demo&#xff1a;https://blog.csdn.net/m0_…

雾天条件下 SLS 融合网络的三维目标检测

论文地址&#xff1a;3D Object Detection with SLS-Fusion Network in Foggy Weather Conditions 论文代码&#xff1a;https://github.com/maiminh1996/SLS-Fusion 论文摘要 摄像头或激光雷达&#xff08;光检测和测距&#xff09;等传感器的作用对于自动驾驶汽车的环境意识…

SSH关闭Liunx系统正常运行

一、需求 在Linux系统中&#xff0c;在执行一些运行时间比较长的任务时&#xff0c;必须等待执行完毕才能断开SSH连接或关闭客户端软件&#xff0c;否则可能会导致执行中断。本文介绍两种保障程序在您退出登录后持续运行的方法。 二、使用screen执行命令 GNU Screen可以当做…

vue项目中的录屏插件recordrtc且带声音

vue项目中的录屏插件recordrtc且带声音 一、效果图二、安装插件三、直接上代码 一、效果图 其中窗口录屏不带声音&#xff0c;chrome标签和整个屏幕的录屏是带声音的 二、安装插件 npm i recordrtc 三、直接上代码 <template><div class"record-page">…

react-router-domV6.21.1版本结合ant design mobile的TabBar标签栏和Popup弹出层实现移动端路由配置

react-router-demo react-router-dom在V6版本之后更换了很多的API名称&#xff0c;在ant design mobile的TabBar配置中还是之前的旧版本&#xff0c;比如使用了switch组件等。我们在这里使用新版本的react-router-dom进行react移动端的配置 首先使用npm下载最新版的react-rout…

MYSQL双主节点–更换ip

MYSQL双主节点–更换ip 一、更换双主节点ip 1.停止mysql服务 #安装了supervisor supervisorctl stop mysql #未安装 systemctl stop mysqld2.修改网卡配置信息 注&#xff1a;ens33是网卡名称&#xff0c;可能网卡不叫ens33 vi /etc/sysconfig/network-scripts/ifcfg-ens333…