kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

目录

  • 1- 单播模式,只有一个消费者组
  • 2- 广播模式,多个消费者组
  • 3- Java实践

kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一般情况下partition-0存储a,c,e3个数据,partition-1存储b,d,f另外3个数据。

1- 单播模式,只有一个消费者组

topic只有1个partition,该组内有多个消费者时,此时同一个partition内的消息只能被该组中的一个consumer消费。当消费者数量多于partition数量时,多余的消费者是处于空闲状态的,如图1所示。topic,test只有一个partition,并且只有1个group,G1,该group内有多个consumer,只能被其中一个消费者消费,其他的处于空闲状态。

在这里插入图片描述
该topic有多个partition,该组内有多个消费者,比如test 有3个partition,该组内有2个消费者,那么可能就是C0对应消费p0,p1内的数据,c1对应消费p2的数据;如果有3个消费者,就是一个消费者对应消费一个partition内的数据了。图解分别如图2,图3.这种模式在集群模式下使用是非常普遍的,比如我们可以起3个服务,对应的topic设置3个partiition,这样就可以实现并行消费,大大提高处理消息的效率。

在这里插入图片描述
在这里插入图片描述

2- 广播模式,多个消费者组

如果想实现广播的模式就需要设置多个消费者组,这样当一个消费者组消费完这个消息后,丝毫不影响其他组内的消费者进行消费,这就是广播的概念。

多个消费者组,1个partition;
该topic内的数据被多个消费者组同时消费,当某个消费者组有多个消费者时也只能被一个消费者消费.

在这里插入图片描述

多个消费者组,多个partition

该topic内的数据可被多个消费者组多次消费,在一个消费者组内,每个消费者又可对应该topic内的一个或者多个partition并行消费,如图

在这里插入图片描述

3- Java实践

这里使用Java服务进行实践,模拟2个parition,然后同一个组内有2个消费者的情况:

首先创建一个发送消息的controller方法:

@ApiOperation(value = "向具有kafka-2个partition的topic发送信息")@RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)public String testSendMessage(@RequestParam("msg") String msg) {KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);System.out.println("发送的消息是:"+msg);return "2个partition的topic数据!--ok";}

然后再创建一个监听类监听该topic,这里的监听类即为消费者。

/*** @date 2020-09-24* 两个partition的topic,同一个组的两个消费者就可以并行的消费了,需要kafka也是集群才行,单机版并不支持* @param consumerRecord* @param acknowledgment*/@KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){InetAddress address = null;try {address = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}System.out.println("当前的IP地址是:"+address.getHostAddress());System.out.println("监听服务A-收到的消息是::");System.out.println(consumerRecord.value().toString());System.out.println("=================== end =================");
//        ack 提交掉,避免服务重启再次拉取到消息acknowledgment.acknowledge();}

然后我们给该服务起2个实例,即模拟该组内serverGroup1内的2个消费者,然后我们使用测试方法进行测试,向该topic内发送多个消息,观察2个实例的输出日志:

实例1:

发送的消息是:111
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“111=================== end =================
发送的消息是:222
发送的消息是:333
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“333=================== end =================
发送的消息是:444
发送的消息是:555
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“555=================== end =================
发送的消息是:666
发送的消息是:777
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“777=================== end =================
发送的消息是:888
发送的消息是:999
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“999-----------------------------------

实例2:

当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“222=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“444=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“666=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“888”
发现该组内的一个消费者消费到了111,333,555,777,999 ,另外一个消费者消费到了222,444,666,888,起到了均衡消费的效果。

所以在微服务的集群中,我们可以通过给topic设置多个partition,然后让每一个实例对应消费1个partition的数据,从而实现并行的处理数据,可以显著地提高处理消息的速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/226587.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git忽略已经提交的文件

原理类似于 Android修改submodule的lib包名

一文搞懂OSI参考模型与TCP/IP

OSI参考模型与TCP/IP 1. OSI参考模型1.1 概念1.2 数据传输过程 2. TCP/IP2.1 概念2.2 数据传输过程 3. 对应关系4. 例子4.1 发送数据包4.2 传输数据包4.3 接收数据包 1. OSI参考模型 1.1 概念 OSI模型&#xff08;Open System Interconnection Reference Model&#xff09;&a…

MySQL,分组order by

一、创建分组 ## 创建分组 -- 返回每个发布会的参会人数 SELECT event_id,COUNT(*) as canjia_num FROM sign_guest GROUP BY event_id; 1、group by子句可以包含任意个列&#xff0c;但是但指定的所有列都是一起计算的。 group by 后2个字段一起计算的 2、group by后面可以跟…

Leetcode 剑指 Offer II 057. 存在重复元素 III

题目难度: 中等 原题链接 今天继续更新 Leetcode 的剑指 Offer&#xff08;专项突击版&#xff09;系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 给你一个整数数组 nums 和两个整数 k 和 t 。请你判断是否存在 两…

STM32-HAL库11-SPI通讯(F103C6T6做主机,F103C8T6做从机)

STM32-HAL库11-SPI通讯&#xff08;F103C6T6做主机&#xff0c;F103C8T6做从机&#xff09; 一、所用材料 STM32F103C6T6最小系统板-主机 STM32F103C8T6最小系统板-从机 串口调试助手X-COM 二、所学内容 主要为实现SPI的轮询发送功能&#xff0c;在DSP280049C初学&#xff…

Logistic Regression——逻辑回归

1. 为什么需要逻辑回归 在前面学习的线性回归中&#xff0c;我们的预测值都是任意的连续值&#xff0c;例如预测房价。除此之外&#xff0c;还有一个常见的问题就是分类问题&#xff0c;而逻辑回归是一个解决分类问题的模型&#xff0c;其预测值是离散的。 分类问题又包括…

如何安装LUT预设?达芬奇/FCP/PR怎么安装LUT预设.cube格式文件的教程

在下载的LUT调色预设压缩文件包中&#xff0c;通常两个包含不同格式的LUT文件&#xff1a; .cube 和 .xmp 包含的 .cube 文件几乎与主流的视频编辑和色彩校正软件兼容&#xff0c;并且还可以在 Adobe Photoshop 等一些照片应用程序中使用。如果主要是将这些 LUT 用于视频剪辑项…

如何搭建Gateway服务

Gateway的简单介绍 Spring Cloud Gateway是Spring Cloud的一个项目&#xff0c;该项目是基于Spring&#xff0c;Spring Boot和Project Reactor等响应式编程和事件流技术开发的网关&#xff0c;它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。Gateway网关可以是…

持续集成交付CICD:Jenkins使用GitLab共享库实现基于SaltStack的CD流水线部署前后端应用

目录 一、实验 1.Jenkins使用GitLab共享库实现基于SaltStack的CD流水线部署前后端应用 2.优化共享库代码 二、问题 1.Jenkins手动构建后端项目流水线报错 一、实验 1.Jenkins使用GitLab共享库实现基于SaltStack的CD流水线部署前后端应用 &#xff08;1&#xff09;GitLa…

使用Redis构建简单的社交网站

文章目录 第1关&#xff1a;创建用户与动态第2关&#xff1a;处理用户关系第3关&#xff1a;状态与信息流 第1关&#xff1a;创建用户与动态 编程要求 在Begin-End区域编写 create_user(login_name, real_name) 函数&#xff0c;实现创建新用户的功能&#xff0c;具体参数与要…

【javascript】npm ERR! cb() never called!

错误 环境 windows 10 nvm node 14.17.0 如何解决 尝试了 5 种方法 1&#xff0c;npm cache clean --force 2, npm cache verify 3, 删掉package-lock.json &#xff08;然鹅我的这个项目没有这个文件&#xff09; 4, npm set strict-ssl false 5, 删除node_modules 这五种…

Excel中MATCH和INDEX函数的用法详解,以及Vlookup的数组用法

match函数 目的&#xff1a;查询函数&#xff0c;范围单元格中搜索特定的项&#xff0c;然后返回该项在此区域中的相对位置。 For example:让 match 去【隔壁办公室】找【老张】 Match 回复&#xff1a;【老张】坐在【隔壁办公室】第【四】个座位上 公式&#xff1a;【 mat…

【FPGA/verilog -入门学习7】 条件判断if与分支判断case语句的语法介绍

需求 使用if 和case 产生格雷码 / /*条件判断if与分支判断case语句的语法介绍 需求 使用if 和case 产生格雷码*/ / timescale 1ns/1ps module vlg_design(input [3:0] i_data, output reg [3:0] o_data,output reg [3:0] o_datac);always (*) begin if (4b0000 i_data) o_d…

MySQL数据库 入门

目录 一、MySQL概述 二、MySQL安装 安装数据库 配置数据库 启动停止数据库 客户端连接数据库 三、数据模型 四、SQL 一、MySQL概述 先来讲解三个概念&#xff1a;数据库、数据库管理系统、 SQL 。 而目前主流的关系型数据库管理系统的市场占有率排名如下&#xff1a; …

【从零开始学习--设计模式--代理模式】

返回首页 前言 感谢各位同学的关注与支持&#xff0c;我会一直更新此专题&#xff0c;竭尽所能整理出更为详细的内容分享给大家&#xff0c;但碍于时间及精力有限&#xff0c;代码分享较少&#xff0c;后续会把所有代码示例整理到github&#xff0c;敬请期待。 此章节介绍建…

C语言算法与数据结构,旅游景区地图求最短路径

背景&#xff1a; 本次作业要求完成一个编程项目。请虚构一张旅游景区地图&#xff0c;景区地图包括景点&#xff08;结点&#xff09;和道路&#xff08;边&#xff09;&#xff1a;地图上用字母标注出一些点&#xff0c;表示景点&#xff08;比如&#xff0c;以点 A、B、C、…

vue2使用wangeditor实现数学公式+富文本编辑器

需求&#xff1a; 做一个带有数学公式的富文本编辑器&#xff0c;在网上看了很多&#xff0c;这个最合适&#xff0c;借鉴了wangEditor富文本编辑器 这里面写的是v3的整合富文本编辑器&#xff0c;我照着上面改成了v2的&#xff0c;本文章主要是实现步骤和错误解决&#xff0c;…

【数据结构】单链表的定义和操作

目录 1.单链表的定义 2.单链表的创建和初始化 3.单链表的插入节点操作 4.单链表的删除节点操作 5.单链表的查找节点操作 6.单链表的更新节点操作 7.完整代码 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助…

2023-12-16:用go语言,给定整数数组arr,求删除任一元素后, 新数组中长度为k的子数组累加和的最大值。 来自字节。

2023-12-16&#xff1a;用go语言&#xff0c;给定整数数组arr&#xff0c;求删除任一元素后&#xff0c; 新数组中长度为k的子数组累加和的最大值。 来自字节。 答案2023-12-16&#xff1a; 来自左程云。 灵捷3.5 大体步骤如下&#xff1a; 算法 maxSum1 分析&#xff1…

网络时间服务器

本章主要介绍网络时间服务器。 使用chrony配置时间服务器 配置chrony客户端向服务器同步时间 1 时间同步的必要性 一些服务对时间要求非常严格&#xff0c;例如&#xff0c;图所示的由三台服务器搭建的ceph集群。 这三台服务器的时间必须保持一致&#xff0c;如果不一致&#…