使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>spark_project</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12.12</scala.version><spark.version>3.2.0</spark.version><kafka.version>2.8.1</kafka.version></properties><dependencies><!-- Spark dependencies --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka dependencies --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><!-- Scala library --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>   </dependencies>
</project>

mysql中表结构:
在这里插入图片描述

2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName("MySQLToKafka").master("local[*]").getOrCreate()// 设置 MySQL 连接属性val mysqlProps = new Properties()mysqlProps.setProperty("user", "root")mysqlProps.setProperty("password", "12345678")mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")// 从 MySQL 数据库中读取数据val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")// 将数据写入 KafkajsonDF.show()jsonDF.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "comment").save()// 停止 SparkSessionspark.stop()}}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit = {// 设置 SparkConfval sparkConf = new SparkConf().setAppName("KafkaToHDFS").setMaster("local[*]")// 创建 StreamingContext,每秒处理一次val ssc = new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092", // Kafka broker 地址"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "spark-consumer-group", // Spark 消费者组"auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费"enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics = Array("comment")// 创建 Kafka Direct Streamval stream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息,然后将其写入 HDFSstream.map({rdd=>val comment = JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender}).foreachRDD { rdd =>if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
在这里插入图片描述

结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11244.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL中表的插入,修改,删除语法

插入 插入数据有四种方法,代码如下 -- 给表中添加元素 -- insert into 表名 (列名) values(数据)INSERT INTO student(NAME,gender,birthday,phone)VALUES(张三,男,2003-2-10,13333333333) INSERT INTO student(NAME,gender,birthday,phone,address,height,rig_time) VALUES(…

在Mac环境下打包Python应用

1、创建虚拟环境 conda create -n ur_name python3.7 pip install -r requirement.txt 2、配置文件生成 cd ur_project py2applet --make-setup __main__.py 3、APP打包 # 编辑setup.py文件,具体参考本文文献 python setup.py py2app 用 py2app 将 Python 脚本打包成 Mac…

高级优化理论与方法(十二)

高级优化理论与方法&#xff08;十二&#xff09; LPDuality of LPWeek LP Duality TheoremStrong LP Duality TheoremCorollary Complementary Slackness ConditionRemarksExample Non-Simplex MethodsKhachiyan (Ellipsoid)Karmarkar (Interior point) Integer Linear Progra…

Leetcode-有效的括号

20. 有效的括号 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/valid-parentheses/ 题目 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&…

MongoDB 数据探索之道:查询文档操作详解

在 MongoDB 数据库中&#xff0c;查询文档是一种强大的工具&#xff0c;能够帮助您深入了解数据并提取所需信息。无论是从大型数据集中检索特定记录&#xff0c;还是进行数据统计和分析&#xff0c;查询文档都是不可或缺的。在本指南中&#xff0c;我们将深入探讨 MongoDB 查询…

c++ map,set封装

map 是一个 kv 结构&#xff0c; set 是 k结构。 我们前面模拟实现了 红黑树&#xff0c;但是我们实现的红黑树把 kv 结构写死了&#xff0c;怎么样才能用泛型编程的思想来实现map和set呢 我们先简单看一下原码中是怎么实现的 1.原码实现逻辑 我们打开这里的 stl_set.h 通过…

Dubbo基本使用

Dubbo基本使用 1.项目介绍2.开发步骤2.1 启动注册中心2.2 初始化项目2.3 添加 Maven 依赖2.3.1 父pom.xml2.3.1 consumer模块和provider模块pom.xml 2.4 定义服务接口2.5 定义服务端的实现2.6 配置服务端 Yaml 配置文件2.7 配置消费端 Yaml 配置文件2.8 基于 Spring 配置服务端…

芯片原厂工程师带你学 Linux 驱动

芯片原厂工程师&#xff0c;手把手带你学Linux驱动&#xff0c;感兴趣的点个关注私聊呀。 介绍&#xff1a; https://b2qtatgfkp.feishu.cn/docx/HoBKdezVFo6HlVx0hXPc8R7QnWc

Mysql sql_mode=only_full_group_by时如何实现根据单个字段分组

SELECT* FROMTABLE_NAME t group by device_id ORDER BY create_time desc报错 Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column TABLE_NAME.t.id which is not functionally dependent on columns in GROUP BY clause; this is …

JINGWHALE 数字认证体系 · 进阶知识库

JINGWHALE 数字认证体系 是 JINGWHALE 数字科学艺术创新中心 的数字认证服务。 ◢◤ 宗旨 致力于数字化知行合一的知识赋能&#xff01; ◥ 数字化人才培养 培养数字化思维&#xff0c;传播数字化知识&#xff0c;赋能各行业数字化。 ◥ 职业人才发展 无缝衔接学校高等…

LeetCode题目104: 二叉树的最大深度(递归\迭代\层序遍历\尾递归优化\分治法实现 )

❤️❤️❤️ 欢迎来到我的博客。希望您能在这里找到既有价值又有趣的内容&#xff0c;和我一起探索、学习和成长。欢迎评论区畅所欲言、享受知识的乐趣&#xff01; 推荐&#xff1a;数据分析螺丝钉的首页 格物致知 终身学习 期待您的关注 导航&#xff1a; LeetCode解锁100…

百度地图提供的微信小程序的功能少到让我想跳楼

呜呜呜。 为什么uni-app使用vue-baidu-map然后启动在微信开发者工具就会有一大堆不明所以的报错啊

EPICS database练习

给定一个以下的数据库&#xff1a; # 指定Limit的上限&#xff0c;初始为10&#xff0c;可以通过通道访问进行设置&#xff0c;上限为100 record(ao, "$(P)Limit") { field(DRVH, "100") field(DOL, "10") field(PINI, "YES") }# 一个…

el-menu 保持展开点击不收缩 默认选择第一个菜单

<el-menu:default-openeds"[/system]" 数组 默认展开第一个:collapse"isCollapse"close"handleClose" 点击关闭的时候 让菜单打开 就可以实现保持展开效果ref"menus":unique-opened"true":active-text-color"se…

自媒体探索

很多做自媒体的群体说最重要的两点就是&#xff1a;学习和坚持 因为保持热情和创意的持续输出时间是很难的事情。 基于自媒体调研有两点建议&#xff1a; 以自己的专业/职业/兴趣为主&#xff0c;这样你会有相对稳定的输出。 以短视频为主的抖音/快手/视频号&#xff0c;还有…

2024中国(重庆)人工智能展览会8月举办

2024中国(重庆)人工智能展览会8月举办 邀请函 主办单位&#xff1a; 中国航空学会 重庆市南岸区人民政府 招商执行单位&#xff1a; 重庆港华展览有限公司 【报名I59交易会 2351交易会 9466】 展会背景&#xff1a; 2024中国航空科普大会暨第八届全国青少年无人机大赛在…

QT 项目打包(为了后期远程实验用)

一、环境准备 1、一个项目工程 二、步骤 1、将编译器设置调整为Release模式 二、对项目重新编译构建 三、可以看到工程目录这个文件夹 打开工程目录文件夹的Release文件夹&#xff0c;我的路径如下 四、新建一个文件夹&#xff0c;将上述路径文件夹下的exe文件复制到新的文…

Windows的消息过程调用与窗口位于同一个线程

消息过程函数和窗口通常在同一个线程中运行。 在Windows中&#xff0c;每个窗口都有一个与之相关联的线程&#xff0c;这个线程负责处理窗口的消息。当窗口接收到消息时&#xff0c;系统会将消息发送给创建窗口的线程&#xff0c;并在该线程上调用窗口过程函数来处理消息。 这…

领导跳槽邀请,测试员该如何抉择?

在职场中&#xff0c;领导跳槽并邀请下属一同前往新公司&#xff0c;是一个既常见又令人纠结的选择。对于测试员来说&#xff0c;这个决定更是充满了未知与风险。那么&#xff0c;面对这样的机会&#xff0c;我们该如何权衡利弊&#xff0c;做出明智的选择呢&#xff1f; 首先&…

你眼中的IT行业现状与未来趋势

一&#xff1a;阐述 现在IT、科技行业从业人员开始求稳&#xff0c;部分从业人员开始转向DBA、运维&#xff08;企业相当稳定&#xff09;、硬件工程师等&#xff08;技术过硬&#xff0c;不是随便可以转的&#xff09;&#xff0c;但是这些行业职位少&#xff0c;薪水相对不是…