利用Spark将Kafka数据流写入HDFS

利用Spark将Kafka数据流写入HDFS

在当今的大数据时代,实时数据处理和分析变得越来越重要。Apache Kafka作为一个分布式流处理平台,已经成为处理实时数据的事实标准。而Apache Spark则是一个强大的大数据处理框架,它提供了对数据进行复杂处理的能力。
本篇博客将介绍如何使用Spark来读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。
环境准备
在开始之前,确保你的开发环境中已经安装了以下软件:

Apache Kafka

#启动zookeeper
zkServer start
#启动kafka服务
kafka-server-start /opt/homebrew/etc/kafka/server.properties

Apache Spark

<properties><scala.version>2.12.17</scala.version><spark.version>3.0.0</spark.version><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka Streaming dependency --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency>

Hadoop HDFS

#启动hdfs
start-dfs.sh

Java开发环境
此外,你需要在项目中包含Spark和Kafka的依赖库。

代码实现
首先,我们定义一个Scala case class Job 来表示从Kafka读取的每条记录的数据结构。

case class Job(Position: String,Company: String,Salary: String,Location: String,Experience: String,Education: String,Detail: String
)

接下来,我们编写一个Kafka2Hdfs对象,并在其中实现main方法。这个方法将创建一个SparkSession,配置Kafka读取选项,并从Kafka中读取数据流。

object Kafka2Hdfs {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Kafka2Hdfs").master("local[*]").getOrCreate()import spark.implicits._val kafkaOptions = Map[String, String]("kafka.bootstrap.servers" -> "127.0.0.1:9092","subscribe" -> "flume","startingOffsets" -> "earliest")val stream = spark.readStream.format("kafka").options(kafkaOptions).load()

我们使用subscribe选项指定Kafka中的topic名称,这里我们使用的是flume。startingOffsets选项设置为earliest,意味着我们从最早的记录开始读取数据。

接下来,我们将Kafka中的数据转换成DataFrame。我们首先将每条记录的value字段转换为字符串,然后使用map函数将每条记录解析为Job对象。

val jobDs = stream.selectExpr("CAST(value AS STRING)").as[String].map(line => {val fields = line.split(",")Job(Position = fields(0),Company = fields(1).trim,Salary = fields(2).trim,Location = fields(3).trim,Experience = fields(4).trim,Education = fields(5).trim,Detail = fields(6).trim)}).toDF()

现在,我们已经有了一个包含Job对象的DataFrame。接下来,我们将这个DataFrame以CSV格式写入到HDFS中。我们使用writeStream方法,并设置format为csv,同时指定输出路径和检查点位置。

val query: StreamingQuery = jobDs.writeStream.format("csv").option("header", "false").option("path", "/").option("checkpointLocation", "/ck").start()

注意,我们在这里将header选项设置为false,因为我们不打算在CSV文件中包含列名。path选项指定了输出文件的存储路径,而checkpointLocation选项指定了检查点的存储路径,这对于流处理的可靠性非常重要。

最后,我们调用awaitTermination方法来等待流处理的结束。在实际的生产环境中,你可能希望将这个流处理任务部署到一个集群上,并让它持续运行。

query.awaitTermination()

总结
在这篇博客中,我们介绍了如何使用Spark读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。这种方法可以用于各种实时数据处理场景,例如日志分析、事件监控等。通过这种方式,我们可以将实时数据转换为静态数据,以便进行更深入的分析和处理。

完整代码:

package com.lhy.sparkkafka2hdfsimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Row, SparkSession}case class Job(Position:String,Company:String,Salary:String,Location:String,Experience:String,Education:String,Detail:String)
object Kafka2Hdfs{def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Kafka2Hdfs").master("local[*]").getOrCreate()import spark.implicits._val kafkaOptions = Map[String, String]("kafka.bootstrap.servers" -> "127.0.0.1:9092","subscribe" -> "flume","startingOffsets" -> "earliest")val stream = spark.readStream.format("kafka").options(kafkaOptions).load()val jobDs = stream.selectExpr("CAST(value AS STRING)").as[String].map(line => {val fields = line.split(",")Job(Position = fields(0),Company = fields(1).trim,Salary = fields(2).trim,Location = fields(3).trim,Experience = fields(4).trim,Education = fields(5).trim,Detail = fields(6).trim)}).toDF()
//    val query = jobDs.writeStream.format("console").start()val query: StreamingQuery = jobDs.writeStream.format("csv").option("header", "false").option("path", "/").option("checkpointLocation", "/ck").start()query.awaitTermination()}

在这里插入图片描述
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/792449.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Echarts 自适应宽高,或指定宽高进行自适应

文章目录 需求分析 需求 有一个按钮实现对Echarts的指定缩放与拉长&#xff0c;形成自适应效果 拉长后效果图 该块元素缩短后效果图 分析 因为我习惯使用 ref 来获取组件的 DOM 元素&#xff0c;然后进行挂载 <div ref"echartsRef" id"myDiv" :sty…

OCR常用识别算法综述

参考&#xff1a;https://aistudio.baidu.com/education/lessonvideo/3279888 语种&#xff1a;常用字符36与常用汉字6623&#xff0c;区别。 标注&#xff1a;文本型位置/单字符位置&#xff0c;后者标注成本大 挑战&#xff1a;场景文字识别&#xff1a;字符大小、颜色、字体…

力扣1379---找出克隆二叉树的相同节点(Java、DFS、简单题)

目录 题目描述&#xff1a; 思路描述&#xff1a; 代码&#xff1a; &#xff08;1&#xff09;&#xff1a; &#xff08;2&#xff09;&#xff1a; 题目描述&#xff1a; 给你两棵二叉树&#xff0c;原始树 original 和克隆树 cloned&#xff0c;以及一个位于原始树 ori…

蓝牙学习九(定向广播 ADV_DIRECT_IND)

一、简介 广播类型有如下&#xff1a; 非定向可连接广播&#xff08;ADV_IND&#xff09;。可连接的非定向广播&#xff0c;表示当前设备可以接受任何设备的连接请求。 定向可连接广播&#xff08;ADV_DIRECT_IND&#xff09;。可连接的定向广播&#xff0c;设备不能被主动扫描…

Kotlin:常用标准库函数(let、run、with、apply、also)

一、let 扩展函数 Kotlin标准库函数let可用于范围确定和空检查。当调用对象时&#xff0c;let执行给定的代码块并返回其最后一个表达式的结果。对象可以通过引用(默认情况下)或自定义名称在块中访问。 let扩展函数源码 let.kt文件代码 fun main() {println("isEmpty $is…

Stable Diffusion扩散模型【详解】小白也能看懂!!

文章目录 1、Diffusion的整体过程2、加噪过程2.1 加噪的具体细节2.2 加噪过程的公式推导 3、去噪过程3.1 图像概率分布 4、损失函数5、 伪代码过程 此文涉及公式推导&#xff0c;需要参考这篇文章&#xff1a; Stable Diffusion扩散模型推导公式的基础知识 1、Diffusion的整体…

【Unity 实用工具篇】| Unity中 实现背景模糊效果,简单易用

前言【Unity 实用工具篇】| Unity 实现背景模糊效果,简单易用一、实现背景模糊效果1.1 介绍1.2 效果展示1.3 使用说明及下载二、插件资源简单介绍2.1 导入下载好的资源2.2 功能介绍2.2.1 捕获特效2.2.2 高级选项

【Vue】watch监听复杂数据,新值与旧值一样

问题 watch监听复杂数据&#xff0c;例如数组&#xff0c;旧值与新值一样 解决方案 监听回调里返回新数组&#xff0c;新、旧数组地址改变&#xff0c;得到的值也就不一样&#xff0c;例↓ ()>[...data] 码 test.js // 数据 const musicList ref([{ id: 540000200805…

00-JAVA基础-动态编译

动态编译 JAVA 6 引入了动态编译机制。Java 动态编译是指在运行时将Java源代码编译成可执行的字节码。这通常使用Java的内置编译器API javax.tools.JavaCompiler 来实现。 动态编译的应用场景 可以做一个浏览器编写java代码&#xff0c;上传服务器编译和运行的在线测评系统服…

继承.Java

目录 1&#xff0c;概述 1.1继承的含义 1.2什么时候用继承 1.3继承的好处 1.4继承的特点 2&#xff0c;继承的格式 3&#xff0c;可以继承哪些内容 4&#xff0c;成员方法和成员变量的访问特点 5&#xff0c;构造方法的访问特点 6&#xff0c;this&#xff0c;super…

前端三剑客 —— CSS (第一节)

目录 CSS 什么是CSS CSS的几种写法&#xff1a; 行内样式 内嵌样式 外链样式 import 加载顺序 CSS选择器*** 基本选择器 ID选择器 标签选择器 类选择器 通用选择器 包含选择器 上节内容中提到了 前端三剑客 —— HTML 超文本标记语言&#xff0c;这节内容 跟大家…

大厂面试之【Redis持久化机制】 - RDB和AOF概述及应用配置

文章目录 Redis持久化1. RDB(Redis DataBase)1.1 概述1.2 配置应用 2. AOF(Append Only File)2.1 概述2.2 配置应用 Redis持久化 先上结论&#xff1a;Redis持久化操作分为rdb以及aof&#xff0c;但是前者已经够用 1. RDB(Redis DataBase) 1.1 概述 rdb保存的是dump.rdb文件在指…

WEB漏洞挖掘详细教程--用户输入合规性(sql注入测试)

前置教程&#xff1a;WEB漏洞挖掘&#xff08;SRC&#xff09;详细教程--信息收集篇-CSDN博客 WEB漏洞挖掘&#xff08;SRC&#xff09;详细教程--身份认证与业务一致性-CSDN博客 WEB漏洞挖掘&#xff08;SRC&#xff09;详细教程--业务数据篡改-CSDN博客 2.4 用户输入合规性…

MCRNet:用于乳腺超声成像语义分割的多级上下文细化网络

这里写目录标题 摘要引言方法 摘要 由于对比度差、目标边界模糊和大量阴影的不利影响&#xff0c;乳腺超声成像中的自动语义分割仍然是一项具有挑战性的任务。最近&#xff0c;具有U形的卷积神经网络&#xff08;CNN&#xff09;在医学图像分割中表现出相当好的性能。然而&…

【C++ STL迭代器】iterator

文章目录 【 1. 迭代器的属性 】【 2. 不同容器支持的迭代器 】【 3. 迭代器的定义方式 】【 4. 实例 】4.1 定义方式&#xff1a;正向迭代器和反向迭代器4.2 迭代器属性&#xff1a;前向迭代、双向迭代、随机迭代4.2 迭代器的遍历方法4.3 auto关键字 自动指定迭代器定义类型 背…

【Qt】:常用控件(四:显示类控件)

常用控件 一.Lable二.LCD Number 一.Lable QLabel 可以⽤来显⽰⽂本和图⽚. 代码⽰例:显⽰不同格式的⽂本 代码⽰例:显⽰图⽚ 此时,如果拖动窗⼝⼤⼩,可以看到图⽚并不会随着窗⼝⼤⼩的改变⽽同步变化 为了解决这个问题,可以在Widget中重写resizeEvent函数。当用户把窗口从A拖…

基于单片机双路压力监测报警系统

**单片机设计介绍&#xff0c;基于单片机双路压力监测报警系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机双路压力监测报警系统是一个专门设计用于实时监测和报警两路压力变化的系统。它结合了单片机控制技术和压…

图DP

目录 有向无环图DP 力扣 329. 矩阵中的最长递增路径 力扣 2192. 有向无环图中一个节点的所有祖先 有向有环图DP 力扣 1306. 跳跃游戏 III 有向无环图DP 力扣 329. 矩阵中的最长递增路径 给定一个 m x n 整数矩阵 matrix &#xff0c;找出其中 最长递增路径 的长度。 对…

【DA-CLIP】test.py解读,调用DA-CLIP和IRSDE模型复原计算复原图与GT图SSIM、PSNR、LPIPS

文件路径daclip-uir-main/universal-image-restoration/config/daclip-sde/test.py 代码有部分修改 导包 import argparse import logging import os.path import sys import time from collections import OrderedDict import torchvision.utils as tvutilsimport numpy as…

数学知识--(质数,约数)

本文用于个人算法竞赛学习&#xff0c;仅供参考 目录 一.质数的判定 二.分解质因数 三.质数筛 1.朴素筛法 2.埃氏筛法 3.线性筛法 四.约数 1.求一个数的所有约数 2.约数个数和约数之和 3.欧几里得算法&#xff08;辗转相除法&#xff09;-- 求最大公约数 一.质数的判定 …