kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。
一、自定义PartitionAssignor.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class BroadcastAssignor extends AbstractPartitionAssignor {@Overridepublic String name() {return "broadcast";}private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {Map<String, List<String>> res = new HashMap<>();for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {String consumerId = subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;}@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic =consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();subscriptions.keySet().forEach(memberId ->assignment.put(memberId, new ArrayList<>()));consumersPerTopic.entrySet().forEach(topicEntry->{String topic = topicEntry.getKey();List<String> members = topicEntry.getValue();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null || members.isEmpty())return;List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);if (!partitions.isEmpty()) {members.forEach(memberId ->assignment.get(memberId).addAll(partitions));}});return assignment;}
}

二、定义两个消费者,给其配置上述PartitionAssignor.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class KafkaTest19 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup2023");properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());String topic="study2023";myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}}}
}
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class KafkaTest20 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup2023");properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());String topic="study2023";myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}}}
}

在kafka创建只有一个分区的topic : study2023

创建一个生产者往study2023这个 topic发送消息:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaTest01 {public static void main(String[] args) {Properties properties= new Properties();properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);ProducerRecord<String,String> producerRecord=new ProducerRecord<>("study2023",0,"fff","hello sister,now is: "+ new Date());Future<RecordMetadata> future = kafkaProducer.send(producerRecord);long offset = 0;try {offset = future.get().offset();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(offset);kafkaProducer.close();}
}

分别运行生产者和消费者,可以看到相同消费者组里两个消费者可以消费study2023这个topic的同一个分区的数据
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/57493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?

问题描述 在一个事务内完成插入操作&#xff0c;通过MQ异步通知其他微服务进行事件处理。 由于是在事务内发送&#xff0c;其他服务消费消息&#xff0c;查询数据时还不存在如何解决呢&#xff1f; 解决方案 通过spring-tx包的TransactionSynchronizationManager事务管理器解…

LeetCode 热题 100(七):105. 从前序与中序遍历序列构造二叉树、14. 二叉树展开为链表

题目一&#xff1a; 105. 从前序与中序遍历序列构造二叉树https://leetcode.cn/problems/construct-binary-tree-from-preorder-and-inorder-traversal/ 思路&#xff1a;依据前序遍历的根左右和中序遍历的左根右&#xff0c; 且根左长度&#xff1d;左根 代码&#xff1a; …

C#搭建WebSocket服务实现通讯

在学习使用websocket之前我们先了解一下websocket&#xff1a; WebSocket是一种在单个TCP连接上进行全双工通信的通信协议。与HTTP协议不同&#xff0c;它允许服务器主动向客户端发送数据&#xff0c;而不需要客户端明确地请求。这使得WebSocket非常适合需要实时或持续通信的应…

纯 CSS 开关切换按钮

<!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>纯 CSS 开关切换按钮</title><style>html {font-size: 62.5%;}body {background-color: #1848a0;}.wrapper {position: absolute;left: …

【注册岩土】Python土力学与基础工程计算.PDF-土中的应力

Python 求解代码如下&#xff1a; 1&#xff0e;&#xff03;计算竖向有效自重应力2.h12#m3.h21.5#m4.h31#m5.gamma1 19# kN/m^36.gamma218# kN/m^37.gamma317# kN/m^38.sigma_c gammal * h1 gamma2*h2 gamma3 *h39&#xff0e;print&#xff08;&#xff02;竖向有效自重应力…

时空数据挖掘精选23篇论文解析【AAAI 2023】

今天和大家分享时空数据挖掘方向的资料。 时空数据挖掘是人工智能技术的重要分支&#xff0c;是一种采用人工智能和大数据技术对城市时空数据进行分析与挖掘的方法&#xff0c;旨在挖掘时空数据&#xff0c;理解城市本质&#xff0c;解决城市问题。 目前&#xff0c;时空数据…

uniapp 使用permission获取录音权限

使用前&#xff0c;需要先配置权限 android.permission.RECORD_AUDIO

大数据Flink(六十九):SQL 数据类型

文章目录 SQL 数据类型 一、原子数据类型 二、​​​​​​复合数据类型 SQL 数据类型 在介绍完一些基本概念之后,我们来认识一下

WordPress导航主题源码

源码说明&#xff1a; V2.0406 添加搜索自动索引百度热搜关键词 添加首页tab标签模式加载方式切换(ajax加载和普通加载)(首页设置) 修复tab标签ajax加载模式会显示未审核的网址的bug 小屏幕热搜采用水平滚动 优化子主题支持 添加文章分页 添加解决WordPress 429的服务(…

API 接口应该如何设计?如何保证安全?如何签名?如何防重?

说明&#xff1a;在实际的业务中&#xff0c;难免会跟第三方系统进行数据的交互与传递&#xff0c;那么如何保证数据在传输过程中的安全呢&#xff08;防窃取&#xff09;&#xff1f;除了https的协议之外&#xff0c;能不能加上通用的一套算法以及规范来保证传输的安全性呢&am…

Autoware.universe部署04:universe传感器ROS2驱动

文章目录 一、激光雷达驱动二、IMU驱动2.1 上位机配置4.2 IMU校准4.3 安装ROS驱动 三、CAN驱动四、相机驱动4.1 安装驱动4.2 修改相机参数 五、GNSS驱动 本文介绍了 Autoware.universe 各个传感器ROS2驱动&#xff0c;本系列其他文章&#xff1a; Autoware.universe部署01&…

配置DNS服务的正反向解析

正向解析 安装DNS服务 2.在服务器端 编辑区域配置文件&#xff0c;选择一个解析模版进行修改---------/etc/named.rfc1912.zones 修改第一第三行 编辑数据配置文件&#xff0c;使用cp -a命令完全拷贝一份正向解析模版&#xff08;named.localhost&#xff09;&#xff0c;在…

安装docker服务,配置镜像加速器

文章目录 1.安装docker服务&#xff0c;配置镜像加速器2.下载系统镜像&#xff08;Ubuntu、 centos&#xff09;3.基于下载的镜像创建两个容器 &#xff08;容器名一个为自己名字全拼&#xff0c;一个为首名字字母&#xff09;4.容器的启动、 停止及重启操作5.怎么查看正在运行…

k8s deployment创建pod流程图

参考 k8s 创建pod和deployment的流程 - SoulChild随笔记

微前沿 | 第1期:强可控视频生成;定制化样本检索器;用脑电重建视觉感知;大模型鲁棒性评测

欢迎阅读我们的新栏目——“微前沿”&#xff01; “微前沿”汇聚了微软亚洲研究院最新的创新成果与科研动态。在这里&#xff0c;你可以快速浏览研究院的亮点资讯&#xff0c;保持对前沿领域的敏锐嗅觉&#xff0c;同时也能找到先进实用的开源工具。 本期内容速览 01. 强可…

【详解】文本检测OCR模型的评价指标

关于文本检测OCR模型的评价指标 前言&#xff1a;网上关于评价标准乱七八糟的&#xff0c;有关于单词的&#xff0c;有关于段落的&#xff0c;似乎没见过谁解释一下常见论文中常用的评价指标具体是怎么计算的&#xff0c;比如DBNet&#xff0c;比如RCNN&#xff0c;这似乎好像…

操作系统真题

操作系统真题 考点前驱图真题分页存储管理索引文件结构分段存储管理进程的状态进程的同步和互斥 考点 考试只会考察选择题 前驱图真题 c 这是常考题型 b 分页存储管理 将程序分页 --逻辑地址 将内存分为页框&#xff08;物理块&#xff09; --物理地址 程序页的大小和页框的大小…

修改Jupyter Notebook默认打开路径

这里我是重新下载的anaconda&#xff0c;打开Jupyter之后是默认在C盘的一个路径的&#xff0c;现在我们就来修改一下它的一个默认打开路径&#xff0c;这样在我们后续学习过程中&#xff0c;可以将ipynb后缀的文件放在这个目录下就能查看了。 1、先打开Anaconda Prompt&#x…

常见前端面试之VUE面试题汇总十一

31. Vuex 有哪几种属性&#xff1f; 有五种&#xff0c;分别是 State、 Getter、Mutation 、Action、 Module state > 基本数据(数据源存放地) getters > 从基本数据派生出来的数据 mutations > 提交更改数据的方法&#xff0c;同步 actions > 像一个装饰器&a…

他们朝我扔泥巴(scratch)

前言 纯~~~属~~~虚~~~构~~~&#xff08;同学看完短视频要我做&#xff0c;蟹蟹你&#xff09; 用scratch做的&#xff0c;幼稚得嘞(&#xffe3;_&#xffe3;|||)呵呵&#xff08;强颜欢笑&#xff09; 完成视频 视频试了好久&#xff0c;就是传不上来&#xff0c;私信我加我…