spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/110933.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2000-2022年上市公司CEO 高管及董事会环保背景数据(5W+ )(原始数据+处理代码Stata do文档)

2000-2022年上市公司CEO 高管及董事会环保背景数据&#xff08;5W &#xff09;&#xff08;原始数据处理代码Stata do文档&#xff09; 1、时间&#xff1a;2000-2022年 2、指标&#xff1a;证券代码、股票代码、年份、股票简称、ST或PT为1&#xff0c;否则为0、金融业为1&a…

【操作系统】线程的实现方式:用户线程和内核线程

1 用户级线程 完全在用户空间中实现和管理的线程。 它们的创建、同步和调度由应用程序通过用户级别的线程库实现&#xff0c;所有的线程管理工作都由应用程序负责&#xff0c;无需操作系统内核干预。在用户看来有多个线程&#xff0c;但操作系统并不能意识到线程的存在。 缺点…

Android apkanalyzer简介

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、用法3.1 使用 Android Studio3.1.1…

VR全景广告:让消费者体验沉浸式交互,让营销更有趣

好的产品都是需要广告宣传的&#xff0c;随着科技的不断发展&#xff0c;市面上的广告也和多年前的传统广告不同&#xff0c;通过VR技术&#xff0c;可以让广告的观赏性以及科技感更加强烈&#xff0c;并且相比于视频广告&#xff0c;成本也更低。 在广告营销中&#xff0c;关键…

深度解析自动化测试流程(纯干货)

最近很多小伙伴咨询自动化测试到底该怎么做&#xff1f;流程是什么样的&#xff1f;在每个阶段都需要注意什么&#xff1f;本文也就主要从自动化测试的基本流程入手&#xff0c;对面试自动化测试工程师的同学会有不少帮助。对于在职的朋友&#xff0c;也可以参考此流程&#xf…

Java集合类

Java集合类 集合类 集合类其实就是为了更好地组织、管理和操作我们的数据而存在的&#xff0c;包括列表、集合、队列、映射等数据结构。 集合根接口 Java中已经帮我们将常用的集合类型都实现好了&#xff0c;我们只需要直接拿来用就行了 所有的集合类最终都是实现自集合根…

exsi7.0 重新创建缺少的虚拟机磁盘文件文件(.vmdk)

数据存储浏览器中首发的虚拟机磁盘文件是虚拟机的界面文件&#xff0c;并且没有图标。 打开虚拟机电源时&#xff0c;出现文件未找到的错误。 查看虚拟机目录时存在平面文件 虚拟机磁盘的磁盘文件不存在或已损坏。 解决方案 如何重新补发丢失/丢失的虚拟磁盘光盘文件(.vmdk)&…

什么是客户端?一文了解客户端定义、特点与功能、搭建方法

客户端&#xff1a;定义、特点与功能、搭建方法 1. 定义&#xff1a; 客户端是计算机网络中的一个术语&#xff0c;指的是在网络通信中充当主动发起请求并接收服务响应的一方。通常&#xff0c;客户端是指运行在终端设备上的软件或硬件实体&#xff0c;通过与服务器进行通信来…

华为数通方向HCIP-DataCom H12-831题库(单选题:301-310)

第301题 关于配置防火墙安全区域的安全级别的描述,错误的是 A、同一系统中,两个安全区域不允许配置相同的安全级别 B、只能为自定义的安全区域设定安全级别 C、安全级别一旦设定不允许更改 D、新建的安全区域,系统默认其安全级别为1 答案:D 解析: 新创建的安全区域缺省未…

交通目标检测-行人车辆检测流量计数 - 计算机竞赛

文章目录 0 前言1\. 目标检测概况1.1 什么是目标检测&#xff1f;1.2 发展阶段 2\. 行人检测2.1 行人检测简介2.2 行人检测技术难点2.3 行人检测实现效果2.4 关键代码-训练过程 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 毕业设计…

GAN.py

原代码地址&#xff1a;github.com/zqhang/MTGFLOW 目录 def ConvEncoder() def ConvDecoder() class CNNAE(torch.nn.Module): class R_Net(torch.nn.Module): class D_Net(torch.nn.Module): def R_Loss&#xff08;&#xff09; def D_Loss&#xff08;&#xff09…

在前端html页面中向服务器发送post登录请求

目录 前言 搭建服务器 搭建前端登录页面 获取表单值 使用axios发送post登录请求 前言 一般在html页面中向服务器发送post请求的模块为登录请求&#xff0c;本文将介绍如何向服务器发送post请求 搭建服务器 如何搭建服务器请看JWT认证这篇文章&#xff0c;有详细的解说。…

SpringCloud学习笔记-gateway网关自定义全局过滤器

需求&#xff1a;定义全局过滤器&#xff0c;拦截请求&#xff0c;判断请求的参数是否满足下面条件&#xff1a; 参数中是否有authorization&#xff0c; authorization参数值是否为admin 如果同时满足则放行&#xff0c;否则拦截 实现&#xff1a; 在gateway中定义一个过…

《SQLi-Labs》04. Less 23~28a

title: 《SQLi-Labs》04. Less 23~28a date: 2023-10-19 19:37:40 updated: 2023-10-19 19:38:40 categories: WriteUp&#xff1a;Security-Lab excerpt: 联合注入&#xff0c;注释符过滤绕过之构造闭合&#xff0c;%00 截断、二次注入、报错注入&#xff0c;空格过滤绕过&…

【Java基础面试二十四】、String类有哪些方法?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;String类有哪些方法&…

诊断DLL——Visual Studio安装与dll使用

文章目录 Visual Studio安装一、DLL简介二、使用步骤1.新建VS DLL工程2.生成dll文件3.自定义函数然后新建一个function.h文件,声明这个函数。4.新建VS C++ console工程,动态引用DLL编写代码,调用dll三、extern "C" __declspec(dllexport)总结Visual Studio安装 官…

欧科云链研究院:人类或将成为仅次于AI第二聪明物种?Web3不允许

出品&#xff5c;欧科云链研究院 在 AI行业“掘金买铲”的英伟达&#xff0c;60%的红杉投资在AI相关领域&#xff0c;之前只专注Web3的顶级VC&#xff0c;Paradigm 正在从转向人工智能等 "前沿 "技术。 资本的追逐让AI迷人且危险。 OKG RESEARCH IN FT AI教父Geoffre…

并发容器(Map、List、Set)实战及其原理

一. JUC包下的并发容器 Java的集合容器框架中&#xff0c;主要有四大类别&#xff1a;List、Set、Queue、Map&#xff0c;大家熟知的这些集合类ArrayList、LinkedList、HashMap这些容器都是非线程安全的。 所以&#xff0c;Java先提供了同步容器供用户使用。 同步容器可以简单地…

生成“我的精彩回答”页面源码(Python)

生成“我的精彩回答”页面源码(Python)

单点登录知识点

单点登录&#xff08;Single Sign-On&#xff0c;SSO&#xff09;是一种身份验证技术&#xff0c;用户只需进行一次认证&#xff0c;便可访问多个与该系统相关的应用程序。单点登录的实现方式有很多种&#xff0c;如以下几种&#xff1a; 1. 基于代理服务器的实现&#xff1a;…