SparkStreaming与Kafka整合

1.3 SparkStreaming与Kafka整合

1.3.1 整合简述
kafka是做消息的缓存,数据和业务隔离操作的消息队列,而sparkstreaming是一款准实时流式计算框架,所以二者的整合,是大势所趋。
​
二者的整合,有主要的两大版本。

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,在实际开发中Spark Streaming经常会结合Kafka来处理实时数据。Spark Streaming 与 kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。jar包分支选择原则:

  • 0.10.0>kafka版本>=0.8.2.1,选择 08 接口

  • kafka版本>=0.10.0,选择 010 接口

sparkStreaming和Kafka整合一般两种方式:Receiver方式和Direct方式

Receiver方式(介绍)

Receiver方式基于kafka的高级消费者API实现(高级优点:高级API写起来简单;不需要去自行去管理offset,系统通过zookeeper自行管理;不需要管理分区,副本等情况,系统自动管理;消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据;高级缺点:不能自行控制 offset;不能细化控制如分区、副本、zk 等)。Receiver从kafka接收数据,存储在Executor中,Spark Streaming 定时生成任务来处理数据。

默认配置的情况,Receiver失败时有可能丢失数据。如果要保证数据的可靠性,需要开启预写式日志,简称WAL(Write Ahead Logs,Spark1.2引入),只有接收到的数据被持久化之后才会去更新Kafka中的消费位移。接收到的数据和WAL存储位置信息被可靠地存储,如果期间出现故障,这些信息被用来从错误中恢复,并继续处理数据。

还有几个需要注意的点:

  • 在Receiver的方式中,Spark中的 partition 和 kafka 中的 partition 并不是相关的,如果加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度;

  • 对于不同的 Group 和 Topic 可以使用多个 Receiver 创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream;

  • 如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

  • WAL将接收的数据备份到HDFS上,保证了数据的安全性。但写HDFS比较消耗性能,另外要在备份完数据之后还要写相关的元数据信息,这样总体上增加job的执行时间,增加了任务执行时间;

  • 总体上看 Receiver 方式,不适于生产环境;

1.3.2  Direct的方式
Direct方式从Spark1.3开始引入的,通过 KafkaUtils.createDirectStream 方法创建一个DStream对象,Direct方式的结构如下图所示。

Direct 方式特点如下:

  • 对应Kafka的版本 0.8.2.1+

  • Direct 方式

  • Offset 可自定义

  • 使用kafka低阶API

  • 底层实现为KafkaRDD

该方式中Kafka的一个分区与Spark RDD对应,通过定期扫描所订阅Kafka每个主题的每个分区的最新偏移量以确定当前批处理数据偏移范围。与Receiver方式相比,Direct方式不需要维护一份WAL数据,由Spark Streaming程序自己控制位移的处理,通常通过检查点机制处理消费位移,这样可以保证Kafka中的数据只会被Spark拉取一次

  • 引入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.1.2</version>
</dependency>
  • 模拟kafka生产数据

package com.qianfeng.sparkstreaming
​
import java.util.{Properties, Random}
​
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
​
/*** 向kafka中test主题模拟生产数据;;;也可以使用命令行生产:kafka-console-producer.sh --broker-list qianfeng01:9092,hadoop02:9092,hadoop03:9092 -topic test*/
object Demo02_DataLoad2Kafka {def main(args: Array[String]): Unit = {val prop = new Properties()//提供Kafka服务器信息prop.put("bootstrap.servers","qianfeng01:9092")//指定响应的方式prop.put("acks","all")//请求失败重试的次数prop.put("retries","3")//指定key的序列化方式,key是用于存放数据对应的offsetprop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")//指定value的序列化方式prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")//创建producer对象val producer = new KafkaProducer[String,String](prop)//提供一个数组,数组中数据val arr = Array("hello tom","hello jerry","hello dabao","hello zhangsan","hello lisi","hello wangwu",)//提供一个随机数,随机获取数组中数据向kafka中进行发送存储val r = new Random()while(true){val message = arr(r.nextInt(arr.length))producer.send(new ProducerRecord[String,String]("test",message))Thread.sleep(r.nextInt(1000))   //休眠1s以内}}
}
  • 实时消费kafka数据

package com.qianfeng.sparkstreaming
​
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
​
/*** sparkStreaming消费Kafka中的数据*/
object Demo03_SparkStreamingWithKafka {def main(args: Array[String]): Unit = {//1.创建SparkConf对象val conf = new SparkConf().setAppName("SparkStreamingToKafka").setMaster("local[*]")//2.提供批次时间val time = Seconds(5)//3.提供StreamingContext对象val sc = new StreamingContext(conf, time)//4.提供Kafka配置参数val kafkaConfig = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "qianfeng01:9092",ConsumerConfig.GROUP_ID_CONFIG -> "qianfeng","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",)//5.读取Kafka中数据信息生成DStreamval value = KafkaUtils.createDirectStream(sc,//本地化策略:将Kafka的分区数据均匀的分配到各个执行Executor中LocationStrategies.PreferConsistent,//表示要从使用kafka进行消费【offset谁来管理,从那个位置开始消费数据】ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaConfig))//6.将每条消息kv获取出来val line: DStream[String] = value.map(record => record.value())//7.开始计算操作line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//line.count().print()   //每隔5s的数据条数//8.开始任务sc.start()sc.awaitTermination()}
}
  • 说明

    1. 简化的并行性:不需要创建多个输入Kafka流并将其合并。 使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。

    2. 效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次:一次是Kafka,另一次是由预先写入日志(WriteAhead Log)复制。这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。只要Kafka数据保留时间足够长。

    3. 正好一次(Exactly-once)的语义:第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移量。传统上这是从Kafka消费数据的方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/582911.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM 常用知识和面试题

1. 什么是JVM内存结构&#xff1f; jvm将虚拟机分为5大区域&#xff0c;程序计数器、虚拟机栈、本地方法栈、java堆、方法区&#xff1b; 程序计数器&#xff1a;线程私有的&#xff0c;是一块很小的内存空间&#xff0c;作为当前线程的行号指示器&#xff0c;用于记录当前虚拟…

前端子项目共用node_modules

项目目录结构如下 首先按上面的结构新建三个项目&#xff0c;有一定前端经验的都知道怎么处理&#xff0c;我就不多介绍了。 1&#xff0c;子项目1 package.json如下&#xff0c;我只安装了vue index.js如下 2&#xff0c;子项目2 package.json如下&#xff0c;我安装了…

uniapp 底部导航栏 tabBar

在 static 文件夹中新建文件夹 tabBar&#xff0c;放入标签图片 源素材如下&#xff1a; 在 pages.json 中添加 // 底部导航"tabBar": {// tab默认文字颜色"color": "#bfbfbf",// tab选中后的文字颜色"selectedColor": "#153c65&…

华为鸿蒙应用--登录页:网络请求、自定义Loading、MD5密码加密、emitter订阅状态变化、持久化登录状态、隐藏软键盘-ArkTs

HarmonyOS系列 华为鸿蒙应用--底部导航栏Tabs&#xff08;自适应手机和平板&#xff09;-ArkTs_华为鸿蒙应用 csdn 底部导航栏-CSDN博客 华为鸿蒙应用--欢迎页SplashPage倒计时跳过&#xff08;自适应手机和平板&#xff09;-ArkTs_app.media.ic_splash_page_background-CSDN…

语言模型:从n-gram到神经网络的演进

目录 1 前言2 语言模型的两个任务2.1 自然语言理解2.2 自然语言生成 3 n-gram模型4 神经网络语言模型5 结语 1 前言 语言模型是自然语言处理领域中的关键技术之一&#xff0c;它致力于理解和生成人类语言。从最初的n-gram模型到如今基于神经网络的深度学习模型&#xff0c;语言…

Apache OFBiz RCE漏洞复现(CVE-2023-51467)

0x01 产品简介 Apache OFBiz是一个电子商务平台,用于构建大中型企业级、跨平台、跨数据库、跨应用服务器的多层、分布式电子商务类应用系统。 0x02 漏洞概述 漏洞成因 该系统的身份验证机制存在缺陷,可能允许未授权用户通过绕过标准登录流程来获取后台访问权限。此外,在…

Zabbix“专家坐诊”第221期问答汇总

问题一 Q&#xff1a;使用官方docker模板Template App Docker&#xff0c;监控docker镜像&#xff0c;有一项监控项docker.data_usage有报错&#xff0c;不知道哪里问题&#xff1a;Cannot fetch data: Get “http://1.28/system/df”: context deadline exceeded (Client.Time…

【MATLAB】交叉验证求光滑因子的广义神经网络时序预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 交叉验证求光滑因子的广义神经网络时序预测算法的基本原理如下&#xff1a; 首先&#xff0c;我们需要了解什么是交叉验证和光滑因子。交叉验证是一种评估模型性能的常用方法&#xff0c…

RK3568平台开发系列讲解(Linux系统篇)PWM系统编程

🚀返回专栏总目录 文章目录 一、什么是PWM二、PWM相关节点三、PWM应用编程沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将介绍 PWM 的系统编程。 一、什么是PWM PWM,即脉冲宽度调制(Pulse Width Modulation)

服务器Ubuntu系统安装

Ubuntu系统安装 系统下载制作系统盘1、下载系统盘制作软件2、制作启动盘 系统安装1、选择U盘启动2、安装系统 安装向日葵1、下载地址2、配置wayland 系统下载 https://mirrors.ustc.edu.cn/ubuntu-releases/22.04/ 推荐使用&#xff1a; 制作系统盘 1、下载系统盘制作软件 …

边缘智能网关在智慧大棚上的应用突破物联网大关

边缘智能网关在智慧大棚上的应用&#xff0c;是现代农业技术的一大突破。通过与农作物生长模型的结合&#xff0c;边缘智能网关可以根据实时的环境数据和历史数据&#xff0c;预测农作物的生长趋势和产量&#xff0c;提供决策支持和优化方案。这对于农民来说&#xff0c;不仅可…

使用 Django 的异步特性提升 I/O 类操作的性能

目录 一、引言 二、Django 的异步特性 三、提升 I/O 类操作的性能 四、示例代码 五、总结 一、引言 Django 是一个高级的 Python Web 框架&#xff0c;它以快速开发和简洁的代码而闻名。然而&#xff0c;对于一些 I/O 密集型的应用程序&#xff0c;Django 的同步特性可能…

单字符检测模型charnet使用方法,极简

Git链接 安装按照上面的说明&#xff0c;说下使用。 把tools下面的test做了一点修改&#xff0c;可以读取一张图片&#xff0c;把里面的单个字符都检测和识别出来。 然后绘制到屏幕上。 import torch from charnet.modeling.model import CharNet import cv2, os import num…

群晖Synology Office如何多人同时远程编辑同个文件

文章目录 本教程解决的问题是&#xff1a;1. 本地环境配置2. 制作本地分享链接3. 制作公网访问链接4. 公网ip地址访问您的分享相册5. 制作固定公网访问链接 本教程解决的问题是&#xff1a; 1.Word&#xff0c;PPT&#xff0c;Excel等重要文件存在本地环境&#xff0c;如何在编…

像美团一样商家入驻的小程序功能

美团一样的商家入驻小程序可以促进本地化商家的线上线下融合&#xff0c;为本地商家和用户提供更好的服务和体验&#xff0c;是一种数字化转型和创新&#xff0c;想要开发像美团一样的商家入驻小程序&#xff0c;需要具备以下功能&#xff1a; 1、不同行业独立频道 为本地化的…

任务和内存的栈

任务是什么&#xff1f; 任务是可以运行着的函数&#xff0c;本身并不是函数&#xff0c;因为任务是可以创建、删除、切换等操作的。 void add_val(int *pa, int *pb) {volatile int tmp;tmp *pa;*pa tmp *pb; }void TaskFunction(void *param) {int a 1;int b 2;add_va…

gitlab请求合并分支

直接去看原文: 原文链接:Gitlab合并请求相关流程_source branch target branch-CSDN博客 --------------------------------------------------------------------------------------------------------------------------------- 入口&#xff1a; 仓库控制台的这两个地方都…

Android集成OpenSSL实现加解密-编译

下载 OpenSSL 源码&#xff1a; 前往 OpenSSL 官方网站&#xff08;https://www.openssl.org/source/&#xff09;下载最新的源码压缩包并解压&#xff0c;示例在WSL环境编译 下载NDK 前往https://developer.android.google.cn/ndk/downloads?hlzh-cn下载NDK版本并解压 配置…

OCP NVME SSD规范解读-3.NVMe管理命令-part1

4.4 NVMe Admin Command Set章节详细介绍了设备应支持的NVMe管理命令集&#xff0c;包括必需的和可选的命令。以下是一些关键要求和描述&#xff1a; NVMe-AD-2&#xff1a;识别命令除了支持所有必需的CNS值和相关的必需字段外&#xff0c;还应支持以下可选字段&#xff1a; 格…

电子设计从零开始(2)-----走进电子技术之电阻器

同学们大家好&#xff0c;今天我们继续学习杨欣的《电子设计从零开始》&#xff0c;这本书从基本原理出发&#xff0c;知识点遍及无线电通讯、仪器设计、三极管电路、集成电路、传感器、数字电路基础、单片机及应用实例&#xff0c;可以说是全面系统地介绍了电子设计所需的知识…