从Kafka系统中读取消息数据——消费

从Kafka系统中读取消息数据——消费

  • 消费 Kafka 集群中的主题消息
    • 检查消费者是不是单线程
    • 主题如何自动获取分区和手动分配分区
      • subscribe实现订阅(自动获取分区)
      • assign(手动分配分区)
    • 反序列化主题消息
      • 反序列化一个类.
      • 演示 Kafka 自定义反序列化代码
        • (1)编写一个反序列化工具类;
        • (2)编写自定义反序列化逻辑代码;
        • (3)编写一个消费者应用程序;
        • (4)执行整个自定义反序列化代码,输出结果如图5-19 所示。
    • 如何提交消息的偏移量
    • 使用多线程消费多个分区的主题
    • 配置消费者的属性

转自《 Kafka并不难学!入门、进阶、商业实战》

消费者是读取kafka分区中信息的一个实例

注意:
一个消费者可以读取多个分区
一个分区不能被多个消费者读取

消费 Kafka 集群中的主题消息

检查消费者是不是单线程

Kafka 系统的消费者接口是向下兼容的,即,在新版 Kafka 系统中老版的消费者接口仍可以使用。在新版本的 Kafka 系统中,消费者程序代码被重构了–通过 Java 语言对消费者KafkaConsumer 类进行了重新编码。
KafkaConsumer 是非多线程并发安全的:如果多个线程公用一个 KafkaConsumer 实例,则抛出异常错误信息。KafkaConsumer 类中判断是否为单线程的内容见代码 5-2。

/**设置轻量级所来阻止多线程并发访问 */
private void acquire(){// 检测当前消费者对象是否关闭ensureNotClosed();//获取当前消费者程序线程 IDlong threadId = Thread.currentThread().getId();// 判断是否主为单线程if (threadld != currentThread.get()&& !currentThread.compareAndSet(NO CURRENT THREAD, threadId))throw new ConcurrentModificationException("KafkaConsumer is not safe formulti-threaded access");refcount.incrementAndGet();	
}

KafkaConsumer 类通过 acquire()函数来监控访问的请求是否存在并发多线程操作。如果存在,则抛出一个 ConcurrentModificationException 异常

主题如何自动获取分区和手动分配分区

阅读 KafkaConsumer 类的实现代码可以发现,该类实现了org.apache.kafka.clients.consumer.Consumer 接口。该接口提供了用户访问 Kafka 集群主题的应用接口,主要包含以下两种。

  • subscribe:订阅指定的主题列表,来获取自动分配的分区;
  • assign:手动向主题分配分区列表,指定需要“消费”的分区。
  1. 自动获取分区
    如果调用 subscribeO函数订阅主题,则消费者组中的消费者程序会被动态分配到分区,同时被指定一个 org.apache.kafka.clients.consumer.ConsumerRebalanceListener 接口。当用户分配给消费者程序的分区集合发生变化时,可以通过回调函数的接口来触发自定义操作。
    使用 subscribe0函数订阅主题时,有三个重载函数可供选择。
    (1)subscribe(Collectiontopics):指定订阅主题集合:
    (2)subscribe(Collection topics, ConsumerRebalanceListener callback):分区发生变化时,通过回调函数来进行自动分区操作;
    (3)subscribe(Pattern pattern, ConsumerRebalanceListener callback):使用正则表达式来订阅主题,当主题或者主题分区发生变化时,通过回调函数来进行自动分区操作。
  2. 手动分配分区
    手动分配分区的方式可以通过调用 assignO函数来实现。assignO)函数与 subscribeO)函数的底层实现逻辑类似,也是先做一系列的检查工作,比如,是否含有并发操作、请求的参数是否合法(分区是否为空)等。

subscribe实现订阅(自动获取分区)

/*** 实现一个消费者实例代码.* * @author smartloli.**         Created by May 6, 2018*/
public class JConsumerSubscribe extends Thread {public static void main(String[] args) {JConsumerSubscribe jconsumer = new JConsumerSubscribe();jconsumer.start();}/** 初始化Kafka集群信息. */private Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("group.id", "ke");// 指定消费者组props.put("enable.auto.commit", "true");// 开启自动提交props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔// 反序列化消息主键props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化消费记录props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}/** 实现一个单线程消费者. */@Overridepublic void run() {// 创建一个消费者实例对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());// 订阅消费主题集合consumer.subscribe(Arrays.asList("ip_login_rt"));// 实时消费标识boolean flag = true;while (flag) {// 获取主题消息数据ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)// 循环打印消息记录System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 出现异常关闭消费者对象
//		consumer.commitAsync();
//		consumer.commitSync();consumer.close();}
}

assign(手动分配分区)

/*** 实现一个手动分配分区的消费者实例.* * @author smartloli.**         Created by May 6, 2018*/
public class JConsumerAssign extends Thread {public static void main(String[] args) {JConsumerAssign jconsumer = new JConsumerAssign();jconsumer.start();}/** 初始化Kafka集群信息. */private Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("group.id", "ke");// 指定消费者组props.put("enable.auto.commit", "true");// 开启自动提交props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔// 反序列化消息主键props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化消费记录props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}/** 实现一个单线程消费者程序. */@Overridepublic void run() {// 创建一个消费者实例对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());// 设置自定义分区TopicPartition tp = new TopicPartition("test_kafka_game_x", 0);// 手动分配consumer.assign(Collections.singleton(tp));// 实时消费标识boolean flag = true;while (flag) {// 获取主题消息数据ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)// 循环打印消息记录System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 出现异常关闭消费者对象consumer.close();}
}

反序列化主题消息

在分布式环境下,有序列化和反序列化两个概念

  • 序列化:将对象转换为字节序列,然后在网络上传输或者存储在文件中;
  • 反序列化:将网络或者文件中读取的字节序列数据恢复成对象。
  1. 为什么需要实现反序列
    在传统企业应用中,不同的组件分布在不同的系统和网络中,通过序列化协议实现对象的传输,保证了两个组件之间的通信安全。经过序列化后的消息数据会转换成二进制。
    如果需要将这些二进制进行业务逻辑处理,则需要将这些二进制数据进行反序列化,将其还原成对象。
  2. 反序列一个对象
    为了反序列化一个对象,用户必须保证序列化对象和反序列化对象一致下面以 Java 语言为例,实现一个反序列化类。

反序列化一个类.

/*** 反序列化一个类.* * @author smartloli.**         Created by May 6, 2018*/
public class JObjectDeserialize {/** 创建一个日志对象实例. */private static Logger LOG = LoggerFactory.getLogger(JObjectSerial.class);/** 实例化入口函数. */@SuppressWarnings("resource")public static void main(String[] args) {try {FileInputStream fis = new FileInputStream("/tmp/salary.out"); // 实例化一个输入流对象JObjectSerial jos = (JObjectSerial) new ObjectInputStream(fis).readObject();// 反序列化还原对象LOG.info("ID : " + jos.id + " , Money : " + jos.money);// 打印反序列化还原后的对象属性} catch (Exception e) {LOG.error("Deserial has error, msg is " + e.getMessage());// 打印异常信息}}}

演示 Kafka 自定义反序列化代码

Kafka 系统中提供了反序列化的接口,以方便用户调用。用户可以通过自定义反序列化的
方式来还原对象。
下面通过实例演示 Kafka 白定义反序列化具体操作。

(1)编写一个反序列化工具类;

在这里插入图片描述

/*** 封装一个序列化的工具类.* * @author smartloli.**         Created by Apr 30, 2018*/
public class SerializeUtils {/** 实现序列化. */public static byte[] serialize(Object object) {try {return object.toString().getBytes("UTF8");// 返回字节数组} catch (Exception e) {e.printStackTrace(); // 抛出异常信息}return null;}/** 实现反序列化. */public static <T> Object deserialize(byte[] bytes) {try {return new String(bytes, "UTF8");// 反序列化} catch (Exception e) {e.printStackTrace();}return null;}}
(2)编写自定义反序列化逻辑代码;
/*** 实现自定义反序列化.* * @author smartloli.**         Created by May 6, 2018*/
public class JSalaryDeserializer implements Deserializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}/** 自定义反序列逻辑. */@Overridepublic Object deserialize(String topic, byte[] data) {return SerializeUtils.deserialize(data);}@Overridepublic void close() {}}
(3)编写一个消费者应用程序;
/*** 实现一个消费者实例代码.* * @author smartloli.**         Created by May 6, 2018*/
public class JConsumerDeserialize extends Thread {/** 自定义序列化消费者实例入口. */public static void main(String[] args) {JConsumerDeserialize jconsumer = new JConsumerDeserialize();jconsumer.start();}/** 初始化Kafka集群信息. */private Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("group.id", "ke");// 指定消费者组props.put("enable.auto.commit", "true");// 开启自动提交props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔// 反序列化消息主键props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化消费记录props.put("value.deserializer", "org.smartloli.kafka.game.x.book_5.deserialize.JSalaryDeserializer");return props;}/** 实现一个单线程消费者. */@Overridepublic void run() {// 创建一个消费者实例对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());// 订阅消费主题集合consumer.subscribe(Arrays.asList("test_topic_ser_des"));// 实时消费标识boolean flag = true;while (flag) {// 获取主题消息数据ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)// 循环打印消息记录System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 出现异常关闭消费者对象consumer.close();}}
(4)执行整个自定义反序列化代码,输出结果如图5-19 所示。

在这里插入图片描述

如何提交消息的偏移量

Kafka 0.10.0x 版本之前的消费者程序会将“消费”的偏移量(Ofsets)提交到 Zookeeper系统的/consumers 目录。
例如,消费者组名为 test topic gl,主题名为 test topic,分区数为1,那么运行老版本消费者程序后,在 Zookeeper 系统中,偏移量提交的路径是/consumers/test topic_gl/ofisets/test topic/0Zookeeper 系统并不适合频繁地进行读写操作,因为 Zookeeper 系统性能降低会严重影响Kafka 集群的吞吐量。所以,在 Kafka 新版本消费者程序中,对偏移量的提交进行了重构,将其保存到 Kafka 系统内部主题中,消费者程序产生的偏移量会持续追加到该内部主题的分区中。Kafka系统提供了两种方式来提交偏移量,它们分别是自动提交和手动提交。

  1. 自动提交
    使用 KafkaConsumer 自动提交偏移量时,需要在配置属性中将“enable.auto.commit”设置为 true,另外可以设置“auto.commit.interval.ms”属性来控制自动提交的时间间隔。Kafka 系统自动提交偏移量的底层实现调用了 ConsumerCoordinator 的 commitOffsetsSync()函数来进行同步提交,或者 commitOfsetsAsync()函数来进行异步提交。自动提交的流程如图5-20 所示。
    在这里插入图片描述
  2. 手动提交
    在编写消费者程序代码时,将配置属性“enable.auto.commit”的值设为“false”,则可以通过手动模式来提交偏移量。
    KafkaConsumer消费者程序类提供了两种手动提交偏移量的方式–同步提交commitSync()函数和异步提交 commitAsync()函数
    阅读这两种提交方式的源代码可以发现,它们的底层分别由消费者协调器ConsumerCoordinator 的同步提交偏移量 commitOfsetsSync()函数和异步提交偏移量commitOffsetsAsync()函数来实现。
    消费者应用程序通过 ConsumerCoordinator 来发送 OfsetCommitRequest 请求,Kafka 服务器端接收到请求后,由组协调器 GroupCoordinator 进行处理,然后将偏移量信息追加到 Kafka系统内部主题中。

使用多线程消费多个分区的主题

在分布式应用场景中,Kafka 系统为了保证集群的可扩展性,对主题添加了多分区的概念而在实际消费者程序中,随着主题数据量的增加,可能一个消费者程序难以满足要求。下面通过实例来演示多线程消费多分区主题。

/*** 多线程消费者实例.* * @author smartloli.**         Created by May 6, 2018*/
public class JConsumerMutil {// 创建一个日志对象private final static Logger LOG = LoggerFactory.getLogger(JConsumerMutil.class);private final KafkaConsumer<String, String> consumer; // 声明一个消费者实例private ExecutorService executorService; // 声明一个线程池接口public JConsumerMutil() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9095,dn2:9094,dn3:9092");// 指定Kafka集群地址props.put("group.id", "ke");// 指定消费者组props.put("enable.auto.commit", "true");// 开启自动提交props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化消息主键props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化消费记录consumer = new KafkaConsumer<String, String>(props);// 实例化消费者对象consumer.subscribe(Arrays.asList("kv3_topic"));// 订阅消费者主题}/** 执行多线程消费者实例. */public void execute() {// 初始化线程池executorService = Executors.newFixedThreadPool(6);while (true) {// 拉取Kafka主题消息数据ConsumerRecords<String, String> records = consumer.poll(100);if (null != records) {executorService.submit(new KafkaConsumerThread(records, consumer));}}}/** 关闭消费者实例对象和线程池 */public void shutdown() {try {if (consumer != null) {consumer.close();}if (executorService != null) {executorService.shutdown();}if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {LOG.error("Shutdown kafka consumer thread timeout.");}} catch (InterruptedException ignored) {Thread.currentThread().interrupt();}}/** 消费者线程实例. */class KafkaConsumerThread implements Runnable {private ConsumerRecords<String, String> records;public KafkaConsumerThread(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) {this.records = records;}@Overridepublic void run() {for (TopicPartition partition : records.partitions()) {// 获取消费记录数据集List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);LOG.info("Thread id : "+Thread.currentThread().getId());// 打印消费记录for (ConsumerRecord<String, String> record : partitionRecords) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}}/** 多线程消费者实例入口. */public static void main(String[] args) {JConsumerMutil consumer = new JConsumerMutil();try {consumer.execute();} catch (Exception e) {LOG.error("Mutil consumer from kafka has error,msg is " + e.getMessage());consumer.shutdown();}}
}

在这里插入图片描述

配置消费者的属性

新版 Kanka 系统引入了新的消费者属性。在使用 Java 语言编写消费者应用程序时,可以按需添加一些属性来控制消息数据的读取。具体属性见表 5-2。
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/669660.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件测试学习笔记-使用jmeter进行性能测试

性能测试&#xff1a;使用自动化工具&#xff0c;模拟不同的场景&#xff0c;对软件各项性能指标进行测试和评估的过程。 性能测试的目的&#xff1a; 评估当前系统的能力寻找性能瓶颈&#xff0c;优化性能评估软件是否能够满足未来的需要 性能测试和功能测试对比 焦点不同&…

基于FPGA的图像最近邻插值算法verilog实现,包括tb测试文件和MATLAB辅助验证

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 将FPGA数据导入matlab显示图片&#xff0c;效果如下&#xff1a; 2.算法运行软件版本 vivado2019.2&#xff0c;matlab2022a 3.部分核心程序 ti…

【开源】SpringBoot框架开发高校学生管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学生管理模块2.2 学院课程模块2.3 学生选课模块2.4 成绩管理模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 学生表3.2.2 学院课程表3.2.3 学生选课表3.2.4 学生成绩表 四、系统展示五、核心代码5.1 查询课程5.2 新…

服务器和CDN推荐

简介 陆云Roovps是一家成立于2021年的主机服务商&#xff0c;主要业务是销售美国服务器、香港服务器及国外湖北十堰高防服务器&#xff0c;还有相关CDN产品。&#xff08; 地址&#xff1a;roovps&#xff09; 一、相关产品

C语言之数据在内存中的存储

目录 1. 整数在内存中的存储2. 大小端字节序和字节序判断什么是大小端&#xff1f;为什么有大小端&#xff1f;练习1练习2练习3练习4练习5练习6 3. 浮点数在内存中的存储浮点数存的过程浮点数取得过程练习题解析 1. 整数在内存中的存储 在讲解操作符的时候&#xff0c;我们已经…

ffmpeg的使用,安装,抽帧,加水印,截图,生成gif,格式转换,抓屏等

实际使用中总结的关于ffmpeg对视频的处理的记录文档 具体信息&#xff1a; http://ffmpeg.org/download.html 官网下载ffmpeg 关于ffmpeg的安装详细步骤和说明 装ffmpeg 方式,Linux和windows下的 http://bbs.csdn.net/topics/390519382 php 调用ffmpeg , http://bbs.csdn.net/t…

(篇九)MySQL常用内置函数

目录 ⌛数学函数 ⌛字符串函数 ⌛聚合函数 ⌛日期函数 &#x1f4d0;获取当前时间 &#x1f4d0;获取时间的某些内容 &#x1f4d0;​编辑 &#x1f4d0;格式化函数 &#x1f4cf;format类型&#xff1a; ⌛系统信息函数 ⌛类型转换函数 数学函数 字符串函数 聚合函…

SSH口令问题

SSH&#xff08;Secure Shell&#xff09;是目前较可靠、专为远程登录会话和其他网络服务提供 安全性的协议&#xff0c;主要用于给远程登录会话数据进行加密&#xff0c;保证数据传输的安全。 SSH口令长度太短或者复杂度不够&#xff0c;如仅包含数字或仅包含字母等时&#xf…

html5 audio video

DOMException: play() failed because the user didn‘t interact with the document first.-CSDN博客 不可用&#xff1a; 可用&#xff1a; Google Chrome Close AutoUpdate-CSDN博客

[C++] 如何使用Visual Studio 2022 + QT6创建桌面应用

安装Visual Studio 2022和C环境 [Visual Studio] 基础教程 - Window10下如何安装VS 2022社区版_visual studio 2022 社区版-CSDN博客 安装QT6开源版 下载开源版本QT Try Qt | 开发应用程序和嵌入式系统 | Qt Open Source Development | Open Source License | Qt 下载完成&…

请问半吊子 C++选手该如何深入学习 C++?

请问半吊子 C选手该如何深入学习 C? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff0…

React 实现表单组件

表单是html的基础元素&#xff0c;接下来我会用React实现一个表单组件。支持包括输入状态管理&#xff0c;表单验证&#xff0c;错误信息展示&#xff0c;表单提交&#xff0c;动态表单元素等功能。 数据状态 表单元素的输入状态管理&#xff0c;可以基于react state 实现。 …

【证书管理】实验报告

证书管理实验 【实验环境】 ISES客户端 【实验步骤】 查看证书 查看证书详细信息 选择任意证书状态&#xff0c;在下方“证书列表”中出现符合要求的所有证书。在“证书列表”中点击要查看证书&#xff0c;在右侧“证书详细信息”栏出现被选证书信息。 上述操作如图1.2.…

Elasticsearch:基本 CRUD 操作 - Python

在我之前的文章 “Elasticsearch&#xff1a;关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”&#xff0c;我详细讲述了如何建立 Elasticsearch 的客户端连接。我们也详述了如何对数据的写入及一些基本操作。在今天的文章中&#xff0c;我们针对数据的 CRUD (cre…

C++后端开发之Sylar学习三:VSCode连接Ubuntu配置Gitee

C后端开发之Sylar学习三&#xff1a;VSCode连接Ubuntu配置Gitee 为了记录学习的过程&#xff0c;学习Sylar时写的代码统一提交到Gitee仓库中。 Ubuntu配置Gitee 安装git sudo apt-get install -y git配置用户名和邮箱 git config --global user.name 用户名 …

ArcGISPro中Python相关命令总结

主要总结conda方面的相关命令 列出当前活动环境中的包 conda list 列出所有 conda 环境 conda env list 克隆环境 克隆以默认的 arcgispro-py3 环境为模版的 my_env 新环境。 conda create --clone arcgispro-py3 --name my_env --pinned 激活环境 activate my_env p…

相机图像质量研究(3)图像质量测试介绍

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

RabbitMQ-1.介绍与安装

介绍与安装 1.RabbitMQ1.0.技术选型1.1.安装1.2.收发消息1.2.1.交换机1.2.2.队列1.2.3.绑定关系1.2.4.发送消息 1.2.数据隔离1.2.1.用户管理1.2.3.virtual host 1.RabbitMQ 1.0.技术选型 消息Broker&#xff0c;目前常见的实现方案就是消息队列&#xff08;MessageQueue&…

Linux操作系统下安装消息中间件RabbitMQ_00000

下载 在官网下载Linux版RabbitMQ安装文件。 erlang-21.3-1.el7.x86_64.rpm rabbitmq-server-3.8.8-1.el7.noarch.rpm 安装 1、将文件上传至Linux系统中。 上传到/usr/local/software目录下&#xff08;如果没有software目录&#xff0c;则创建。&#xff09;。 2、安装文件&…

操作系统-【预备学习-2】(Linux 文件操作命令)

文章目录 相关知识文件查看命令cat 命令head 命令tail 命令nl 命令文件编辑基本命令 演示 相关知识 文件查看命令 我们要查看一些文本文件的内容时&#xff0c;要使用文本编辑器来查看。在Linxu下&#xff0c;可以使用一些命令预览文本文件中的内容&#xff0c;而不必使用文本…