Scala / Spark Streaming 2.2.0 consuming Kafka 0.10 (broker version 1.0 in production)

Table of Contents

  • Scala / Spark Streaming 2.2.0 with Kafka 0.10 (broker 1.0 in production)
      • Code
      • pom.xml
    • Spark Streaming 2.1.1
    • pom file

Spark 2.2 with Kafka 0.10 (the integration API is 0.10; the production Kafka broker version is 1.0).

Code

```scala
package com.jast.test

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions._
import scala.collection.mutable

object SparkKafkaTest3 {

  // ZK client
  val client = {
    val client = CuratorFrameworkFactory.builder
      .connectString("10.86.8.118:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      // .namespace("mykafka")
      .build()
    client.start()
    client
  }

  // Root path for offsets in ZooKeeper
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // Ensure the given ZK path exists; create it (with parents) if it does not
  def ensureZKPathExists(path: String) = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  // Store the new offsets
  def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {
    for (o <- offsetRange) {
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
      ensureZKPathExists(zkPath)
      // First write, or update, of the offset for this partition
      println("---Writing offset to ZK------\nTopic:" + o.topic +
        ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
      println("Save path: " + zkPath)
      client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
      println("Write succeeded")
    }
  }

  def getFromOffset(topic: Array[String], groupName: String): (Map[TopicPartition, Long], Int) = {
    // Kafka 0.8 vs 0.10 API difference: 0.10 uses TopicPartition, 0.8 used TopicAndPartition
    val topic1 = topic(0).toString
    // Read the offsets saved in ZK as the DStream's starting position.
    // If the path does not exist, create it and start the DStream from scratch.
    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"
    // Make sure the path exists
    ensureZKPathExists(zkTopicPath)
    // The topic node's children are the partitions
    val childrens = client.getChildren().forPath(zkTopicPath)
    // Iterate over the partitions
    val offSets: mutable.Buffer[(TopicPartition, Long)] = for {
      p <- childrens
    } yield {
      // Read each child node's data, i.e. the stored offset
      val offsetData = client.getData().forPath(s"$zkTopicPath/$p")
      // Convert the offset to Long
      val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
      // Return (TopicPartition, Long)
      (new TopicPartition(topic1, Integer.parseInt(p)), offSet)
    }
    println(offSets.toMap)
    if (offSets.isEmpty) {
      (offSets.toMap, 0)
    } else {
      (offSets.toMap, 1)
    }
  }

  def createMyZookeeperDirectKafkaStream(ssc: StreamingContext,
                                         kafkaParams: Map[String, Object],
                                         topic: Array[String],
                                         groupName: String): InputDStream[ConsumerRecord[String, String]] = {
    // flag == 1: resume from the stored offsets;
    // flag == 0: start fresh (earliest or latest, depending on the Kafka config)
    val (fromOffsets, flag) = getFromOffset(topic, groupName)
    var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null
    if (flag == 1) {
      println(fromOffsets)
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets))
      println("Resumed streaming after interruption!")
    } else {
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))
      println("First-time streaming started!")
    }
    kafkaStream
  }

  def main(args: Array[String]): Unit = {
    val processInterval = 5 // batch interval in seconds
    val brokers = "10.86.8.153:9092"
    val topics = Array("all_spider_data")
    val conf = new SparkConf().setMaster(args(0)).setAppName("kafka checkpoint zookeeper")
    // Kafka params
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zk_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val ssc = new StreamingContext(conf, Seconds(processInterval))
    val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group")
    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        println("###################:" + rdd.count())
      }
      // Store the new offsets
      storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
```
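The core of the recovery logic above is a simple round trip: `storeOffsets` writes each partition's `untilOffset` to a per-partition ZK node as a string, and `getFromOffset` reads those strings back into a map keyed by partition. The round trip can be sketched without a ZooKeeper instance by standing in a plain `mutable.Map` for the ZK tree (the `fakeZk` store and the `store`/`load` names are illustrative, not part of the original code):

```scala
import scala.collection.mutable

object OffsetRoundTrip {
  // Stand-in for the ZK tree: path -> node data, mirroring client.setData()/getData()
  val fakeZk = mutable.Map[String, Array[Byte]]()
  val root = "/kafka/offsets"

  // Mirrors storeOffsets: one node per partition, offset stored as a string
  def store(group: String, topic: String, partition: Int, untilOffset: Long): Unit = {
    val path = s"$root/$group/$topic/$partition"
    fakeZk(path) = untilOffset.toString.getBytes()
  }

  // Mirrors getFromOffset: read every partition node back into (topic, partition) -> offset
  def load(group: String, topic: String): Map[(String, Int), Long] = {
    val prefix = s"$root/$group/$topic/"
    fakeZk.collect {
      case (path, data) if path.startsWith(prefix) =>
        val partition = Integer.parseInt(path.stripPrefix(prefix))
        ((topic, partition), java.lang.Long.valueOf(new String(data)).toLong)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    store("zk_group", "all_spider_data", 0, 7332L)
    store("zk_group", "all_spider_data", 1, 120L)
    println(load("zk_group", "all_spider_data"))
  }
}
```

Because the offset is serialized as a decimal string, the same nodes can also be inspected by hand with the ZooKeeper CLI (`get /kafka/offsets/zk_group/<topic>/<partition>`).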

pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhonghong</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- All Spark artifacts must share one Scala binary version; _2.11 here,
             matching scala.version (the original mixed _2.10 and _2.11). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

Spark Streaming 2.1.1

```scala
package com.zsh.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkStreaming2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("data").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.2.112:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sss",
      "auto.offset.reset" -> "earliest",
      // "enable.auto.commit" -> (false: java.lang.Boolean)  // whether to auto-commit offsets
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "auto.commit.interval.ms" -> (60 * 1000 + "") // auto-commit every 60 s
    )
    val topics = Array("test", "weixin_for_check")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    stream.foreachRDD(rdd => {
      if (rdd.count() >= 1) {
        rdd.map(record => (record.key, record.offset(), record.value))
          .foreach(println)
      }
    })
    // stream.map(record => (record.key, record.value))
    ssc.start()
    ssc.awaitTermination()
  }
}
```
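Whether offsets come from Kafka's auto-commit (this 2.1.1 version) or from ZooKeeper (the 2.2.0 version above), resuming at the stored `untilOffset` is safe because a Kafka offset range is half-open: `fromOffset` is the first record of the batch and `untilOffset` is one past the last. A minimal check of that arithmetic, using a plain case class as a stand-in for Spark's `OffsetRange` (the `BatchRange` name is illustrative):

```scala
// Stand-in for org.apache.spark.streaming.kafka010.OffsetRange (illustrative only)
case class BatchRange(partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset // number of records in the batch
}

object ResumeCheck {
  def main(args: Array[String]): Unit = {
    val batch1 = BatchRange(0, 100L, 150L) // 50 records: offsets 100..149
    // Resuming from the stored untilOffset starts exactly at the next unread record
    val batch2 = BatchRange(0, batch1.untilOffset, 180L)
    assert(batch1.untilOffset == batch2.fromOffset) // no gap, no overlap
    println(batch1.count + batch2.count) // prints 80
  }
}
```

This is why `storeOffsets` in the first listing writes `o.untilOffset` rather than the offset of the last processed record.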

pom file

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <!-- <version>1.6.2</version> -->
    <version>2.1.1</version>
</dependency>
<!-- The kafka010 imports in the code above additionally require: -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.1.1</version>
</dependency>
```
