解析KafkaConsumer类的神奇之道

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

解析KafkaConsumer类的神奇之道

    • 前言
    • KafkaConsumer双线程设计
      • 主线程(消费线程):
      • 心跳线程:
      • 示例代码:
    • KafkaConsumer线程不安全
      • 线程安全的替代方案:
    • 常用方法

前言

在分布式系统的舞台上,KafkaConsumer类如同消息消费的魔法师,默默地引导着消息的流向。本文将带您进入这个分布式的消费艺术之旅,解析KafkaConsumer类的玄妙之道。让我们一起揭开这个神秘面纱,探索Kafka中KafkaConsumer类的奥秘。

KafkaConsumer双线程设计

对于 Kafka 消费者 (KafkaConsumer) 的双线程设计,一种常见的模式是使用两个线程:主线程和心跳线程。这种设计可以有效提高消费者的稳定性和性能。

主线程(消费线程):

  1. 消费消息: 主线程负责从 Kafka 主题中拉取消息,并进行业务逻辑的处理。

  2. 异步提交位移: 在消费者成功处理消息后,主线程可以异步提交位移(offset)到 Kafka。这可以通过设置 enable.auto.commitfalse,手动控制位移提交的时机,确保消息处理成功后再提交位移。

    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
  3. 处理业务逻辑: 在主线程中,处理从 Kafka 拉取到的消息,执行具体的业务逻辑。

心跳线程:

  1. 定期发送心跳: 心跳线程负责定期向 Kafka 集群发送心跳请求,以确保消费者仍然处于活动状态。这有助于防止消费者因长时间不活动而被认为失效。

  2. 处理分区再分配: 在消费者组发生分区再分配时,心跳线程可以处理重新分配操作,确保消费者组的协调和平稳进行。

示例代码:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerWithHeartbeat {public static void main(String[] args) {Properties consumerProperties = new Properties();consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your_bootstrap_servers");consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);consumer.subscribe(Collections.singletonList("your_topic"));// 创建并启动心跳线程HeartbeatThread heartbeatThread = new HeartbeatThread(consumer);heartbeatThread.start();try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 处理消费记录的逻辑// 异步提交位移consumer.commitAsync();}} finally {// 在主线程关闭时停止心跳线程heartbeatThread.shutdown();consumer.close();}}
}class HeartbeatThread extends Thread {private final Consumer<String, String> consumer;private volatile boolean running = true;public HeartbeatThread(Consumer<String, String> consumer) {this.consumer = consumer;}@Overridepublic void run() {while (running) {// 发送心跳请求consumer.poll(Duration.ofMillis(100));}}public void shutdown() {running = false;interrupt();}
}

在上述示例中,KafkaConsumer 在主线程中进行消息的消费和位移提交,而 HeartbeatThread 负责定期发送心跳请求。注意在程序结束时关闭 HeartbeatThread,以确保线程正确停止。这种设计有助于确保消费者组的稳定和及时的位移提交。

KafkaConsumer线程不安全

KafkaConsumer 是线程不安全的,这意味着在多线程环境下,单个 KafkaConsumer 实例不能同时被多个线程使用,除非进行额外的同步措施。

在 Kafka 中,通常的做法是为每个消费者线程创建一个独立的 KafkaConsumer 实例。这确保了线程之间的独立性,避免了竞争条件和状态混乱。

线程安全的替代方案:

  1. 多个独立的 KafkaConsumer 实例: 为每个消费者线程创建一个独立的 KafkaConsumer 实例。这确保了每个线程有自己的消费状态和位移信息,不会相互干扰。

    KafkaConsumer<String, String> consumerThread1 = new KafkaConsumer<>(consumerProperties);
    KafkaConsumer<String, String> consumerThread2 = new KafkaConsumer<>(consumerProperties);
    
  2. 线程池中的消费者: 如果你使用线程池来管理消费者线程,确保每个线程都有独立的 KafkaConsumer 实例。

    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {executorService.submit(() -> {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);// 消费逻辑consumer.close();});
    }
    
  3. 消费者工厂创建实例: 自定义消费者工厂,确保每个工厂创建的消费者实例都是独立的。

    class ConsumerFactory {public static KafkaConsumer<String, String> createConsumer() {return new KafkaConsumer<>(consumerProperties);}
    }
    

    在每个线程中使用 ConsumerFactory.createConsumer() 来获取独立的消费者实例。

总体来说,确保每个消费者线程都有自己的 KafkaConsumer 实例是一种良好的实践,可以避免潜在的线程安全问题。同时,在使用多线程消费时,也要注意处理好位移提交和异常处理,以确保系统的稳定性和一致性。

常用方法

KafkaConsumer 是 Kafka 客户端库中用于消费消息的重要类。以下是一些 KafkaConsumer 中常用的一些重要方法:

  1. subscribe(Collection<String> topics) 订阅一个或多个主题,以开始接收消息。可以通过多次调用 subscribe 来订阅多个主题。

    consumer.subscribe(Arrays.asList("topic1", "topic2"));
    
  2. poll(Duration timeout) 从订阅的主题中拉取消息。该方法会阻塞一段时间或直到拉取到消息,参数 timeout 控制阻塞的最大时长。

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
  3. assign(Collection<TopicPartition> partitions) 手动分配特定的分区给消费者。与 subscribe 不可一起使用,需要手动管理分区的消费。

    consumer.assign(Arrays.asList(new TopicPartition("topic1", 0)));
    
  4. commitSync()commitAsync() 用于手动提交消费者的位移信息。commitSync() 是同步提交,会阻塞直到提交成功或发生错误;commitAsync() 是异步提交,不会阻塞主线程。

    consumer.commitSync();
    // 或
    consumer.commitAsync();
    
  5. seek(TopicPartition partition, long offset) 将消费者定位到特定分区和位移位置。可以在消费者启动后使用该方法。

    consumer.seek(new TopicPartition("topic1", 0), 10);
    
  6. seekToBeginning(Collection<TopicPartition> partitions)seekToEnd(Collection<TopicPartition> partitions) 将消费者定位到分区的开头或末尾。

    consumer.seekToBeginning(Collections.singletonList(new TopicPartition("topic1", 0)));
    // 或
    consumer.seekToEnd(Collections.singletonList(new TopicPartition("topic1", 0)));
    
  7. assignment() 获取当前分配给消费者的分区列表。

    Set<TopicPartition> partitions = consumer.assignment();
    
  8. unsubscribe() 取消订阅,停止消费者消费消息。

    consumer.unsubscribe();
    
  9. close() 关闭消费者,释放资源。在不使用消费者时应调用此方法。

    consumer.close();
    
  10. wakeup():可以在其他线程中安全地调用kafkaConsumer.wakeup()来唤醒Consumer,是线程安全的

这些是 KafkaConsumer 中的一些关键方法,用于管理消费者的订阅、消息拉取、位移提交等操作。根据实际使用场景,适当选择和组合这些方法可以满足不同的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/748775.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jetson nano——编译一些包的网址导航,pyside2,qt(持续更新)

目录 1.PySide2下载地址2.tesserocr下载地址3.Qt下载地址4.OpenSSL官网5.latex编译器下载地址5.1MikTex5.2TeX Live 1.PySide2下载地址 https://download.qt.io/official_releases/QtForPython/pyside2/ 如下图&#xff1a; 2.tesserocr下载地址 https://github.com/simonflue…

PTA冰岛人

作者 陈越 单位 浙江大学 2018年世界杯&#xff0c;冰岛队因1:1平了强大的阿根廷队而一战成名。好事者发现冰岛人的名字后面似乎都有个“松”&#xff08;son&#xff09;&#xff0c;于是有网友科普如下&#xff1a; 冰岛人沿用的是维京人古老的父系姓制&#xff0c;孩子的姓…

行业突破!四信实现低延时摄像头弱网状态100ms以内实时传输

随着人工智能、大数据、区块链等技术在城市中快速发展&#xff0c;人们日常生活中已经离不开网络的支撑&#xff0c;而实现“人与人”、“人与物”及“物与物”之间高速连接应用的“时延”&#xff0c;是网络支撑中最重要的存在。 以城市生活例子为例&#xff0c;当网络延时出现…

通过日志恢复sql server数据库

在SQL Server中&#xff0c;通过日志恢复数据库是一个精细的过程&#xff0c;主要用于在数据库出现错误、数据丢失或需要回滚到特定时间点时恢复数据。以下是一般步骤概述&#xff1a; 设置恢复模式&#xff1a; 首先&#xff0c;数据库必须配置为“完整恢复模式”或“大容量…

【Miniconda】Linux系统中 .condarc 配置文件的位置一般在哪里

【Miniconda】Linux系统中 .condarc 配置文件的位置一般在哪里 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到…

3. ElasticSearch搜索技术深入与聚合查询实战

1. ES分词器详解 1.1 基本概念 分词器官方称之为文本分析器&#xff0c;顾名思义&#xff0c;是对文本进行分析处理的一种手段&#xff0c;基本处理逻辑为按照预先制定的分词规则&#xff0c;把原始文档分割成若干更小粒度的词项&#xff0c;粒度大小取决于分词器规则。 1.2 …

疑问彻底搞懂TIME_WAIT状态为什么等待2MSL的时长

文章目录 1. TIME_WAIT状态2. 等待2MSL原因 1. TIME_WAIT状态 TIME_WAIT状态是TCP连接关闭过程中的一个状态&#xff0c;它表示连接已经被关闭&#xff0c;但是仍然在等待一段时间以确保远端接收到连接关闭的确认消息。在这个状态下&#xff0c;连接不再传输数据&#xff0c;但…

单片机FLASH深度解析和编程实践(上)

本篇文章主要针对单片机FLASH编程和FLASH基本原理进行学习分享。以STM32单片机作为实例进行编程实训。 关于FLASH操作的相关寄存器及编程&#xff0c;大家可以参考下一篇文章: 单片机FLASH深度解析和编程实践&#xff08;下&#xff09;-CSDN博客 目录 一、STM32编程方式 二、…

Ansys Lumerical | 激光雷达天线仿真

附件下载 联系工作人员获取附件 在本文中&#xff0c;我们将了解如何根据激光雷达应用需求设计和优化相控阵光栅天线。 概述 激光雷达&#xff08;LIDAR&#xff09;是“light detection and ranging”的简称&#xff0c;近年来由于在机器人、自动驾驶汽车、高精度测绘等领域…

万物互联的价值

随着我们习惯了万物互联&#xff0c;我们将需要改变我们的行为和使用互联网的方式。这并不像看起来那么困难。毕竟&#xff0c;自 20 世纪 90 年代中期互联网普及以来&#xff0c;你们中的许多人都会经历过各种经历的变化。你们中的许多人已经看到了网络邮件、在线电影和音乐、…

挑战杯 机器视觉的试卷批改系统 - opencv python 视觉识别

文章目录 0 简介1 项目背景2 项目目的3 系统设计3.1 目标对象3.2 系统架构3.3 软件设计方案 4 图像预处理4.1 灰度二值化4.2 形态学处理4.3 算式提取4.4 倾斜校正4.5 字符分割 5 字符识别5.1 支持向量机原理5.2 基于SVM的字符识别5.3 SVM算法实现 6 算法测试7 系统实现8 最后 0…

基于comsol七芯光纤超模模拟分析

本期教程主要向大家介绍一期采用comsol有限元分析软件进行七芯光纤模拟分析的模拟教程。首先介绍一下基本知识点 七芯光纤超模理论&#xff08;Supermode Theory for Seven-Core Fibers&#xff09;涉及一种特殊类型的多芯光纤&#xff08;MCF&#xff09;技术。在这里&#x…

【消息队列开发】 测试MessageFileManager(对硬盘中的消息操作)类

文章目录 &#x1f343;前言&#x1f384;测试流程&#x1f334;准备工作&#x1f332;测试创建队列功能&#x1f333;测试统计文件的读写&#x1f38b;测试将相应消息放入文件中&#x1f38d;测试读文件里的消息到内存&#x1f340;测试删除消息&#x1f60e;测试垃圾回收⭕总…

MySQL行锁核心知识介绍

MySQL的行锁是数据库中用于控制并发访问的一种机制。它允许在数据库的行级别上实现锁定&#xff0c;从而允许多个事务同时修改不同行的数据&#xff0c;而不会相互干扰。这种锁机制可以提高数据库的并发性能&#xff0c;减少锁争用&#xff0c;提高事务的吞吐量。在本教程中&am…

电视盒子解析安装包失败,安卓4.4安装不了kodi的解决方法,如何安装kodi

有些安卓电视或者电视盒子的安卓系统版本太低、自身架构或者屏蔽了安装其他应用的功能&#xff0c;下载的Kodi apk安装包提示无法安装&#xff0c;解析程序包时出现问题、解析出错无法安装、[INSTALL_FAILED_OLDER_SDK]、此应用与您的电视不兼容。 解决方法&#xff1a; 1、3…

OFDM调制解调过程

OFDM&#xff1a; Orthogonal Frequency Division Multiplexing 正交频分复用 1、OFDMA处理流程图 2、QPSK星座图和映射关系 QPSK&#xff08;Quadrature Phase Shift Keying&#xff0c;正交相移键控&#xff09;四相相移调制是利用载波的四种不同相位差来表征输入的…

服务器数据恢复—服务器硬盘灯显示红色的数据恢复案例

服务器数据恢复环境&故障&#xff1a; 一台服务器中有一组由多块硬盘组建的raid阵列&#xff0c;在运行过程中服务器突然崩溃&#xff0c;管理员检查服务器发现该服务器raid阵列中有两块硬盘的指示灯显示红色。于是&#xff0c;管理员重启服务器&#xff0c;服务器重启后&a…

大规模自动化重构框架--OpenRewrite浅析

目录 1. OpenRewrite是什么&#xff1f;定位&#xff1f; 2. OpenWrite具体如何做&#xff1f; 3. 核心概念释义 3.1 Lossless Semantic Trees (LST) 无损语义树 3.2 访问器&#xff08;Visitors&#xff09; 3.3 配方&#xff08;Recipes&#xff09; 4. 参考链接 Open…

PHP爬虫技术:利用simple_html_dom库分析汽车之家电动车参数

摘要/导言 本文旨在介绍如何利用PHP中的simple_html_dom库结合爬虫代理IP技术来高效采集和分析汽车之家网站的电动车参数。通过实际示例和详细说明&#xff0c;读者将了解如何实现数据分析和爬虫技术的结合应用&#xff0c;从而更好地理解和应用相关技术。 背景/引言 随着电…

IO流(3)-文件字符输入\输出流

FIleReader(文件字符输入流&#xff09; 文件字符输出流代码示例 package com.zz.io;import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.Reader;public class Test4 {public static void main(String args[]) …