kafka系列三:生产与消费实践之旅

在本篇技术博客中,我们将深入探索Apache Kafka 0.10.0.2版本中的消息生产与消费机制。Kafka作为一个分布式消息队列系统,以其高效的吞吐量、低延迟和高可扩展性,在大数据处理和实时数据流处理领域扮演着至关重要的角色。了解如何在这一特定版本中实现消息的高效传输和处理,对于构建健壮的数据管道至关重要。

一、Kafka基础回顾

在深入生产与消费之前,让我们快速回顾一下Kafka的核心概念和架构。Kafka由Brokers、Topics、Partitions、Producers和Consumers组成。每个Broker是一个独立的服务器,负责存储和转发消息;Topic是消息的分类,每个Topic可以分为多个Partitions以实现水平扩展;Producer负责向特定的Topic发送消息;Consumer则从Topic中拉取消息进行处理。

二、Kafka 0.10.0.2生产者详解

2.1 生产者配置与初始化

在Kafka 0.10.0.2版本中,生产者配置变得更为灵活。生产者需要配置bootstrap.servers来指定Kafka集群的地址,acks来控制消息确认策略,以及其他如retriesbatch.sizelinger.ms等参数来优化性能和可靠性。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 配置生产者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("acks", "all"); // 所有副本必须确认接收到消息props.put("retries", 0); // 重试次数props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建KafkaProducer实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for(int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);producer.send(record);}// 关闭生产者producer.close();}
}

2.2 消息发送与事务支持

Kafka 0.10.0.2引入了对幂等性和事务的支持,这是生产者端的重大改进。幂等性确保了多次发送相同消息至同一Partition时,只会有一次被写入,这对于网络重试场景特别有用。而事务则允许跨多个Partition或Topic的操作具备原子性,这对于需要严格顺序和一致性的场景至关重要。

producer.initTransactions();
try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key", "value"));producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 处理异常
} finally {producer.close();
}

三、Kafka 0.10.0.2消费者详解

3.1 新一代消费者API

Kafka 0.10.0.2版本中,消费者API经历了重大重构,引入了新的Consumer API,它摒弃了旧API对ZooKeeper的依赖,转而直接与Kafka Brokers通信,提高了容错性和性能。

3.2 消费者配置与组管理

配置消费者时,需指定group.id来定义消费者所属的消费者组,enable.auto.commit控制自动提交偏移量,以及auto.offset.reset来决定当没有初始偏移量或偏移量无效时如何处理。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 配置消费者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("group.id", "test-group"); // 消费者组IDprops.put("enable.auto.commit", "true"); // 开启自动提交偏移量props.put("auto.commit.interval.ms", "1000"); // 自动提交间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建KafkaConsumer实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Arrays.asList("my-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 拉取消息for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

3.3 消息消费与偏移量管理

消费者通过poll()方法拉取消息,需关注max.poll.records配置来限制每次调用返回的最大记录数。Kafka支持手动和自动两种偏移量提交模式,手动模式给予开发者更多的控制权,自动模式则简化了使用。​​​​​​​

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());consumer.commitAsync();
}

四、性能优化与最佳实践

4.1 生产者优化

  • 批处理:通过调整batch.sizelinger.ms参数,可以提高消息发送的效率。

  • 压缩:启用消息压缩(如gzip、snappy)可以减少网络传输开销,但需权衡压缩和解压缩的CPU成本。

4.2 消费者优化

  • 合理的分区分配:确保消费者组内的消费者数量与Topic的分区数相匹配,避免资源浪费或负载不均。

  • 偏移量管理:根据业务需求选择合适的偏移量提交策略,确保消息不丢失也不重复消费。

五、总结

在Kafka 0.10.0.2版本中,生产者和消费者的增强功能不仅提高了消息处理的可靠性和效率,也为开发者提供了更多灵活性和控制权。通过深入理解生产消费机制,结合合理的配置和最佳实践,可以构建出高效稳定的数据传输管道。尽管随着时间推移,Kafka有了更先进的版本,但0.10.0.2版本仍被广泛应用于遗留系统和特定场景中,其核心概念和机制的学习对于理解和掌握Kafka的演进路径具有重要意义。

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/9268.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 高级面试问题及答案 更新(二)

Java 高级面试问题及答案 以下是几个常见的Java高级面试问题及其答案&#xff0c;供参考。 1. 什么是Java内存模型&#xff08;JMM&#xff09;&#xff1f;它如何影响并发编程&#xff1f; 问题&#xff1a;在Java中&#xff0c;内存模型&#xff08;JMM&#xff09;是一个…

共享旅游创业,千益畅行到底是不是新一轮的割韭菜?

共享旅游创业&#xff0c;千益畅行到底是不是新一轮的割韭菜&#xff1f; 共享旅游创业是近年来兴起的一种业务模式&#xff0c;通过互联网平台将旅游资源与用户进行共享和交易&#xff0c;提供更便捷、灵活的旅游体验。而千益畅行是其中的一家公司&#xff0c;专注于提供自驾…

软件设计师笔记(一)-基础要点

本文内容来自笔者学习zst 留下的笔记&#xff0c;虽然有点乱&#xff0c;但是哥已经排版过一次&#xff0c;将就着看吧&#xff0c;查缺补漏&#xff0c;希望大家都能通过&#xff0c;记得加上免费的关注&#xff01;谢谢&#xff01;csdn贴图真的很废人&#xff01; 目录 一、…

【3dmax笔记】030:参考与冻结

一、参考 参考物体,需要是实体。例如将一个图片作为参考,导入软件中,基于图片进行二维样条线绘制。 首先绘制一个三维的平面,或者绘制一个二维的矩形,添加一个挤出修改器(将厚度设为0),勾选【生成贴图坐标】,如下图所示: 然后将图片(位于配套实验数据包中的data03…

数据治理的难题:如何化解?

在数字化转型的大潮中&#xff0c;数据治理成了每个企业都绕不开的话题。但是&#xff0c;数据治理这条路并不好走&#xff0c;充满了各种挑战。这些挑战不仅来自于技术&#xff0c;还有组织文化、流程和法律法规等方面。 挑战一&#xff1a;数据孤岛 在企业内部&#xff0c;…

容灾演练双月报|郑大一附院数据级容灾演练切换

了解更多灾备行业动态 守护数字化时代业务连续 目录 CONTENTS 01 灾备法规政策 02 热点安全事件 03 容灾演练典型案例 01 灾备法规政策 3月19日&#xff0c;工信部发布《工业和信息化部办公厅关于做好2024年信息通信业安全生产和网络运行安全工作的通知》。明确提出“…

如何防止WordPress网站内容被抓取

最近在检查网站服务器的访问日志的时候&#xff0c;发现了大量来自同一个IP地址的的请求&#xff0c;用站长工具分析确认了我的网站内容确实是被他人的网站抓取了&#xff0c;我第一时间联系了对方网站的服务器提供商投诉了该网站&#xff0c;要求对方停止侵权行为&#xff0c;…

五一超级课堂---Llama3-Tutorial(Llama 3 超级课堂)---第一节 Llama 3 本地 Web Demo 部署

课程文档&#xff1a; https://github.com/SmartFlowAI/Llama3-Tutorial 课程视频&#xff1a; https://space.bilibili.com/3546636263360696/channel/collectiondetail?sid2892740&spm_id_from333.788.0.0 操作平台&#xff1a; https://studio.intern-ai.org.cn/consol…

C++ 容器(二)——容器操作

一、容器的修改 容器修改函数 insert()&#xff1a;在指定位置插入一个或多个元素erase()&#xff1a;删除指定位置或指定范围的元素push_back()&#xff1a;将元素添加到容器的末尾pop_back()&#xff1a;删除容器的最后一个元素 push_front()&#xff1a;将元素添加到容器的开…

特征提取与深度神经网络(角点检测)

图像特征概述 图像特征表示是该图像唯一的表述&#xff0c;是图像的DNA HOG HOG &#xff08;Histogram of Oriented Gradients&#xff09;是一种用于目标检测的特征描述子。在行人检测中用的最多。HOG特征描述了图像中局部区域的梯度方向信息&#xff0c;通过计算图像中各个…

PHP关联数组[区别,组成,取值,遍历,函数]

关联数组 相较于数值数组&#xff0c;关联数组的索引可以为字符串和数字&#xff0c;关联数组元素也可称为键值对&#xff0c;索引为键&#xff0c;值为值。 源码 <?php echo "<hr>"; //水平线标签//关联数组$arr3 array(); //创建空的数组//关联数…

C++并发:线程函数传参(一)

一、问题 当创建 std::thread 对象时&#xff0c;传递给线程的函数的所有参数都会被复制或移动到新创建的线程的内存空间中。这是为了确保线程的执行不会依赖于父线程可能销毁的栈上变量&#xff0c;从这个机制上看&#xff0c;这是很合理的。 在新的线程的栈上&#xff0c;这…

通义千问2.5正式发布,能力升级,全面赶超GPT4

简介 在人工智能的大潮中&#xff0c;大模型的竞争愈发激烈。今日&#xff0c;阿里云发布了其最新的通义千问2.5大模型&#xff0c;引起了业界的广泛关注。这款模型不仅在性能上全面赶超了GPT-4&#xff0c;还在多个基准测评中取得了优异的成绩&#xff0c;展现了国产AI技术的…

ARP命令

按照缺省设置&#xff0c;ARP高速缓存中的项目是动态的&#xff0c;每当发送以恶个指定的数据报且高速缓存中不存在当前项目时&#xff0c;ARP便会自动添加该项目。一旦高速缓存的项目被输入&#xff0c;就已经开始走向失效状态。因此&#xff0c;如果ARP高速缓存中的项目很少或…

SPSS之主成分分析

SPSS中主成分分析功能在【分析】--【降维】--【因子分析】中完成&#xff08;在SPSS软件中&#xff0c;主成分分析与因子分析均在【因子分析】模块中完成&#xff09;。 求解主成分通常从分析原始变量的协方差矩阵或相关矩阵着手。 &#xff08;1&#xff09;当变量取值的度量…

【Elasticsearch<五>末篇 ✈️✈️】结合 kibana 实现索引中 IP 地址分布地图可视化

目录 &#x1f44b;前言 &#x1f440;一、ES 地理位置基本了解 &#x1f331;二、IP 地址地图可视化 2.1 创建预处理通道 2.2 创建索引库 2.3 插入一条数据 2.4 观察写入后的数据 2.5 可视化展示 &#x1f604;三、章末 &#x1f44b;前言 继前面了解 Elasticsearch 的安…

酷企秀场景elementUi plus可视化diy

无论网络公司还是政务企业需求的所需的一单可回本的 独立部署集三大功能&#xff1a;电子画册、VR全景、地图秀等功能都可以可视化在线设计 后续免费增加 自定义表单、抽奖活动功能。 源码交付&#xff0c;独立私有化部署&#xff0c;无限多开&#xff0c;可视化设计&#x…

【linux】主分区,扩展分区,逻辑分区,动态分区,引导分区,标准分区

目录 主分区&#xff0c;扩展分区&#xff0c;逻辑分区 主分区和引导分区 主分区&#xff0c;扩展分区&#xff0c;逻辑分区&#xff08;标准分区&#xff09; 硬盘一般划分为一个“主分区”和“扩展分区”&#xff0c;然后在扩展分区上再分成数个逻辑分区。 磁盘主分区扩展…

Redis-2 双写一致性

双写一致性 一.什么是双写一致性&#xff1f; 在数据库中的数据修改后&#xff0c;也要修改缓存中的数据&#xff0c;保证数据库与缓存中保存的数据一致。 二.如何保证双写一致性&#xff1f; 回答该问题一定要结合自己的项目&#xff0c;说明自己的项目是必须强一致性还是…

JavaWeb之过滤器(Filter)与监听器(Listener)

前言 过滤器(Filter) 1.什么是过滤器 2.过滤器的语法格式 3.使用场景 3.1.如何防止用户未登录就执行后续操作 3.2.设置编码方式--统一设置编码 3.3.加密解密(密码的加密和解密) 3.4.非法文字筛选 3.5.下载资源的限制 监听器(Listener) 1.什么是监听器 2.监听器分类…