Kafka应用Demo:指派分区订阅消息消费

环境准备

 Kafka环境搭建和生产者样例代码与《Kafka应用Demo:按主题订阅消费消息》相同。

消费者代码样例

public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {TopicPartition partition0 = new TopicPartition(NEO_TOPIC, 0);TopicPartition partition1 = new TopicPartition(NEO_TOPIC, 1);properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Brokerproperties.put("group.id", "neo2");              // 指定消费组群 IDproperties.put("max.poll.records", "1");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);List<TopicPartition> partitionList = new ArrayList<>();partitionList.add(partition1);partitionList.add(partition0);consumer.assign(partitionList);new Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(10000);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}
}

 样例代码中的consumer.assign(partitionList)绑定了主题下的0号分区和1号分区接收消息。指派分区的方式和按主题订阅的方式不能混用,也就是说一个消费者实例只能选择一种方式订阅。

分析

 如果我们同时启动两个conumer实例,指派订阅相同主题和相同分区的消息。可以看到这两个实例收到了相同的消息,哪怕这两个消费者配置了相同的分组,这一点是与按主题订阅消息不同的。

在这里插入图片描述

 根据官方指导文档的说法,如果使用assign绑定分区订阅消息,不同的消费者实例是相互独立的(编者注:相当于广播消息)。为了避免offset提交导致冲突,应该为不同消费者实例配置不同的分组。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/9199.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【DFT】高 K/金属栅极阈值电压偏移的密度泛函模型

文章《Density functional model of threshold voltage shifts at High-K/Metal gates》&#xff0c;是由R. Cao、Z. Zhang、Y. Guo、J. Robertson等人撰写&#xff0c;发表在《Solid-State Electronics》期刊上。通过密度泛函理论&#xff08;Density Functional Theory, DFT&…

Redis(无中心化集群搭建)

文章目录 1.无中心化集群1.基本介绍2.集群说明 2.基本环境搭建1.部署规划&#xff08;6台服务器&#xff09;2.首先删除上次的rdb和aof文件&#xff08;对之前的三台服务器都操作&#xff09;1.首先分别登录命令行&#xff0c;关闭redis2.清除/root/下的rdb和aof文件3.把上次的…

大数据技术架构

一、hadoop 1、基础知识 1.1、概念 ①Hadoop集群特点&#xff1a;高可靠性、高效性、高可拓展性、高容错性、成本低、运行在Linux操作系统上、支持多种编程语言 ②Hadoop的由来&#xff1a; 谷歌的三驾马车对应的开源软件描述GFS&#xff1a;海量数据怎么存HDFS分布式文件…

电脑windows系统压缩解压软件-Bandizip

一、软件功能 Bandizip是一款功能强大的压缩和解压缩软件&#xff0c;具有快速拖放、高速压缩、多核心支持以及广泛的文件格式支持等特点。 Bandizip软件的功能主要包括&#xff1a; 1. 支持多种文件格式 Bandizip可以处理多种压缩文件格式&#xff0c;包括ZIP, 7Z, RAR, A…

oracle 数据库找到UDUMP的文件名称

oracle 数据库找到UDUMP的文件名称 select p.value||\||i.instance_name||_ora_||spid||.trc as "trace_file_name" from v$parameter p ,v$process pro, v$session s, (select sid from v$mystat where rownum1) m, v$instance i where lower(p.name)user_dump_…

orbslam2基础

目录 一、 内容概要二、 orbslam2基础介绍三 、 orbslam2安装3.1 安装依赖3.2 安装orbslam23.3 下载Kitee数据集 四、 进行ORBSLAM2仿真五、 心得体会六、 参考链接 一、 内容概要 orbslam2基础介绍orbslam2安装orbslam2使用案例&#xff1a;orbslam2kitti数据集序列图像 二、…

转发_重定向

1.Servlet/JSP单独使用的弊端 当我们用Servlet或者JSP单独处理请求的时候 Servlet&#xff1a;拼接大量的html字符串 造成可读性差、难以维护JSP&#xff1a;使得html和Java代码互相交织 也造成了可读性差、难以维护的后果 最合适的做法就是两者结合使用 2.ServletJSP处理请…

OpenCV4.9如何将失焦图片去模糊滤镜(67)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇:OpenCV4.9的基于距离变换和分水岭算法的图像分割(66) 下一篇 :OpenCV4.9去运动模糊滤镜(68) 目标 在本教程中&#xff0c;您将学习&#xff1a; 什么是退化图像模型失焦图像的 PSF 是多少如何恢复…

【Linux调试器】:gdb的使用(常见指令)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux调试器gdb的使用&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通…

消除模型盲区,让透明件身后模型无所遁形

使用SOLIDWORKS设计产品出工程图&#xff0c;当模型中存在透明零部件时&#xff0c;由于位置摆放可能会遮挡其他零件。 这会影响零件在工程图中展示装配关系。 通常我们会采用剖视图或改变视图方向来展示被遮挡的零件。 SOLIDWORKS 2023版本发布了工程图中透视透明的零部件功能…

分布式锁讲解

概括 分布式锁是一种用于在分布式系统中实现同步机制的锁。在单机系统中&#xff0c;我们可以使用如Java中的synchronized关键字或者 ReentrantLock来实现线程间的同步&#xff0c;但在分布式系统中&#xff0c;由于多个节点&#xff08;服务器&#xff09;之间的并发操作&am…

Meta-SR: A Magnification-Arbitrary Network for Super-Resolution

CVPR2019https://github.com/XuecaiHu/Meta-SR-Pytorch 问题引入 首个解决任意尺度超分问题的模型&#xff0c;借鉴了meta-learning的思想&#xff1b;weight prediction strategy(meta-learning)&#xff1a;神经网络的权重是由另一个神经网络预测的&#xff0c;而不是通过从…

计算机中GPU快不行的几个标志,看下有没有你遇到的

GPU是处理图形密集型任务的主要组件。尽管它非常耐用,但它最终会磨损并开始失效。在到达生命的终结之前,它通常会显示出即将发生故障的迹象,需要及时修复或更换。本指南详细介绍了这些标志。 在我们开始之前 在深入研究GPU故障的迹象之前,重要的是要承认,下面提到的一些…

PXE批量部署,一键安装配置多台Linux系统

目录 一、PXE批量部署的优点 二、搭建PXE远程安装服务器 1. 实验初始化设置 2. 一键安装软件包 3. 复制 vmlinuz、initrd.img、pxelinux.0文件 4. 配置PE启动菜单配置文件 5. 修改配置文件&#xff0c; 启动各个软件服务 6. kickstart自动应答文件修改启动菜单配置文件…

什么是静态住宅代理IP?

静态住宅代理&#xff08;也称为静态ISP代理&#xff09;是最流行的代理类型之一。它们也是隐藏您的身份并保持在线匿名的最佳方法之一。您为什么要使用住宅代理而不是仅使用常规代理服务&#xff1f;下面我具体分享。 一、什么是静态住宅代理&#xff1f; 首先&#xff0c;我…

【iOS】事件传递与响应机制

文章目录 前言事件UIEvent一、事件传递遍历顺序 二、手势识别三、响应机制UIResponder&#xff08;响应者&#xff09;响应者链 四、相关应用扩大button点击范围穿透事件 总结 前言 提到响应者链与事件传递&#xff0c;如果看过其他人的博客&#xff0c;经常能看到这经典的三张…

苍穹外卖Day06笔记

疯玩了一个月&#xff0c;效率好低&#xff0c;今天开始捡起来苍穹外卖~ 1. 为什么不需要单独引入HttpClient的dependency&#xff1f; 因为我们在sky-common的pom.xml中已经引入了aliyun-sdk-oss的依赖&#xff0c;而这个依赖低层就引入了httpclinet的依赖&#xff0c;根据依…

Docker部署Metabase

文章目录 Docker安装MetabaseCentOS7安装Docker获取最新的 Docker 镜像启动Metabase容器在Metabase初始化时查看日志访问Metabase Metabase 的 ClickHouse 驱动程序安装环境简介删除容器创建容器下载click house驱动放入驱动重启容器将元数据库连接到 ClickHouse报错解决 Docke…

YOLOv9改进策略 | 添加注意力篇 | 一文带你改进GAM、CBAM、CA、ECA等通道注意力机制和多头注意力机制

一、本文介绍 这篇文章给大家带来的改进机制是一个汇总篇&#xff0c;包含一些简单的注意力机制&#xff0c;本来一直不想发这些内容的&#xff08;网上教程太多了&#xff0c;发出来增加文章数量也没什么意义&#xff09;&#xff0c;但是群内的读者很多都问我这些机制所以单…

PDPS15---安装过程---常遇问题---分享

目录 问题1 安装失败 1.1 运行第一步出错 1.2 解决 问题2 路径错误 2.1 错误 2.2 解决 问题3 运行失败 3.1 无法找到路径 3.2 原因分析 3.3 解决 问题4 拒绝访问 4.1 出现提示 4.2 分析 4.3 解决 问题5 许可证过期 5.1 PD找不到许可证 5.2 解决 问题1 安装失败…