2023_Spark_实验三十二:消费Kafka数据并保存到MySQL中

实验目的:掌握Scala开发工具消费Kafka数据,并将结果保存到关系型数据库中

实验方法:消费Kafka数据保存到MySQL中

实验步骤:

一、创建Job_ClickData_Process

代码如下:

package examsimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.collection.mutable/*** @projectName sparkGNU2023  * @package exams  * @className exams.Job_ClickData_Process  * @description ${description}  * @author pblh123* @date 2023/12/20 15:42* @version 1.0**/object Job_ClickData_Process {def main(args: Array[String]): Unit = {//  1. 创建spark,sc,sparkstreaming对象if (args.length != 3) {println("您需要输入三个参数")System.exit(5)}val musrl: String = args(0)val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(musrl)val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")val ckeckpointdir: String = args(1)val ssc = new StreamingContext(sc, Seconds(5)) //连续流批次处理的大小//  2. 代码主体
//    设置ckeckpoint目录ssc.checkpoint(ckeckpointdir)//准备kafka的连接参数val kfkbst: String = args(2)val kafkaParams: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> kfkbst,"group.id" -> "SparkKafka",//latest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最新/或最后的位置开始消费//earliest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最开始/最早的位置开始消费//none示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,则报错"auto.offset.reset" -> "latest", //偏移量的重置位置"enable.auto.commit" -> (false: java.lang.Boolean), //是否自动提交偏移量"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])val topics: Array[String] = Array("RealDataTopic")//从mysql中查询出offsets:Map[TopicPartition, Long]val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtils.getOffsetMap("SparkKafka", "RealDataTopic")val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetsMap.size > 0) {println("MySql记录了offset信息,从offset处开始消费")//连接kafka的消息KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap))} else {println("MySql没有记录了offset信息,从latest处开始消费")//连接kafka的消息KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))}//实时处理数据并手动维护offsetval valueDS = kafkaDS.map(_.value()) //_表示从kafka中消费出来的每一条数据valueDS.print()kafkaDS.map(_.value())valueDS.foreachRDD(rdd => {rdd.foreachPartition(lines => {//将处理分析的结果存入mysql/*DROP TABLE IF EXISTS `job_real_time`;CREATE TABLE `job_real_time` (`datetime` varchar(8) DEFAULT NULL COMMENT '日期',`job_type` int(2) DEFAULT NULL COMMENT '1代表新招聘岗位,0代表找工作的人',`job_id` int(8) DEFAULT NULL COMMENT '岗位ID,匹配岗位名称',`count` int(8) DEFAULT NULL COMMENT '企业新增岗位数和找工作的人数') ENGINE=InnoDB DEFAULT CHARSET=utf8;*///1.开启连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncoding=UTF-8&serverTimezone=UTC", "lh", "Lh123456!")//2.编写sql并获取psval sql: String = "replace into job_real_time(datetime,job_type,job_id,count) values(?,?,?,?)"val ps: PreparedStatement = conn.prepareStatement(sql)//3.设置参数并执行for (line <- lines) {var item = line.split(" ")ps.setString(1, item(0).toString)ps.setInt(2, item(1).toInt)ps.setInt(3, item(2).toInt)ps.setInt(4, item(3).toInt)ps.executeUpdate()}//4.关闭资源ps.close()conn.close()})})//手动提交偏移量kafkaDS.foreachRDD(rdd => {if (rdd.count() > 0) {//获取偏移量val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesOffsetUtils.saveOffsets(groupId = "SparkKafka", offsets)}})//开启sparkstreaming任务并等待结束,关闭ssc,scssc.start()ssc.awaitTermination()ssc.stop()sc.stop()}}

二、编写模拟点击量并消费Kafka数据

启动zookeeper集群

zk.sh start

启动kafka集群

kf.sh start

检查模拟的实时数据是否正常更新

不断正常更新的情况下,启动flume采集real-time-data.log的实时数据

启动flume

在mysql数据库中准备偏移表与实时数据表

启动Job_ClickData_Process方法消费kafka数据并保存到mysql中


 

检查mysql表是否存入数据

实验结果:通过scala开发spark代码实现消费kafka数据存储到MySQL中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/235352.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据挖掘体系介绍

数据挖掘是什么&#xff1f; 简而言之&#xff0c;对数据进行挖掘&#xff0c;从中提取出有效的信息。一般我们会把这种信息通过概念、规则、规律、模式等有组织的方式展示出来&#xff0c;形成所谓的知识。特别是在这个大数据时代&#xff0c;当数据多到一定程度&#xff0c;…

【Docker-4】Docker 命令

1、镜像管理命令 docker images #查看本机镜像 [rootdocker-0001 ~]# docker imagesdocker search 镜像名称 #从官方仓库查找镜像 [rootdocker-0001 ~]# docker search busybox #需要联网&#xff0c;本次不用操作docker pull 镜像名称:标签 #下载镜像 [rootdocke…

Qt前端技术:2.QSS

border-style&#xff1a;后边是两个参数的话第一个参数改变上下的style 第二个参数改变左右的style 如果后边是三个参数的话第一个参数改变上边的style第二个参数改变左右的style&#xff0c;第三个参数改变的下边的style 如果后边是四个参数的话对应的顺序为上&#xff0c;右…

掌控时间的尝试:基于Flask的自卷系统设计与实现

Gitee源代码仓库&#xff1a;Strong: 一个自卷系统 (gitee.com) 长期苦于自己的时间如脱缰野马&#xff0c;难以掌控&#xff0c;无法投入到我认为自己想做的事情中去。纯粹的自律实在不可靠&#xff0c;我希望借助一些外力来帮助自己挣脱泥潭&#xff0c;于是我制作了这个实验…

Spring MVC控制层框架

三、Spring MVC控制层框架 目录 一、SpringMVC简介和体验 1. 介绍2. 主要作用3. 核心组件和调用流程理解4. 快速体验 二、SpringMVC接收数据 1. 访问路径设置2. 接收参数&#xff08;重点&#xff09; 2.1 param 和 json参数比较2.2 param参数接收2.3 路径 参数接收2.4 json参…

旅游景区项目信息化建设运营方案:PPT47页,附下载

关键词&#xff1a;智慧景区解决方案&#xff0c;智慧景区建设&#xff0c;智慧景区开发与管理&#xff0c;智慧景区建设的意义&#xff0c;智慧景区管理 一、旅游景区项目信息化建设背景 1、旅游业发展迅速&#xff1a;随着旅游业的不断发展&#xff0c;游客对旅游体验的需求…

Flink(十)【处理函数】

前言 冬天学习成本太高了&#xff0c;每天冻得要死&#xff0c;自习室人满为患&#xff0c;确实是辛苦。学校基本的硬件条件差的一批&#xff08;图书馆贼小贼偏僻、老教室暖气还没有地板热、空教室还得自己一个一个挨着找&#xff09;&#xff0c;个体无法改变环境只能顺应了&…

【ARM Trace32(劳特巴赫) 高级篇 21 -- Trace 系统性能分析 Performance Analyzer】

请阅读【Trace32 ARM 专栏导读】 文章目录 Performance AnalyzerPerf 操作步骤采样对象PC采样对象Memory采样对象 TaskPerformance Analyzer sample-based profiling 通常也叫做Trace32 的性能分析(Perf), 这个功能是通过周期性的采样来实现的。被采样到的数据可以被用于统计…

Apache Flink(十七):Flink On Standalone任务提交-Standalone Application模式

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1

阅读此文默认读者对docker、docker-compose有一定了解。 环境 docker-compose运行了一个jobmanager、一个taskmanager和一个sql-client。 如下&#xff1a; version: "2.2" services:jobmanager:image: flink:1.18.0-scala_2.12container_name: jobmanagerports:…

基于RocketMQ实现分布式事务

前言 在上一篇文章Spring Boot自动装配原理以及实践我们完成了服务通用日志监控组件的开发&#xff0c;确保每个服务都可以基于一个注解实现业务功能的监控。 而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问&#xff0c;之前我们不是基于Seata完成了…

AIGC:阿里开源大模型通义千问部署与实战

1 引言 通义千问-7B&#xff08;Qwen-7B&#xff09;是阿里云研发的通义千问大模型系列的70亿参数规模的模型。Qwen-7B是基于Transformer的大语言模型, 在超大规模的预训练数据上进行训练得到。预训练数据类型多样&#xff0c;覆盖广泛&#xff0c;包括大量网络文本、专业书籍…

百度侯震宇:AI原生与大模型将从三个层面重构云计算

12月20日&#xff0c;2023百度云智大会智算大会在北京举办&#xff0c;大会以「大模型重构云计算&#xff0c;Cloud for AI」为主题&#xff0c;深度聚焦大模型引发的云计算变革。 百度智能云表示&#xff0c;为满足大模型落地需求&#xff0c;正在基于「云智一体」战略重构…

ubuntu qt 源码编译

官方源码下载地址 : 源码地址 选择要下载的版本 dmg结尾的是MacOS系统里使用的Qt库&#xff0c;qt-everywhere-opensource-src-4.7.0是Qt源码包&#xff0c;有zip和tar.gz两个压缩格式的&#xff0c;两个内容是一样的&#xff0c;只是zip一般在Windows下比较流行&#xff0c;…

Java:语法速通

参考 菜鸟教程 java 继承 class 父类 { }class 子类 extends 父类 { }继承的特性&#xff1a; 子类拥有父类非private的属性和方法子类可以对父类进行扩展子类可以重写父类的方法使用extends只能单继承&#xff0c;使用implements可以变相的多继承&#xff0c;即一个类继承…

无人机支持的空中无蜂窝大规模MIMO系统中上行链路分布式检测

无人机支持的空中无蜂窝大规模MIMO系统中上行链路分布式检测 无人机支持的空中无蜂窝大规模MIMO系统中上行链路分布式检测介绍题目一. 背景&#xff08;解决的问题&#xff09;二. 系统模型2.1 信道模型2.1.1 信道系数2.1.2 进行标准化 2.2 信道估计 和 数据传输2.2.1 信道估计…

在Windows系统平台下部署运行服务端Idea工程的jar服务

前言 目前云原生docker等技术&#xff0c;加上部署流水线大大的简化了各种流程&#xff0c;我们后端开发的人员只需要提交代码后&#xff0c;构建、部署、测试、发布等环节都无需人员接入&#xff0c;完全的自动化交付了。那么你肯定不禁想问&#xff0c;如题的需求不是点击一…

Web 前端—HTML+CSS系列

HTML、CSS 一、HTMLCSS1.1什么是HTML、CSS1.2宇宙第一编辑器VS Code1.3Chrome浏览器1.4、深入了解网站开发 一、HTML基本操作1.web前端三大核心技术2.HTML初始代码3.HTML注释4.HTML语义化5.标题与段落6.文本修饰标签7.图片标签与图片属性8.引入文件的地址路径9.跳转链接10.跳转…

Leetcode—75.颜色分类【中等】

2023每日刷题&#xff08;六十五&#xff09; Leetcode—75.颜色分类 实现代码 class Solution { public:void sortColors(vector<int>& nums) {int red 0, white 0, blue 0;for(auto num: nums) {if(num 0) {red;} else if(num 1) {white;} else {blue;}}for…

机械、电气、自动化与人工智能融合:发展历程、问题与前景

导言 机械、电气、自动化行业与人工智能的结合&#xff0c;推动了工业革命的新浪潮。本文将深入研究这一融合的发展历程、遇到的问题、解决过程&#xff0c;以及未来的可用范围&#xff0c;着重分析在各国的应用现状和未来的研究趋势。同时&#xff0c;探讨在哪些方面能够取得胜…