Kafka应用Demo:指派分区订阅消息消费

环境准备

 Kafka环境搭建和生产者样例代码与《Kafka应用Demo:按主题订阅消费消息》相同。

消费者代码样例

public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {TopicPartition partition0 = new TopicPartition(NEO_TOPIC, 0);TopicPartition partition1 = new TopicPartition(NEO_TOPIC, 1);properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Brokerproperties.put("group.id", "neo2");              // 指定消费组群 IDproperties.put("max.poll.records", "1");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);List<TopicPartition> partitionList = new ArrayList<>();partitionList.add(partition1);partitionList.add(partition0);consumer.assign(partitionList);new Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(10000);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}
}

 样例代码中的consumer.assign(partitionList)绑定了主题下的0号分区和1号分区接收消息。指派分区的方式和按主题订阅的方式不能混用,也就是说一个消费者实例只能选择一种方式订阅。

分析

 如果我们同时启动两个conumer实例,指派订阅相同主题和相同分区的消息。可以看到这两个实例收到了相同的消息,哪怕这两个消费者配置了相同的分组,这一点是与按主题订阅消息不同的。

在这里插入图片描述

 根据官方指导文档的说法,如果使用assign绑定分区订阅消息,不同的消费者实例是相互独立的(编者注:相当于广播消息)。为了避免offset提交导致冲突,应该为不同消费者实例配置不同的分组。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/9199.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【DFT】高 K/金属栅极阈值电压偏移的密度泛函模型

文章《Density functional model of threshold voltage shifts at High-K/Metal gates》&#xff0c;是由R. Cao、Z. Zhang、Y. Guo、J. Robertson等人撰写&#xff0c;发表在《Solid-State Electronics》期刊上。通过密度泛函理论&#xff08;Density Functional Theory, DFT&…

Redis(无中心化集群搭建)

文章目录 1.无中心化集群1.基本介绍2.集群说明 2.基本环境搭建1.部署规划&#xff08;6台服务器&#xff09;2.首先删除上次的rdb和aof文件&#xff08;对之前的三台服务器都操作&#xff09;1.首先分别登录命令行&#xff0c;关闭redis2.清除/root/下的rdb和aof文件3.把上次的…

大数据技术架构

一、hadoop 1、基础知识 1.1、概念 ①Hadoop集群特点&#xff1a;高可靠性、高效性、高可拓展性、高容错性、成本低、运行在Linux操作系统上、支持多种编程语言 ②Hadoop的由来&#xff1a; 谷歌的三驾马车对应的开源软件描述GFS&#xff1a;海量数据怎么存HDFS分布式文件…

Android系统 系统音量设置和修改

Android系统拥有多种声音属性&#xff0c;包括有多媒体、通话、通知等声音属性&#xff0c;这些声音属性分别含有自己默认音量、最大音量和最小音量属性。 博主是在Android10源码上修改的&#xff0c;其他版本可以自行搜索文件或者grep “关键字”&#xff0c;参考修改。 一&a…

电脑windows系统压缩解压软件-Bandizip

一、软件功能 Bandizip是一款功能强大的压缩和解压缩软件&#xff0c;具有快速拖放、高速压缩、多核心支持以及广泛的文件格式支持等特点。 Bandizip软件的功能主要包括&#xff1a; 1. 支持多种文件格式 Bandizip可以处理多种压缩文件格式&#xff0c;包括ZIP, 7Z, RAR, A…

MySQL性能优化之参数配置

使用请根据自己服务器配置进行配置 [mysqld] #端口号 port 13306 server-id 1 #log-bin日志路径 log-binD:\Mysql-binlog\mysql-bin binlog-formatROW#设置日志保留天数 expire_logs_days7 #设置日志文件最大大小 max_binlog_size100M# innodb缓冲池大小 innodb_buffer_pool…

oracle 数据库找到UDUMP的文件名称

oracle 数据库找到UDUMP的文件名称 select p.value||\||i.instance_name||_ora_||spid||.trc as "trace_file_name" from v$parameter p ,v$process pro, v$session s, (select sid from v$mystat where rownum1) m, v$instance i where lower(p.name)user_dump_…

orbslam2基础

目录 一、 内容概要二、 orbslam2基础介绍三 、 orbslam2安装3.1 安装依赖3.2 安装orbslam23.3 下载Kitee数据集 四、 进行ORBSLAM2仿真五、 心得体会六、 参考链接 一、 内容概要 orbslam2基础介绍orbslam2安装orbslam2使用案例&#xff1a;orbslam2kitti数据集序列图像 二、…

maxpooling2d的C++细节实现

最大池化是一种常见的操作&#xff0c;用于减小输入特征图的大小并提取最显著的特征。PyTorch提供了torch.nn.functional.max_pool2d 函数来执行这个操作&#xff0c;如果不具备pytorch环境&#xff0c;可以通过C实现这个操作&#xff0c;更清楚地了解其原理&#xff1b; PyTo…

转发_重定向

1.Servlet/JSP单独使用的弊端 当我们用Servlet或者JSP单独处理请求的时候 Servlet&#xff1a;拼接大量的html字符串 造成可读性差、难以维护JSP&#xff1a;使得html和Java代码互相交织 也造成了可读性差、难以维护的后果 最合适的做法就是两者结合使用 2.ServletJSP处理请…

OpenCV4.9如何将失焦图片去模糊滤镜(67)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇:OpenCV4.9的基于距离变换和分水岭算法的图像分割(66) 下一篇 :OpenCV4.9去运动模糊滤镜(68) 目标 在本教程中&#xff0c;您将学习&#xff1a; 什么是退化图像模型失焦图像的 PSF 是多少如何恢复…

【Linux调试器】:gdb的使用(常见指令)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux调试器gdb的使用&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通…

消除模型盲区,让透明件身后模型无所遁形

使用SOLIDWORKS设计产品出工程图&#xff0c;当模型中存在透明零部件时&#xff0c;由于位置摆放可能会遮挡其他零件。 这会影响零件在工程图中展示装配关系。 通常我们会采用剖视图或改变视图方向来展示被遮挡的零件。 SOLIDWORKS 2023版本发布了工程图中透视透明的零部件功能…

BUG:conda: command not found解决方法

文章目录 报错信息解决方法 报错信息 conda: command not found解决方法 直接输入 source ~/.bashrc看看这时输入conda有没有显示。如果没有的话,输入 vim ~/.bashrc 输入i进入编辑模式。之后 export PATH$PATH:[你自己conda的安装目录]输入vim的保存命令&#xff1a; ES…

分布式锁讲解

概括 分布式锁是一种用于在分布式系统中实现同步机制的锁。在单机系统中&#xff0c;我们可以使用如Java中的synchronized关键字或者 ReentrantLock来实现线程间的同步&#xff0c;但在分布式系统中&#xff0c;由于多个节点&#xff08;服务器&#xff09;之间的并发操作&am…

hbase建表预分区的2种方法

以下案例建表并设置预分区,分别测试以下2种方法 1.固定散列 示例:rowkey以日期为前缀 create ‘test’,‘cf1’, SPLITS > [‘202401’, ‘202402’, ‘202403’] put ‘test’,‘20240101’,‘cf1:name’,‘20240101’ put ‘test’,‘20240102’,‘cf1:name’,‘2024010…

Meta-SR: A Magnification-Arbitrary Network for Super-Resolution

CVPR2019https://github.com/XuecaiHu/Meta-SR-Pytorch 问题引入 首个解决任意尺度超分问题的模型&#xff0c;借鉴了meta-learning的思想&#xff1b;weight prediction strategy(meta-learning)&#xff1a;神经网络的权重是由另一个神经网络预测的&#xff0c;而不是通过从…

计算机中GPU快不行的几个标志,看下有没有你遇到的

GPU是处理图形密集型任务的主要组件。尽管它非常耐用,但它最终会磨损并开始失效。在到达生命的终结之前,它通常会显示出即将发生故障的迹象,需要及时修复或更换。本指南详细介绍了这些标志。 在我们开始之前 在深入研究GPU故障的迹象之前,重要的是要承认,下面提到的一些…

Cad图纸加密软件哪个最好用?成都企业都在用的透明加密软件是什么?

企业数据泄露事情频繁发生&#xff0c;为企业带来了不可计算机的经济损失&#xff0c;以及巨大的经营风险。在DT时代的到来&#xff0c;每一家企业的市场竞争本质上是知识产权的竞争&#xff0c;对于制造类企业来讲知识产权无疑是企业的cad图纸&#xff0c;制造类企业cad图纸的…

Java中常用类String的不可变性详解

Java中常用类String的不可变性详解 在Java编程中&#xff0c;String类是一个非常重要的基础类&#xff0c;它用于表示和操作字符串序列。然而&#xff0c;String类的一个核心特性是其不可变性&#xff08;immutable&#xff09;。这个特性在Java编程中有着重要的影响&#xff…