kafka直连方式消费多个topic

一个消费者组可以消费多个topic,以前写过一篇一个消费者消费一个topic的,这次的是一个消费者组通过直连方式消费多个topic,做了小测试,结果是正确的,通过查看zookeeper的客户端,zookeeper记录了偏移量

package day04

/*
消费多个topic
*/
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderDemoYY1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
val ssc = new StreamingContext(conf,Duration(5000))
//消费3个topic
val topic1 = "wc"
val topic2 ="wc1"
val topic3 ="wc2"
//组名
val groupid ="GPMMVV"
//zookeeper地址
val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
//brokerList
val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
//把消费的分区放到Set集合中,可以在第一次读取时作为参数传入
val topics = Set(topic1,topic2,topic3)
//ListBuffer时有序的,按下标有序
val topicsList = ListBuffer[String](topic1,topic2,topic3)
//设置kafka的参数
val kafkaParams = Map(
"metadata.broker.list"->brokerList,
"groupid"->groupid,
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
//默认时从头开始读的
)

//new ListBuffer用来存放ZKGroupTopicDirs, 用来保存偏移量的地址
//因为有多个topic,对应的也就有多个ZKGroupTopicDirs
var zkGTList:ListBuffer[ZKGroupTopicDirs] =new ListBuffer[ZKGroupTopicDirs]()
//根据topicList 新建 ZKGroupTopicDirs 添加到zkGTList
for(tp <- topicsList){
val topicDirs = new ZKGroupTopicDirs(groupid,tp)
zkGTList += topicDirs
}
//新建zkClient,用来获取偏移量和更新偏移量
val zkClient = new ZkClient(zkQuorum)
//新建一个InputDStream,要是var,因为有两种情况,消费过? 没有消费过? 根据情况赋值
var kafkaDStream :InputDStream[(String,String)] = null
//创建一个Map,(key,value)-》( 对应的时Topic和分区 ,偏移量)
var fromOffset = Map[TopicAndPartition,Long]()

//获取每个topic是否被消费过
var childrens:ListBuffer[Int] =new ListBuffer[Int]()
var flag = false //有topic被消费过则为true
for (topicDir <- zkGTList){ //循环存放偏移量的
//通过zkClient.countChidren来获取每个topic对应的分区中的偏移量ZKGroupTopicDirs的对象
val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
childrens +www.mhylpt.com= child
if(child>0){
flag = true
}
}


if(flag){//消费过
for(z <- 0 until topics.size){ //根据topicsList的的下表获取相应的child和ZKGroupTopicDirs
val child = childrens(z)
val gpDirs = zkGTList(z)
val topicn = topicsList(z)
for(i <- 0 until child)www.mcyllpt.com/{
//循环child, 根据使用zkClient.readData方法,u获取topic的每个分区的偏移量
val offset = zkClient.readData[String](gpDirs.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(www.michenggw.com/ topicn,i)
fromOffset += tp -> offset.toLong
}
}
//返回的而结果是 kafka的key,默认是null, value是kafka中的值
val messageHandler =www.gcyl159.com/ (mmd:MessageAndMetadata[String,String])=www.gcyl152.com>{
(mmd.key(),mmd.message())
}
//创建kafkaDStream
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](
ssc,kafkaParams,fromOffset,messageHandler
)
}else{//以前没有读取过
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc,kafkaParams,topics
)
}

/*val children1 = zkClient.countChildren(zKGroupTopicDirs1.consumerOffsetDir)
val children2 = zkClient.countChildren(zKGroupTopicDirs2.consumerOffsetDir)
if(children1>0 || children2>0){
if(children1>0){
for (i <- 0 until children1){
val offset = zkClient.readData[String](zKGroupTopicDirs1.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(topic1,i)
fromOffset += tp ->offset.toLong
}
}
if(children2>0){
for (i <- 0 until children1){
val offset = zkClient.readData[String](zKGroupTopicDirs2.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(topic2,i)
fromOffset += tp ->offset.toLong
}
}
val messageHandler =(mmd:MessageAndMetadata[String,String])=>{
(mmd.key(),mmd.message())
}
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,
kafkaParams,fromOffset,messageHandler)
}else{
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
}*/


var offsetRanges = Array[OffsetRange]www.hjpt521.com() //用来记录更新的每个topic的分区偏移量

kafkaDStream.foreachRDD(kafkaRDD=>{
//kafkaRDD是一个KafkaRDD,可以转换成HasOffsetRanges对象,从而获取offsetRanges
offsetRanges= kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaRDD.foreach(println)www.365soke.com //打印

for(o <- offsetRanges){
val topicNN: String = o.topic //获取topic
val offset: Long = o.untilOffset //获取偏移量
val partition: Int = o.partition //获取分区
val i = topicsList.indexOf(topicNN) //通过topicList查找topic的下标,找到与之对应的ZKGroupTopicDirs
val gpDir = zkGTList(i)
//通过ZkUtils更新偏移量
ZkUtils.updatePersistentPath(zkClient,gpDir.consumerOffsetDir+"/"+partition,offset.toString)
/*if(topicNN.equals(topic1)){
ZkUtils.updatePersistentPath(zkClient,zKGroupTopicDirs1.consumerOffsetDir+"/"+partition,offset.toString)
}else if(topicNN.equals(topic2)){
ZkUtils.updatePersistentPath(zkClient,zKGroupTopicDirs2.consumerOffsetDir+"/"+partition,offset.toString)
}*/
}
})

ssc.start()
ssc.awaitTermination(www.dfgjyl.cn)

可以通过zookeeper的客户端,在/consumers中查看偏移量,
我的3个topic中,其中wc和wc1只有1个分区,可以通过下图可看出wc1的0分区偏移量13

转载于:https://www.cnblogs.com/qwangxiao/p/9971006.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/450682.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

100个经典的C语言算法

100个经典的C算法 C语言的学习要从基础开始&#xff0c;这里是100个经典的算法 题目&#xff1a;古典问题&#xff1a;有一对兔子&#xff0c;从出生后第3个月起每个月都生一对兔子&#xff0c;小兔 子长到第三个月后每个月又生一对兔子&#xff0c;假如兔子都不死&#xff0c;…

MySQL常见面试题目详解

文章目录1. SQL1.1 介绍一下数据库分页1.2 介绍一下SQL中的聚合函数1.3 表跟表是怎么关联的&#xff1f;1.4 说一说你对外连接的了解1.5 说一说数据库的左连接和右连接1.6 SQL中怎么将行转成列&#xff1f;1.7 谈谈你对SQL注入的理解1.8 将一张表的部分数据更新到另一张表&…

[转]windows系统激活

原文链接主题&#xff1a;使用kms激活&#xff0c;可以直接使用命令来完成。 方法&#xff1a;在win10桌面状态下&#xff0c;右击windows徽标或按快捷键windowsx&#xff0c;点击命令提示符&#xff08;管理员&#xff09; 用到的命令是slmgr&#xff0c;手动kms激活命令如下&…

jackson annotations注解详解

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 官方WIKI&#xff1a;https://github.com/FasterXML/jackson-databind/wiki jackson 1.x和2.x版本的注解是放置在不同的包下的 1.x是在…

JS-for的衍生对象

在js中一般使用方法&#xff1a; 1.常规的for(var i0;i<length;i) 2.for-in:for(var item in list) 3.for of 描述&#xff1a;对应于一个对象的每个属性&#xff0c;或一个数组的每个元素&#xff0c;执行一个或多个语句。 语法&#xff1a;for (variable in [object | ar…

浮点数在计算机中存储方式

C语言和C#语言中&#xff0c;对于浮点类型的数据采用单精度类型&#xff08;float&#xff09;和双精度类型(double)来存储&#xff0c;float数据占用32bit,double数据占用64bit,我们在声明一个变量float f 2.25f的时候&#xff0c;是如何分配内存的呢&#xff1f;如果胡乱分配…

操作系统面试题目详解

文章目录1.13 什么是协程&#xff1f;1.14 为什么协程比线程切换的开销小&#xff1f;1.15 线程和进程的区别&#xff1f;1.16 进程切换为什么比线程更消耗资源&#xff1f;1.17 介绍一下进程之间的通信。1.18 介绍一下信号量。1.19 说说僵尸进程和孤儿进程。1.20 请介绍进程之…

(项目)在线教育平台(六)

八、授课机构功能 1、模板继承 如果几个页面的大体结构相同&#xff0c;可以使用继承的方式来实现母版的重用性&#xff0c;也就是子版继承母版的内容&#xff0c;既可以使用模板的内容&#xff0c;也可以重写需要改变的地地方。 首先完成授课机构的页面&#xff0c;通过页面显…

C语言 socket 编程学习

对于SOCKET在这里我不想究其历史,我只想说其时它是一种进程通讯的方式,简言之就是调用这个网络库的一些API函数就能实现分布在不同主机的相关进程之间的数据交换. SOCKET中首先我们要理解如下几个定义概念: 一是IP地址:IP Address我想很容易理解,就是依照TCP/IP协议分配…

dependency 中的 classifier属性

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 classifier元素用来帮助定义构件输出的一些附属构件。附属构件与主构件对应&#xff0c;比如主构件是 kimi-app-2.0.0.jar 该项目可能还…

PHP超全局变量$_SERVER

$_SERVER 是一个包含了诸如头信息(header)、路径(path)、以及脚本位置(script locations)等等信息的数组。这个数组中的项目由 Web 服务器创建。不能保证每个服务器都提供全部项目&#xff1b;服务器可能会忽略一些&#xff0c;或者提供一些没有在这里列举出来的项目。 $_SERVE…

VC读写XML文件

1、安装MSXML 4.0 SP2。在VC6中建立一个基于Dialog的工程。如图&#xff1a; 在界面上放置3个编辑框、1个按钮控件。其中属性设置如下。 编辑框&#xff1a; IDCategoryVariable TypeVariable NameIDC_IDValueCStringm_strIdIDC_AUTHORValueCStringm_strAuthorIDC_TITLEValueCS…

XCode10 swift4.2 适配遇到的坑

以下是2018年10月23日更新 经过大约一个月的时间的适配&#xff0c;项目正式使用XCode10(以下简称为10 or XC10)大部分库都升级为Swift4.2&#xff08;以下简称为 4.2 or S4.2&#xff09;&#xff0c;下面是适配过程中遇到的一些坑。 1. Swift4、Swift4.2混编 如果你对项目是小…

学生管理系统Java版

简单的学生管理系统 主界面编写&#xff1a; 1.用输出语句完成主界面的编写 2.用Scanner语句实现键盘的录入 3.用swich语句完成操作的选择 4.用循环完成再次回到主界面 代码实现&#xff1a; while (true) {//1.用输出语句完成主界面的编写System.out.println("--------…

dubbo 配置文件详解

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 一、dubbo常用配置 <dubbo:service/> 服务配置&#xff0c;用于暴露一个服务&#xff0c;定义服务的元信息&#xff0c;一个服务可…

ASP.NET Core 实战:Linux 小白的 .NET Core 部署之路

一、前言 最近一段时间自己主要的学习计划还是按照毕业后设定的计划&#xff0c;自己一步步的搭建一个前后端分离的 ASP.NET Core 项目&#xff0c;目前也还在继续学习 Vue 中&#xff0c;虽然中间断了很长时间&#xff0c;好歹还是坚持下来了&#xff0c;嗯&#xff0c;看了看…

学以致用十三-----Centos7.2+python3+YouCompleteMe成功历程

历经几天的摸索&#xff0c;趟过几趟坑之后&#xff0c;终于完成YouCompleteMe的安装配置。 今天同样是个不能忘记的日子&#xff0c;国耻日&#xff0c;勿忘国耻。&#xff08;9.18&#xff09; 服务器安装好&#xff0c;基本配置配置好后&#xff0c;开始安装。 一、检查服务…

VC画图用到的主要方法

1。鼠标落下&#xff0c;记录鼠标的起始位置 void CMyEasyDrawView::OnLButtonDown(UINT nFlags, CPoint point) { // TODO: 在此添加消息处理程序代码和/或调用默认值 //graph->m_nTypedlg-> m_bStartDraw true; m_PtPress m_PtLast point; CView::OnLButtonDown…

【最新版】Java学习路线(含B站口碑推荐视频链接)

文章目录关于如何自学一、计算机网络二、数据结构与算法三、操作系统四、计算机组成原理五、编译原理六、设计模式七、MySQL八、实操工具九、JAVA并发与JVM十、Redis十一、Linux十二、Java路线学习尚硅谷黑马程序员动力节点狂神说十三、Java基础十四、JavaWeb十五、框架十六、微…

记录no static method cannot be reference

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 报错如题&#xff1a; no static method cannot be reference 我一直以为是在静态方法中调用了非静态方法&#xff0c;实际上只是我在注…