kafka系列三:生产与消费实践之旅

在本篇技术博客中,我们将深入探索Apache Kafka 0.10.0.2版本中的消息生产与消费机制。Kafka作为一个分布式消息队列系统,以其高效的吞吐量、低延迟和高可扩展性,在大数据处理和实时数据流处理领域扮演着至关重要的角色。了解如何在这一特定版本中实现消息的高效传输和处理,对于构建健壮的数据管道至关重要。

一、Kafka基础回顾

在深入生产与消费之前,让我们快速回顾一下Kafka的核心概念和架构。Kafka由Brokers、Topics、Partitions、Producers和Consumers组成。每个Broker是一个独立的服务器,负责存储和转发消息;Topic是消息的分类,每个Topic可以分为多个Partitions以实现水平扩展;Producer负责向特定的Topic发送消息;Consumer则从Topic中拉取消息进行处理。

二、Kafka 0.10.0.2生产者详解

2.1 生产者配置与初始化

在Kafka 0.10.0.2版本中,生产者配置变得更为灵活。生产者需要配置bootstrap.servers来指定Kafka集群的地址,acks来控制消息确认策略,以及其他如retriesbatch.sizelinger.ms等参数来优化性能和可靠性。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 配置生产者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("acks", "all"); // 所有副本必须确认接收到消息props.put("retries", 0); // 重试次数props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建KafkaProducer实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for(int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);producer.send(record);}// 关闭生产者producer.close();}
}

2.2 消息发送与事务支持

Kafka 0.10.0.2引入了对幂等性和事务的支持,这是生产者端的重大改进。幂等性确保了多次发送相同消息至同一Partition时,只会有一次被写入,这对于网络重试场景特别有用。而事务则允许跨多个Partition或Topic的操作具备原子性,这对于需要严格顺序和一致性的场景至关重要。

producer.initTransactions();
try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key", "value"));producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 处理异常
} finally {producer.close();
}

三、Kafka 0.10.0.2消费者详解

3.1 新一代消费者API

Kafka 0.10.0.2版本中,消费者API经历了重大重构,引入了新的Consumer API,它摒弃了旧API对ZooKeeper的依赖,转而直接与Kafka Brokers通信,提高了容错性和性能。

3.2 消费者配置与组管理

配置消费者时,需指定group.id来定义消费者所属的消费者组,enable.auto.commit控制自动提交偏移量,以及auto.offset.reset来决定当没有初始偏移量或偏移量无效时如何处理。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 配置消费者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("group.id", "test-group"); // 消费者组IDprops.put("enable.auto.commit", "true"); // 开启自动提交偏移量props.put("auto.commit.interval.ms", "1000"); // 自动提交间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建KafkaConsumer实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Arrays.asList("my-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 拉取消息for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

3.3 消息消费与偏移量管理

消费者通过poll()方法拉取消息,需关注max.poll.records配置来限制每次调用返回的最大记录数。Kafka支持手动和自动两种偏移量提交模式,手动模式给予开发者更多的控制权,自动模式则简化了使用。​​​​​​​

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());consumer.commitAsync();
}

四、性能优化与最佳实践

4.1 生产者优化

  • 批处理:通过调整batch.sizelinger.ms参数,可以提高消息发送的效率。

  • 压缩:启用消息压缩(如gzip、snappy)可以减少网络传输开销,但需权衡压缩和解压缩的CPU成本。

4.2 消费者优化

  • 合理的分区分配:确保消费者组内的消费者数量与Topic的分区数相匹配,避免资源浪费或负载不均。

  • 偏移量管理:根据业务需求选择合适的偏移量提交策略,确保消息不丢失也不重复消费。

五、总结

在Kafka 0.10.0.2版本中,生产者和消费者的增强功能不仅提高了消息处理的可靠性和效率,也为开发者提供了更多灵活性和控制权。通过深入理解生产消费机制,结合合理的配置和最佳实践,可以构建出高效稳定的数据传输管道。尽管随着时间推移,Kafka有了更先进的版本,但0.10.0.2版本仍被广泛应用于遗留系统和特定场景中,其核心概念和机制的学习对于理解和掌握Kafka的演进路径具有重要意义。

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/9268.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件设计师笔记(一)-基础要点

本文内容来自笔者学习zst 留下的笔记&#xff0c;虽然有点乱&#xff0c;但是哥已经排版过一次&#xff0c;将就着看吧&#xff0c;查缺补漏&#xff0c;希望大家都能通过&#xff0c;记得加上免费的关注&#xff01;谢谢&#xff01;csdn贴图真的很废人&#xff01; 目录 一、…

【3dmax笔记】030:参考与冻结

一、参考 参考物体,需要是实体。例如将一个图片作为参考,导入软件中,基于图片进行二维样条线绘制。 首先绘制一个三维的平面,或者绘制一个二维的矩形,添加一个挤出修改器(将厚度设为0),勾选【生成贴图坐标】,如下图所示: 然后将图片(位于配套实验数据包中的data03…

数据治理的难题:如何化解?

在数字化转型的大潮中&#xff0c;数据治理成了每个企业都绕不开的话题。但是&#xff0c;数据治理这条路并不好走&#xff0c;充满了各种挑战。这些挑战不仅来自于技术&#xff0c;还有组织文化、流程和法律法规等方面。 挑战一&#xff1a;数据孤岛 在企业内部&#xff0c;…

容灾演练双月报|郑大一附院数据级容灾演练切换

了解更多灾备行业动态 守护数字化时代业务连续 目录 CONTENTS 01 灾备法规政策 02 热点安全事件 03 容灾演练典型案例 01 灾备法规政策 3月19日&#xff0c;工信部发布《工业和信息化部办公厅关于做好2024年信息通信业安全生产和网络运行安全工作的通知》。明确提出“…

如何防止WordPress网站内容被抓取

最近在检查网站服务器的访问日志的时候&#xff0c;发现了大量来自同一个IP地址的的请求&#xff0c;用站长工具分析确认了我的网站内容确实是被他人的网站抓取了&#xff0c;我第一时间联系了对方网站的服务器提供商投诉了该网站&#xff0c;要求对方停止侵权行为&#xff0c;…

五一超级课堂---Llama3-Tutorial(Llama 3 超级课堂)---第一节 Llama 3 本地 Web Demo 部署

课程文档&#xff1a; https://github.com/SmartFlowAI/Llama3-Tutorial 课程视频&#xff1a; https://space.bilibili.com/3546636263360696/channel/collectiondetail?sid2892740&spm_id_from333.788.0.0 操作平台&#xff1a; https://studio.intern-ai.org.cn/consol…

特征提取与深度神经网络(角点检测)

图像特征概述 图像特征表示是该图像唯一的表述&#xff0c;是图像的DNA HOG HOG &#xff08;Histogram of Oriented Gradients&#xff09;是一种用于目标检测的特征描述子。在行人检测中用的最多。HOG特征描述了图像中局部区域的梯度方向信息&#xff0c;通过计算图像中各个…

通义千问2.5正式发布,能力升级,全面赶超GPT4

简介 在人工智能的大潮中&#xff0c;大模型的竞争愈发激烈。今日&#xff0c;阿里云发布了其最新的通义千问2.5大模型&#xff0c;引起了业界的广泛关注。这款模型不仅在性能上全面赶超了GPT-4&#xff0c;还在多个基准测评中取得了优异的成绩&#xff0c;展现了国产AI技术的…

ARP命令

按照缺省设置&#xff0c;ARP高速缓存中的项目是动态的&#xff0c;每当发送以恶个指定的数据报且高速缓存中不存在当前项目时&#xff0c;ARP便会自动添加该项目。一旦高速缓存的项目被输入&#xff0c;就已经开始走向失效状态。因此&#xff0c;如果ARP高速缓存中的项目很少或…

SPSS之主成分分析

SPSS中主成分分析功能在【分析】--【降维】--【因子分析】中完成&#xff08;在SPSS软件中&#xff0c;主成分分析与因子分析均在【因子分析】模块中完成&#xff09;。 求解主成分通常从分析原始变量的协方差矩阵或相关矩阵着手。 &#xff08;1&#xff09;当变量取值的度量…

【Elasticsearch<五>末篇 ✈️✈️】结合 kibana 实现索引中 IP 地址分布地图可视化

目录 &#x1f44b;前言 &#x1f440;一、ES 地理位置基本了解 &#x1f331;二、IP 地址地图可视化 2.1 创建预处理通道 2.2 创建索引库 2.3 插入一条数据 2.4 观察写入后的数据 2.5 可视化展示 &#x1f604;三、章末 &#x1f44b;前言 继前面了解 Elasticsearch 的安…

酷企秀场景elementUi plus可视化diy

无论网络公司还是政务企业需求的所需的一单可回本的 独立部署集三大功能&#xff1a;电子画册、VR全景、地图秀等功能都可以可视化在线设计 后续免费增加 自定义表单、抽奖活动功能。 源码交付&#xff0c;独立私有化部署&#xff0c;无限多开&#xff0c;可视化设计&#x…

【linux】主分区,扩展分区,逻辑分区,动态分区,引导分区,标准分区

目录 主分区&#xff0c;扩展分区&#xff0c;逻辑分区 主分区和引导分区 主分区&#xff0c;扩展分区&#xff0c;逻辑分区&#xff08;标准分区&#xff09; 硬盘一般划分为一个“主分区”和“扩展分区”&#xff0c;然后在扩展分区上再分成数个逻辑分区。 磁盘主分区扩展…

JavaWeb之过滤器(Filter)与监听器(Listener)

前言 过滤器(Filter) 1.什么是过滤器 2.过滤器的语法格式 3.使用场景 3.1.如何防止用户未登录就执行后续操作 3.2.设置编码方式--统一设置编码 3.3.加密解密(密码的加密和解密) 3.4.非法文字筛选 3.5.下载资源的限制 监听器(Listener) 1.什么是监听器 2.监听器分类…

Ci24R1 (SOP8)2.4GHz无线收发一体、双向系统的智能家居芯片

Ci24R1 &#xff08;SOP8&#xff09;工作范围在2.4GHzISM频段&#xff0c;专为低系统应用成本的无线场合设计&#xff0c;集成嵌入式ARQ基带协议引擎的无线收发器芯片。它的工作频率范围为2400MHz-2525MHz&#xff0c;共有126个1MHz带宽的信道。 Ci24R1 &#xff08;SOP8&…

IPFoxy Tips:什么是静态住宅IP?静态ISP代理指南

静态住宅代理&#xff08;也称为静态ISP代理&#xff09;是最流行的代理类型之一。它们也是隐藏您的身份并保持在线匿名的最佳方法之一。您为什么要使用住宅代理而不是仅使用常规代理服务&#xff1f;下面我具体分享。 一、什么是静态住宅代理&#xff1f; 首先&#xff0c;我…

无监督式学习

1.是什么&#xff1f; 无监督式学习与监督式学习**最大的区别就是&#xff1a;**没有事先给定的训练实例&#xff0c;它是自动对输入的示例进行分类或者分群&#xff1b; 优点&#xff1a;不需要标签数据&#xff0c;极大程度上扩大了我们的数据样本&#xff0c;其次不受监督信…

STC8增强型单片机开发day02

逻辑分析仪 什么是逻辑分析仪 逻辑分析仪&#xff08;Logic Analyzer&#xff09;是一种工具&#xff0c;用于分析数字信号&#xff0c;例如控制信号&#xff0c;时钟信号等等。它可以用于调试和验证数字电路、嵌入式系统等等 本人采用的是mini版USB 逻辑分析仪。总共有10个…

刷题《面试经典150题》(第九天)

加油&#xff01; 学习目标&#xff1a;学习内容&#xff1a;学习时间&#xff1a;知识点学习内容&#xff1a;跳跃游戏 II - 力扣&#xff08;LeetCode&#xff09;H 指数 - 力扣&#xff08;LeetCode&#xff09;盛最多水的容器 - 力扣&#xff08;LeetCode&#xff09;矩阵置…

Spring学习笔记

目录 1. Spring有什么优势 1.1 模块化 1.2 轻量级 1.3 方便集成各种优秀框架 1.4 提供了分层开发下的完整技术解决方案 1.5 Java语言编写的开源框架&#xff0c;使用了多种设计模式 2. Spring的第一个程序 2.1 开发环境 2.2 环境搭建 2.3 编码测试 2.4 BeanFactory的UML类图…