sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现。


1、Spark Streaming接入Kafka方式介绍

Spark Streaming 官方提供了两种方式读取Kafka数据:

  1. 一是Receiver-based Approach。该种读取模式官方最先支持,并在Spark 1.2提供了数据零丢失(zero-data loss)的支持;

  2. 一是Direct Approach (No Receivers)。该种读取方式在Spark 1.3引入。

1.1 Receiver-based Approach

Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:

67702ffd747ead37eae665f8fec06fb1.png

  1. 需要借助Write Ahead Logs 来保证数据的不丢失,如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

  2. 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度

  3. 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream

1.2 Direct Approach (No Receivers)

Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。由drive来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:

3e4b0a03f1eb6a04b9ffb9a634b48a17.png

  1. 简化的并行:在Receiver的方式中提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。

  2. 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。

  3. 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

2、Spark Streaming接入Kafka数据实现

以wordcount统计为例,kafka生产端输入词组,Spark端读取kafka流数据,并统计词频

2.1 Receiver方式收取数据

1)Import KafkaUtils并创建DStream

from pyspark.streaming.kafka import KafkaUtils

kafkaStream = KafkaUtils.createStream(streamingContext, \
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  1. ZK Quorum:Zookeeper quorum (hostname:port,hostname:port,..)

  2. Groupid:消费者的groupid

  3. Topics:{topic_name : numPartitions}

2)具体实现代码如下:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
#if len(sys.argv) != 3:
# print("Usage: kafka_wordcount.py ", file=sys.stderr)
# exit(-1)

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 10)

zkQuorum = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181"
groupid = "spark-streaming-consumer"
topic = {"kafka_spark_test1":0,"kafka_spark_test1":1,"kafka_spark_test1":2}
#zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

在Spark目录执行命令:

spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py
2.2 Direct方式收取数据

1)Import KafkaUtils并创建DStream

from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
  • ssc:StreamingContext

  • topics:消费的topics清单

  • {"metadata.broker.list": brokers}:kafka参数,可以指定为 metadata.broker.list或bootstrap.servers

  • 默认情况下,从每个kafka分区的最新的offset进行消费,如果在kafka参数中设置了auto.offset.reset 为smallest,则会从最小的offset进行消费

  • 如果希望保存每个批量消费的kafka offset,可以进行如下操作:

offsetRanges = []

def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd

def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

directKafkaStream \
.transform(storeOffsetRanges) \
.foreachRDD(printOffsetRanges)

如果希望使用基于Zookeeper的Kafka监控,也可以通过这种方法展现Streaming的进程。

2)具体实现代码如下:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

offsetRanges = []

def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd

def printOffsetRanges(rdd):
for o in offsetRanges:
print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

if __name__ == "__main__":
#if len(sys.argv) != 3:
# print("Usage: direct_kafka_wordcount.py ", file=sys.stderr)
# exit(-1)

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)

#brokers, topic = sys.argv[1:]
topic="kafka_spark_test1"
brokers = "192.168.112.101:9092,192.168.112.102:9092,192.168.112.103:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
kvs.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
counts.pprint()

ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate

在Spark根目录执行命令:

spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py
2.3 Kafka生产者配置

Kafka集群环境的安装配置,参考之前的文档"大数据系列之Kafka集群环境部署"中相关内容

1)启动zookeeper

[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh  ./config/zookeeper.properties &
[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

2)启动Kafka集群

[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh  ./config/server.properties &
[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &

3)创建Kafka topic

[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --replication-factor 2 --partitions 3 --topic kafka_spark_test1
Created topic "kafka_spark_test1".

创建名为kafka_spark_test1 的Topic,复制因子设为2,同时分区数为3,注意,分区数是read parallelisms的最大值

4)查看Topic详情

[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --topic kafka_spark_test1
Topic:kafka_spark_test1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: kafka_spark_test1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: kafka_spark_test1 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: kafka_spark_test1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

指定--zookeeper选项的值为192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181,对应的Topic,即刚创建的kafka_spark_test1

2.4 Kafka-Spark Streaming流测试

1)下载依赖的jars包

31a90c07093d8e3ed63275c3d4b0f8e0.png

2)启动kafka生产者

[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1

3)运行Spark Streaming流数据处理程序

[root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py
[root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py

4)在Kafka生产端输入流数据

[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1
>hello world
>hello tango hello
>hello tango tango

5)终端打印结果

-------------------------------------------
Time: 2018-08-08 11:03:15
-------------------------------------------
(u'tango', 2)
(u'hello', 1)

465db79ecec5ab184f83ef034ac4a93b.png

6)登录SparkWeb UI,查看Spark Streaming的的运行情况

a) spark-submit时候指定spark-submit --master spark://192.168.112.121:7077才能在8080端口看到数据

5204816405c5738434957e90283af29c.png

f19c5eb3b72cd5122c779ecb6c60a51e.png

b) 如果通过yarn模式调度,可通过8088端口查看

745de24154a094ea190a012eedd97859.png

2.5 Spark写入Kafka

1)安装Kafka插件

Pyspark访问Kafka需要使用到kafka安装包,使用以下命令安装:

pip install --no-index --find-links=../kafka-1.3.5-py2.py3-none.any.whl kafka

2)调用KafkaProducer模块,spark作为生产者将数据传输到kafka端

from kafka import KafkaProducer

to_kafka = KafkaProducer(bootstrap_servers=broker_list)
to_kafka.send(topic_name,send_msg,encode(‘utf8’))
to_kafka.flush()

参考资料

  1. http://spark.apache.org/docs/latest/streaming-kafka-integration.html

  2. 大数据系列之Kafka集群环境部署

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/365316.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

时间服务器

时间服务器配置: 1 安装软件包: [rootlocalhost ~]# yum install ntp –y 2 修改配置文件 [rootlocalhost ~]# vim /etc/ntp.conf # 允许内网其他机器同步时间 192.168.1.0该网段 restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap 允许任何ip的客户机都可以…

使用 HTML5 Canvas 绘制出惊艳的水滴效果

HTML5 在不久前正式成为推荐标准,标志着全新的 Web 时代已经来临。在众多 HTML5 特性中,Canvas 元素用于在网页上绘制图形,该元素标签强大之处在于可以直接在 HTML 上进行图形操作,具有极大的应用价值。 这里分享一个惊艳的 Canva…

mysql 字符串类型 char varchar

字符类型用在存储名字、邮箱地址、家庭住址等描述性数据 char指的是定长字符,varchar指的是变长字符 #官网:https://dev.mysql.com/doc/refman/5.7/en/char.html #注意:char和varchar括号内的参数指的都是字符的长度#char类型:定长…

二叉树遍历规则

树的遍历顺序大体分为三种:前序遍历(先根遍历、先序遍历),中序遍历(中根遍历),后序遍历(后根遍历)。 如图所示二叉树: 前序遍历:前序遍历可以记为…

php网页的注册界面设计,HTML开发博客之注册页面设计(一)

CSS文件的引入新建文件reg.html文件首先我们来分析网页布局这是我们页面完成后的效果,网页分为三部分头部,主体,和底部我们按照这个顺序开始编写。头部导航栏的编写html>用户注册页面首页科技资讯心情随笔资源收藏图文图片留言板登陆/注册…

Arctext.js - 基于 CSS3 jQuery 的文本弯曲效果

Arctext.js 是基于 Lettering.js 的文本旋转插件,根据设置的旋转半径准确计算每个字母的旋转弧度并均匀分布。虽然 CSS3 也能够实现字符旋转效果,但是要让安排每个字母都沿着弯曲路径排布相当的复杂,结合 Arctext.js 则可以轻松实现这个效果。…

在JDT中使用Java 8 Lambda

旧 Curmudgeon 认识Smalltalk的Dude 在修改Eclipse Java开发工具 (JDT)项目正在开发的Java 8支持时,我一直在使用这种语言。 我承认我对Java 8中的lambda有点不满意。 当然,这来自于知道Smalltalk (和LISP / Schem…

zabbix邮件告警

zabbix邮件告警调用第三方邮件服务来发送邮件。 [roottiandong ~]# yum install mailx -y 修改配置文件 [roottiandong ~]# vim /etc/mail.rc 67 set from15600857257163.com smtpsmtp.163.com 68 set smtp-auth-user15600857257163.com 69 set smtp-auth-password密码&#xf…

SweetAlert – 替代 Alert 的漂亮的提示效果

Sweet Alert 是一个替代传统的 JavaScript Alert 的漂亮提示效果。SweetAlert 自动居中对齐在页面中央,不管您使用的是台式电脑,手机或平板电脑看起来效果都很棒。另外提供了丰富的自定义配置选择,可以灵活控制。 在线演示 插件下载 您可…

onpagefinished等了很久才执行_今天自律了吗?停课不停锻炼 才是战疫正确姿势

近日,中青校媒面向全国915名高校学生发起关于“宅家运动”情况的调查,发现15.39%被调查者在家期间会严格执行锻炼计划,39.96%选择间歇性完成制订的运动目标,还有44.65%在家很少运动。(3月18日《中国青年报》)新冠肺炎疫情发生&…

Ocrad.js – JS 实现 OCR 光学字符识别

Ocrad.js 相当于是 Ocrad 项目的纯 JavaScript 版本,使用 Emscripten 自动转换。这是一个简单的 OCR (光学字符识别)程序,可以扫描图像中的文字回文本。 不像 GOCR.js,Ocrad.js 被设计成一个端口,而不是围绕…

Sequence.js 实现带有视差滚动特效的图片滑块

Sequence.js 功能齐全,除了能实现之前分享过的现代的图片滑动效果,还可以融合当前非常流行的视差滚动(Parallax Scrolling)效果。让多层背景以不同的速度移动,形成立体的运动效果,带来非常出色的视觉体验。…

开源JVM Sampling Profiler

众所周知 ,大多数现有的采样Java Profiler都必须在安全的地方进行堆栈跟踪收集。 诸如采样探查器之类的探查器就是这种情况,它使用SUN / Oracle管理代理来收集其堆栈跟踪。 这种方法的问题在于,由于不是程序中的每个点都不是安全点&#xff0…

雷霆战机

前言 多年前,你我在一起"打飞机"。为了实现真正的打飞机,在下一年前踏足帝都学习了无所不能的Python,辣么接下来带你在俩个小时用200行代码学会打飞机。 python中提供了一个pygame的模块能够让我们快速编写一个游戏。接下来&#x…

FancyBox - 经典的 jQuery Lightbox 插件

FancyBox 是一款非常优秀的弹窗插件,能够为图片、HTML 内容和其它任务的多媒体内容提供优雅的弹出缩放效果。作为是最流行的 Lightbox 插件之一,可以通过 fitToView 实现自适应功能。主要特色: ✓ 能够显示图片、HTML 元素、SWF 影片、ifra…

php如何表格中的变为超链接,php中将网址转换为超链接的函数

php中将网址转换为超链接的函数复制代码 代码如下:function showtext($text){$search array(|(http://[^ ])|, |(https://[^ ])|, |(www.[^ ])|);$replace array($1, $1, $1);$text preg_replace($search, $replace, $text);return $text;}时间: 2011-08-311.根据…

Java面试参考指南–第1部分

JAVA面向对象的概念 Java基于面向对象的概念,它允许更高级别的抽象以实际方式解决任何问题。 面向对象的方法将实际对象中的问题解决方案概念化,从而更易于在整个应用程序中重用。 例如椅子,风扇,狗,电脑等。 在Java…

Odyssey.js – 使用现成模板在线创建互动的故事

Odyssey.js 是一个开源工具,它可以让你的地图,叙述和其他多媒体结合成一个美丽的故事。创建新的故事很简单,要求无非是一个现代的 Web 浏览器和一个好的想法。你可以使用现成的模板来控制和设计精美的布局,让你的故事的整体外观和…

100层楼扔两个鸡蛋的问题

转载自:http://blog.sina.com.cn/s/blog_6c813dbd0101bh98.html 两个软硬程度一样但未知的鸡蛋,它们有可能都在一楼就摔碎,也可能从一百层楼摔下来没事。 有座100层的建筑,要你用这两个鸡蛋确定哪一层是鸡蛋可以安全落下的最高位置…

酷毙了!三种风格的全屏幻灯片效果【附源码下载】

今天,我们想向您展示如何创建平铺背景图像的幻灯片效果。其灵感来自于国外的一个工作室网站(围观),这个网站充满了各种有趣和创意效果,一定记得去看看。 这个幻灯片效果是由四个区域的独立移动构成,通过画面…