SparkStreaming Kafka 自动保存offset到zookeeper

SparkStreaming Kafka 自动保存offset到zookeeper

场景

spark使用的是1.6,SparkStreaming1.6时候使用的kafka jar包为0.8的,消费时候不记录消费到的信息,导致重复消费,故手动保存到zookeeper,SparkStreaming2.1.1时使用的kafka jar包为0.10,没有出现这种状况,以下是1.6版本的消费

package com.zsh.spark.streamingimport org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtilsimport kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZKGroupTopicDirs
import org.apache.spark.streaming.kafka.OffsetRange
import org.apache.spark.streaming.kafka.HasOffsetRanges
import kafka.utils.ZkUtils
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadataRequest
import kafka.api.PartitionOffsetRequestInfo
import kafka.api.OffsetRequest
import java.util.Properties
import java.io.FileInputStreamobject Kafka2Es {def main(args: Array[String]) {val properties = new Properties()
//				val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPathval path = Kafka2Es.getClass.getResourceAsStream("/config.properties")properties.load(path)val sprakConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")//此处在idea中运行时请保证local[2]核心数大于2sprakConf.setMaster("local[4]")val ssc = new StreamingContext(sprakConf, Seconds(3))val brokers = properties.getProperty("kafka.brokers") //kafka地址ip:port,ip2:portval zookeeper = properties.getProperty("zookeeper.node") //zookeeper地址 ip:port,ip2:port2val kfkHost = brokers.split(",")(0).split(":")(0) val kfkPort = brokers.split(",")(0).split(":")(1).toIntval topics = properties.getProperty("kafka.consumer.topic") val groupId = properties.getProperty("kafka.consumer.group")val topicSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,//				    "bootstrap.servers" -> brokers,//						"key.deserializer" -> "org.apache.kafka.common.serialization.StringSerializer",//						"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",//						"value.deserializer" -> "org.apache.kafka.common.serialization.StringSerializer","group.id" -> groupId,"serializer.class" -> "kafka.serializer.StringEncoder","auto.offset.reset" -> "smallest"//	"enable.auto.commit" -> (false: java.lang.Boolean)  是否自动提交offset//						"enable.auto.commit" -> "true",//						"client.id" -> "ssss",//						"auto.commit.interval.ms" -> (6*1000+"")//每隔60s自动提交一次)val topicDirs = new ZKGroupTopicDirs(groupId, topics)  //创建一个 ZKGroupTopicDirs 对象,对保存val zkTopicPath = s"${topicDirs.consumerOffsetDir}" //获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_nameprintln("Group Offset在zookeeper路径为:"+zkTopicPath)val zkClient = new ZkClient(zookeeper)//连接Zookeeperval children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")//查询该路径下是否子节点(默认有子节点为我们自己保存不同 partition 时生成的)println("children size is "+children)var kafkaStream : InputDStream[(String, String)] = null var fromOffsets: Map[TopicAndPartition, Long] = Map()  //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置if (children > 0) { //如果保存过 offset,这里更好的做法,还应该和  kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误//					for (i <- 0 to children-1) {//						val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")//						val tp = TopicAndPartition(topics, i);//						fromOffsets += (tp -> partitionOffset.toLong)  //将不同 partition 对应的 offset 增加到 fromOffsets 中//						println("@@@@@@ topic[" + topics + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")//					}val partitions = getPartitionLeader(topics, kfkHost, kfkPort) //获取每个partition的leader,然后取每个partition中的最小值,与zookeeper保存的最小值比较,如果zookeeper保存的比partition最小值小则使用partition的值partitions.foreach(x=>{val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${x._1}")//x._1是partitions(map)的key  x._2是valueval tp = TopicAndPartition(topics, x._1)val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))val consumerMin = new SimpleConsumer(x._2.toString(), 9092, 10000, 10000, "getMinOffset")  //注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsetsprintln("最低:"+curOffsets.head)var nextOffset = partitionOffset.toLongif (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择nextOffset = curOffsets.head}fromOffsets += (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的想想println("ZshfromOffsets:"+fromOffsets)})val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())  //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuplekafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)}else {kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset}var offsetRanges = Array[OffsetRange]()kafkaStream.transform{ rdd =>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offsetrdd}.foreachRDD { rdd =>for (o <- offsetRanges) {val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)  //将该 partition 的 offset 保存到 zookeeperprintln(s"@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######")}rdd.foreachPartition(message => {while(message.hasNext) {val value=message.next()._2.toStringprintln(s"@^_^@   [" + value + "] @^_^@")}})//      println ("Zsh:"+rdd.)rdd.map(record=>(record._1+"%%%%"+record._2)).foreach(println)}/**8* 从指定位置开始读取kakfa数据* 注意:由于Exactly  Once的机制,所以任何情况下,数据只会被消费一次!*      指定了开始的offset后,将会从上一次Streaming程序停止处,开始读取kafka数据*///				val offsetList = List((topics, 0, 0L),(topics, 1, 0L),(topics, 2, 0L),(topics, 3, 0L)) //				val fromOffsets = setFromOffsets(offsetList)//				val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())//				val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String,String)](ssc, kafkaParams, fromOffsets,messageHandler )//				messages.foreachRDD(//						mess => {//							//获取offset集合//							val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges//									mess.foreachPartition(lines => {//										lines.map(s=>s._1+"!!!!"+s._2).foreach(println)//										//        lines.foreach(line => {//										//          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)//										//          println("++++++++++++++++++++++++++++++此处记录offset+++++++++++++++++++++++++++++++++++++++")//										//          println(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}  ${o.untilOffset} ")//										//          println("+++++++++++++++++++++++++++++++此处消费数据操作++++++++++++++++++++++++++++++++++++++")//										//          println("The kafka  line is " + line)//										//        })//									})//						}//						)//				messages.print()//				val lines = messages.map(_._2).map(s=>s+":pipade")//				lines.print()ssc.start()ssc.awaitTermination()}//构建Mapdef setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {var fromOffsets: Map[TopicAndPartition, Long] = Map()for (offset <- list) {val tp = TopicAndPartition(offset._1, offset._2)//topic和分区数fromOffsets += (tp -> offset._3)           // offset位置}fromOffsets}def getPartitionLeader(topic :String,kfkHost :String,kfkPort :Int):	Map[Int, String]={//	  	val topic_name = "test0920"     //topic_name 表示我们希望获取的 topic 名字val topic2 = List(topic)       val req = new TopicMetadataRequest(topic2, 0)val getLeaderConsumer = new SimpleConsumer(kfkHost, kfkPort, 10000, 10000, "OffsetLookup")  // 第一个参数是 kafka broker 的host,第二个是 portval res = getLeaderConsumer.send(req)val topicMetaOption = res.topicsMetadata.headOptionval partitions = topicMetaOption match {case Some(tm) =>tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]  // 将结果转化为 partition -> leader 的映射关系case None =>Map[Int, String]()}partitions}}

pom.xml

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>1.6.2</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.2</version><exclusions><exclusion><artifactId>scala-library</artifactId><groupId>org.scala-lang</groupId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>1.6.2</version><!-- <version>2.1.1</version> -->
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509501.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构实验之查找一:二叉排序树

题目描述 对应给定的一个序列可以唯一确定一棵二叉排序树。然而&#xff0c;一棵给定的二叉排序树却可以由多种不同的序列得到。例如分别按照序列{3,1,4}和{3,4,1}插入初始为空的二叉排序树&#xff0c;都得到一样的结果。你的任务书对于输入的各种序列&#xff0c;判断它们是否…

GCC常用参数详解

简介 gcc and g现在是gnu中最主要和最流行的c & c编译器 .gcc/g在执行编译工作的时候&#xff0c;总共需要以下几步: 1.预处理,生成.i的文件[预处理器cpp] 2.将预处理后的文件不转换成汇编语言,生成文件.s[编译器egcs] 3.有汇编变为目标代码(机器代码)生成.o的文件[汇编器a…

使用反射调用构造器创建对象

构造器最大的作用:创建对象. 为什么使用反射创建对象,为什么不直接来new呢? 在框架中,提供给我们的都是字符串. ----------------------------------------------------------- 使用反射创建对象: 步骤: 1);找到构造器所在类的字节码对象. 2):获取构造器对象. 3):使用反射…

数据结构实验之查找三:树的种类统计

题目描述 随着卫星成像技术的应用&#xff0c;自然资源研究机构可以识别每一个棵树的种类。请编写程序帮助研究人员统计每种树的数量&#xff0c;计算每种树占总数的百分比。 输入 输入一组测试数据。数据的第1行给出一个正整数N (n < 100000)&#xff0c;N表示树的数量&…

java中如何使用反射调用方法以及获得类中的属性

使用反射获取类中的方法: 1):获取方法所在类的字节码对象. 2):获取方法. ------------------------------------------------------------------------ Class类中常用方法: publicMethod[] getMethods():获取包括自身和继承过来的所有的public方法 publicMethod[] getDeclaredM…

Maxwell读取MySQL数据

文章目录Maxwell 概述1.1 Maxwell 定义1.2 Maxwell工作原理1.2.1 MySQL主从复制过程1.2.2 Maxwell的工作原理1.2.3 **MySQL** **的** binlog1.3 Maxwell和Canal的对比Maxwell使用2.1 Maxwell安装2.1.1 安装地址2.1.2 安装部署2.1.3 MySQL环境准备2.1.4 初始化Maxwell元数据库2.…

数据结构实验之查找二:平衡二叉树

题目描述 根据给定的输入序列建立一棵平衡二叉树&#xff0c;求出建立的平衡二叉树的树根。 输入 输入一组测试数据。数据的第1行给出一个正整数N(n < 20)&#xff0c;N表示输入序列的元素个数&#xff1b;第2行给出N个正整数&#xff0c;按数据给定顺序建立平衡二叉树。 输…

Linux-(C/C++)动态链接库生成以及使用(libxxx.so)

Linux中so文件为共享库&#xff0c;与windows下dll类似&#xff0c;不过实现要简单。 so可以供多个进程使用&#xff0c;不同进程调用同一个so文件&#xff0c;所使用so文件不同。 so文件源程序不需要main函数&#xff0c;有也不会被执行。 下面通过一个简单例子&#xff0c;来…

数据结构实验图论一:基于邻接矩阵的广度优先搜索遍历

题目描述 给定一个无向连通图&#xff0c;顶点编号从0到n-1&#xff0c;用广度优先搜索(BFS)遍历&#xff0c;输出从某个顶点出发的遍历序列。(同一个结点的同层邻接点&#xff0c;节点编号小的优先遍历&#xff09;输入 输入第一行为整数n&#xff08;0< n <100&#xf…

IO对象流(序列化和反序列化)

序列化和反序列化概念&#xff1a; 序列化: 把堆内存中的java对象数据&#xff0c;通过某种方式把对象存储到磁盘文件中或者传递给其他网络的节点&#xff08;在网络上传输&#xff09; 反序列化: 把磁盘文件中的对象数据或者网络节点上的对象数据&#xff0c;恢复成java对象的…

我是怎么招聘程序员的

http://coolshell.cn/articles/1870.html

数据结构实验之图论二:基于邻接表的广度优先搜索遍历

题目描述 给定一个无向连通图&#xff0c;顶点编号从0到n-1&#xff0c;用广度优先搜索(BFS)遍历&#xff0c;输出从某个顶点出发的遍历序列。(同一个结点的同层邻接点&#xff0c;节点编号小的优先遍历&#xff09;输入 输入第一行为整数n&#xff08;0< n <100&#xf…

IO之打印流

打印流,打印数据的,打印流只能是输出流: PrintStream: 字节打印流 PrintWriter: 字符打印流 -对于PrintWriter来说,当启用字段刷新之后, 调用println或者printf或者format方法,便会立马刷新操作. 如果没有开启自动刷新,则需要手动刷新或者当缓冲区满的时候,再自动刷新. 使…

数据结构实验之查找四:二分查找

题目描述 在一个给定的无重复元素的递增序列里&#xff0c;查找与给定关键字相同的元素&#xff0c;若存在则输出找到的位置,不存在输出-1。 输入 一组输入数据&#xff0c;输入数据第一行首先输入两个正整数n ( n < 10^6 )和m ( m < 10^4 )&#xff0c;n是数组中数据元…

橡皮鸭程序调试法

转自&#xff1a;http://write.blog.csdn.net/postedit 面&#xff0c;让我来为你介绍一个程序调试大法——“橡皮鸭程序调试法”&#xff0c;这个方法在调试界是很出众的&#xff0c;实施起来相当方便和简易&#xff0c;几乎可以随时随地地实验&#xff0c;几乎不需要借助任何…

标准IO概述和操作

标准的IO: 标准的输入: 通过键盘录入数据给程序. 标准的输出: 在屏幕上显示程序数据. 在System类中有两个常量: InputStream in System.in; PrintStream out System.out; 标准流的重定向操作: 标准的输入: 通过键盘录入数据给程序. 重新指定输入的源不再是键盘,而是一个…

十条不错的编程观点

转自&#xff1a;http://coolshell.cn/articles/2424.html 在Stack Overflow上有这样的一个贴子《What’s your most controversial programming opinion?》&#xff0c;翻译成中文就是“你认为最有争议的编程观点是什么&#xff1f;”&#xff0c;不过&#xff0c;在400多个主…

数据结构上机实验之二分查找

题目描述 在一个递增的序列里&#xff0c;查找元素是否存在&#xff0c;若存在输出YES,不存在输出NO.输入 本题多组数据&#xff0c;首先输入一个数字n(n>100000)&#xff0c;然后输入n个数&#xff0c;数据保证数列递增&#xff0c;然后再输入一个查找数字。输出 若存在输出…

IO之 Properties类加载文件

配置文件:资源文件(以.properties作为拓展名的文件)/属性文件: 做项目开发,为何使用配置文件? 把所有的数据存储在代码中,写死了,”硬编码”. 比如:在Java中需要连接数据库,必须拥有数据的账号和密码. 此时我们就得在Java代码中编写,类似的代码: String username”root”…

Makefile系列学习(博客)

http://blog.csdn.net/haoel/article/category/9198/3