Scala - Spark Streaming 2.2.0 with Kafka 0.10 (production Kafka 1.0)
Table of Contents
- Scala - Spark Streaming 2.2.0 with Kafka 0.10 (production Kafka 1.0)
- Code
- Pom.xml
- Spark Streaming 2.1.1 version
- pom file
Spark 2.2 with Kafka 0.10 (the code uses the 0.10 API; the Kafka version actually running in production is 1.0).
Code
package com.jast.test

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions._
import scala.collection.mutable

object SparkKafkaTest3 {

  // ZK client
  val client = {
    val client = CuratorFrameworkFactory.builder
      .connectString("10.86.8.118:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      // .namespace("mykafka")
      .build()
    client.start()
    client
  }

  // Root path under which offsets are stored in ZooKeeper
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // Make sure the path exists in ZK; create it (with parents) if it does not
  def ensureZKPathExists(path: String) = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  // Persist the new offsets
  def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {
    for (o <- offsetRange) {
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
      ensureZKPathExists(zkPath)
      // First write or update of the offset for this partition
      println("---Writing offset to ZK------\nTopic:" + o.topic + ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
      println("Save path: " + zkPath)
      client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
      println("Write succeeded")
    }
  }

  def getFromOffset(topic: Array[String], groupName: String): (Map[TopicPartition, Long], Int) = {
    // Kafka 0.8 vs 0.10 difference: 0.10 uses TopicPartition, 0.8 uses TopicAndPartition
    val topic1 = topic(0).toString

    // Read the offsets saved in ZK as the starting positions of the DStream.
    // If the path does not exist, create it and start the DStream from scratch.
    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"

    // Make sure the path exists
    ensureZKPathExists(zkTopicPath)

    // The children of the topic node are the partitions
    val childrens = client.getChildren().forPath(zkTopicPath)

    // Iterate over the partitions
    val offSets: mutable.Buffer[(TopicPartition, Long)] = for {
      p <- childrens
    } yield {
      // Read the data of each child node, i.e. the offset
      val offsetData = client.getData().forPath(s"$zkTopicPath/$p")
      // Convert the offset to Long
      val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
      // Return (TopicPartition, Long)
      (new TopicPartition(topic1, Integer.parseInt(p)), offSet)
    }
    println(offSets.toMap)

    if (offSets.isEmpty) {
      (offSets.toMap, 0)
    } else {
      (offSets.toMap, 1)
    }
  }

  def createMyZookeeperDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, Object],
                                         topic: Array[String], groupName: String): InputDStream[ConsumerRecord[String, String]] = {
    // flag = 1: resume from the offsets already stored in ZK
    // flag = 0: start from scratch (earliest or latest, depending on the Kafka config)
    val (fromOffsets, flag) = getFromOffset(topic, groupName)
    var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null
    if (flag == 1) {
      println(fromOffsets)
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets))
      println(fromOffsets)
      println("Resumed streaming after interruption!")
    } else {
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))
      println("Started streaming for the first time!")
    }
    kafkaStream
  }

  def main(args: Array[String]): Unit = {
    val processInterval = 5 // batch interval in seconds
    val brokers = "10.86.8.153:9092"
    val topics = Array("all_spider_data")
    val conf = new SparkConf().setMaster(args(0)).setAppName("kafka checkpoint zookeeper")

    // Kafka params
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zk_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val ssc = new StreamingContext(conf, Seconds(processInterval))
    val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group")

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        println("###################:" + rdd.count())
      }
      // Store the new offsets
      storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
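With this setup, the offsets end up in ZooKeeper under /kafka/offsets/<group>/<topic>/<partition>, one znode per partition holding the last untilOffset as a string. As a quick sanity check, a small standalone helper along these lines (a hypothetical addition, not part of the original post; it reuses the connect string, group and topic from the listing above) can read them back with Curator:

// InspectZkOffsets is a hypothetical helper for checking what storeOffsets() wrote to ZK.
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.collection.JavaConversions._

object InspectZkOffsets {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.builder
      .connectString("10.86.8.118:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()

    // Same layout as the job above: /kafka/offsets/<group>/<topic>/<partition>
    val base = "/kafka/offsets/zk_group/all_spider_data"
    for (p <- client.getChildren.forPath(base)) {
      // Each child node is a partition id; its data is the saved untilOffset
      val offset = new String(client.getData.forPath(s"$base/$p"))
      println(s"partition $p -> offset $offset")
    }
    client.close()
  }
}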
Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhonghong</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Spark Streaming 2.1.1 version
package com.zsh.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkStreaming2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("data").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.2.112:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sss",
      "auto.offset.reset" -> "earliest",
      // "enable.auto.commit" -> (false: java.lang.Boolean)  // whether to auto-commit offsets
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "auto.commit.interval.ms" -> (60 * 1000 + "") // auto-commit every 60 s
    )

    val topics = Array("test", "weixin_for_check")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    stream.foreachRDD(rdd => {
      if (rdd.count() >= 1) {
        rdd.map(record => (record.key, record.offset(), record.value)).foreach(println)
      }
    })
    // stream.map(record => (record.key, record.value))

    ssc.start()
    ssc.awaitTermination()
  }
}
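This 2.1.1 example lets the Kafka consumer commit offsets automatically every 60 s. If you would rather commit only after a batch has actually been processed, spark-streaming-kafka-0-10 also exposes the CanCommitOffsets interface; a minimal sketch of that variant (an assumption, reusing the stream value from the listing above and with enable.auto.commit switched back to false) looks roughly like this:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Assumes kafkaParams contains "enable.auto.commit" -> (false: java.lang.Boolean)
stream.foreachRDD { rdd =>
  // Grab this batch's offset ranges before doing anything else
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Commit back to Kafka only after the batch work is done (asynchronous, at-least-once)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}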
pom file
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <!-- <version>1.6.2</version> -->
    <version>2.1.1</version>
</dependency>
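One thing worth noting: the 2.1.1 code imports org.apache.spark.streaming.kafka010, which none of the three dependencies above provide, so the project presumably also declares the Kafka 0-10 integration artifact. Something like the following should work (the _2.10 suffix is an assumption, chosen to match the artifacts listed above):

<!-- assumed: needed for the org.apache.spark.streaming.kafka010 imports -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.1.1</version>
</dependency>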