使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>spark_project</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12.12</scala.version><spark.version>3.2.0</spark.version><kafka.version>2.8.1</kafka.version></properties><dependencies><!-- Spark dependencies --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka dependencies --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><!-- Scala library --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>   </dependencies>
</project>

mysql中表结构:
在这里插入图片描述

2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName("MySQLToKafka").master("local[*]").getOrCreate()// 设置 MySQL 连接属性val mysqlProps = new Properties()mysqlProps.setProperty("user", "root")mysqlProps.setProperty("password", "12345678")mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")// 从 MySQL 数据库中读取数据val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")// 将数据写入 KafkajsonDF.show()jsonDF.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "comment").save()// 停止 SparkSessionspark.stop()}}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit = {// 设置 SparkConfval sparkConf = new SparkConf().setAppName("KafkaToHDFS").setMaster("local[*]")// 创建 StreamingContext,每秒处理一次val ssc = new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092", // Kafka broker 地址"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "spark-consumer-group", // Spark 消费者组"auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费"enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics = Array("comment")// 创建 Kafka Direct Streamval stream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息,然后将其写入 HDFSstream.map({rdd=>val comment = JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender}).foreachRDD { rdd =>if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
在这里插入图片描述

结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11244.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode-有效的括号

20. 有效的括号 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/valid-parentheses/ 题目 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&…

c++ map,set封装

map 是一个 kv 结构&#xff0c; set 是 k结构。 我们前面模拟实现了 红黑树&#xff0c;但是我们实现的红黑树把 kv 结构写死了&#xff0c;怎么样才能用泛型编程的思想来实现map和set呢 我们先简单看一下原码中是怎么实现的 1.原码实现逻辑 我们打开这里的 stl_set.h 通过…

Dubbo基本使用

Dubbo基本使用 1.项目介绍2.开发步骤2.1 启动注册中心2.2 初始化项目2.3 添加 Maven 依赖2.3.1 父pom.xml2.3.1 consumer模块和provider模块pom.xml 2.4 定义服务接口2.5 定义服务端的实现2.6 配置服务端 Yaml 配置文件2.7 配置消费端 Yaml 配置文件2.8 基于 Spring 配置服务端…

芯片原厂工程师带你学 Linux 驱动

芯片原厂工程师&#xff0c;手把手带你学Linux驱动&#xff0c;感兴趣的点个关注私聊呀。 介绍&#xff1a; https://b2qtatgfkp.feishu.cn/docx/HoBKdezVFo6HlVx0hXPc8R7QnWc

JINGWHALE 数字认证体系 · 进阶知识库

JINGWHALE 数字认证体系 是 JINGWHALE 数字科学艺术创新中心 的数字认证服务。 ◢◤ 宗旨 致力于数字化知行合一的知识赋能&#xff01; ◥ 数字化人才培养 培养数字化思维&#xff0c;传播数字化知识&#xff0c;赋能各行业数字化。 ◥ 职业人才发展 无缝衔接学校高等…

LeetCode题目104: 二叉树的最大深度(递归\迭代\层序遍历\尾递归优化\分治法实现 )

❤️❤️❤️ 欢迎来到我的博客。希望您能在这里找到既有价值又有趣的内容&#xff0c;和我一起探索、学习和成长。欢迎评论区畅所欲言、享受知识的乐趣&#xff01; 推荐&#xff1a;数据分析螺丝钉的首页 格物致知 终身学习 期待您的关注 导航&#xff1a; LeetCode解锁100…

EPICS database练习

给定一个以下的数据库&#xff1a; # 指定Limit的上限&#xff0c;初始为10&#xff0c;可以通过通道访问进行设置&#xff0c;上限为100 record(ao, "$(P)Limit") { field(DRVH, "100") field(DOL, "10") field(PINI, "YES") }# 一个…

el-menu 保持展开点击不收缩 默认选择第一个菜单

<el-menu:default-openeds"[/system]" 数组 默认展开第一个:collapse"isCollapse"close"handleClose" 点击关闭的时候 让菜单打开 就可以实现保持展开效果ref"menus":unique-opened"true":active-text-color"se…

2024中国(重庆)人工智能展览会8月举办

2024中国(重庆)人工智能展览会8月举办 邀请函 主办单位&#xff1a; 中国航空学会 重庆市南岸区人民政府 招商执行单位&#xff1a; 重庆港华展览有限公司 【报名I59交易会 2351交易会 9466】 展会背景&#xff1a; 2024中国航空科普大会暨第八届全国青少年无人机大赛在…

QT 项目打包(为了后期远程实验用)

一、环境准备 1、一个项目工程 二、步骤 1、将编译器设置调整为Release模式 二、对项目重新编译构建 三、可以看到工程目录这个文件夹 打开工程目录文件夹的Release文件夹&#xff0c;我的路径如下 四、新建一个文件夹&#xff0c;将上述路径文件夹下的exe文件复制到新的文…

Windows的消息过程调用与窗口位于同一个线程

消息过程函数和窗口通常在同一个线程中运行。 在Windows中&#xff0c;每个窗口都有一个与之相关联的线程&#xff0c;这个线程负责处理窗口的消息。当窗口接收到消息时&#xff0c;系统会将消息发送给创建窗口的线程&#xff0c;并在该线程上调用窗口过程函数来处理消息。 这…

领导跳槽邀请,测试员该如何抉择?

在职场中&#xff0c;领导跳槽并邀请下属一同前往新公司&#xff0c;是一个既常见又令人纠结的选择。对于测试员来说&#xff0c;这个决定更是充满了未知与风险。那么&#xff0c;面对这样的机会&#xff0c;我们该如何权衡利弊&#xff0c;做出明智的选择呢&#xff1f; 首先&…

你眼中的IT行业现状与未来趋势

一&#xff1a;阐述 现在IT、科技行业从业人员开始求稳&#xff0c;部分从业人员开始转向DBA、运维&#xff08;企业相当稳定&#xff09;、硬件工程师等&#xff08;技术过硬&#xff0c;不是随便可以转的&#xff09;&#xff0c;但是这些行业职位少&#xff0c;薪水相对不是…

LLM大语言模型(十四):LangChain中Tool的不同定义方式,对prompt的影响

背景 ChatGLM3-6B的函数调用功能&#xff0c;和LangChain的Tool调用&#xff0c;在prompt上并没有对齐。 参考&#xff1a;LLM大语言模型&#xff08;十二&#xff09;&#xff1a;关于ChatGLM3-6B不兼容Langchain 的Function Call_error: valueerror: caught exception: unk…

神卓互联内网穿透之快速创建https类型通道【最新】

神卓互联最近上线了V9.0内网穿透通信传输模式&#xff0c;相比与之前的V8.0在速度和延迟方面确实提升了很多&#xff0c;控制台也进行了改版升级&#xff0c;这里是对升级后的控制台创建https通道方法进行记录&#xff1a; 登录神卓互联控制台 选择【内网穿透】-【映射管理】…

如何利用AI提高内容生产效率与AIGC典型案例分析

❤️❤️❤️ 欢迎来到我的博客。希望您能在这里找到既有价值又有趣的内容&#xff0c;和我一起探索、学习和成长。欢迎评论区畅所欲言、享受知识的乐趣&#xff01; 推荐&#xff1a;数据分析螺丝钉的首页 格物致知 终身学习 期待您的关注 导航&#xff1a; LeetCode解锁100…

有趣的css - 打字机动画效果

大家好&#xff0c;我是 Just&#xff0c;这里是「设计师工作日常」&#xff0c;今天分享的是使用 css 实现好玩的单行打字机效果&#xff0c;和我一起看看吧。 《有趣的css》系列最新实例通过公众号「设计师工作日常」发布。 目录 整体效果核心代码html 代码css 部分代码 完整…

centos7同步银河麒麟服务器SP系列外网yum源包同步不完整问题

centos7同步银河麒麟服务器SP系列外网yum源包同步不完整问题 一 问题描述二 解决方法三 外网源配置步骤 一 问题描述 Red 7.7 x86架构同步银河麒麟服务器SP arm架构外网源的yum包不完整问题&#xff0c;yum repolist查看源里面有15000左右的包&#xff0c;使用reposync命令同步…

疾病防范:拯救微笑,关于儿童抑郁的注意事项

引言&#xff1a; 儿童抑郁是一种常见但常被忽视的心理健康问题&#xff0c;对孩子的身心健康和成长都会造成严重影响。本文将探讨儿童抑郁的注意事项&#xff0c;以帮助家长和教育者更好地识别、理解和应对儿童抑郁问题。 1. 深入了解抑郁症&#xff1a; 抑郁症并非一种偶发的…

B端设计与C端设计,用户模型区别!

B端设计和C端设计到底有哪些不同&#xff1f;这篇文章里&#xff0c;作者就做了相对详细的阐述和分析&#xff0c;不妨来看一下。 C 全称是 Customer 即消费者&#xff08;泛指用户&#xff09;的产品&#xff0c;个人用户或终端用户&#xff0c;使用的是客户端。例如&#xff…