13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

目录

  • kafka 消费者API用法
    • 消费者API
    • 使用消费者API消费消息
    • 消费者消费消息的代码演示
      • 1、官方API示例
      • 2、创建消费者类
      • 3、演示消费结果
        • 1、演示消费者属于同一个消费者组
        • 2、演示消费者不属于同一个消费者组
        • 3、停止线程不适用
        • 4、一些参数解释
    • 代码
      • 生产者:MessageProducer
      • 消费者 Consumer01
      • 消费者 Consumer02
      • pom.xml

kafka 消费者API用法

消费者API

消费者API的核心类是 KafkaConsumer,它提供了如下常用方法:

- subscribe(Collection<String> topics):订阅主题。- subscribe(Pattern pattern):订阅符合给定正则表达式的所有主题。- subscription():返回该消费者所订阅的主题集合。- unsubscribe():取消订阅。- close():关闭消费者。- poll(Duration timeout):拉取消息。- assign(Collection<TopicPartition> partitions):手动为该消费者分配分区。- assignment():返回分配该消费者的分区集合。- commitAsync():异步提交offset。- commitSync():同步提交offset。提示:如果开启了自动提交offset,则无需调用上面commitAsync()或commitSync()方法进行手动提交;自动提交offset比较方便,但手动提交offset则更精确,消费者程序可以等到消息真正被处理后再手动提交offset。——该选项有点类似于JMS、RabbitMQ的消息消费者的,消息确认机制。- enforceRebalance():强制执行重平衡。

下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。

- seek(TopicPartition partition, long offset):跳到指定的offset处,即下一条消息从offset处开始拉取。- seekToBeginning(Collection<TopicPartition> partitions):跳到指定分区的开始处。- seekToEnd(Collection<TopicPartition> partitions):跳到指定分区的结尾处。- position(TopicPartition partition):返回指定分区当前的offset。


使用消费者API消费消息

根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步:

1、创建KafkaConsumer对象,创建该对象时要传入Properties对象,用于对该消费者进行配置。

2、调用KafkaConsumer对象的poll()方法拉取消息,该方法返回ConsumerRecords。

3、对ConsumerRecords执行迭代,即可获取到抓取的每条消息。

4、程序结束时,取消订阅,关闭KafkaConsumer。



消费者消费消息的代码演示

1、官方API示例

KafkaConsumer

在这里插入图片描述

2、创建消费者类

在上一篇的生产者项目中,再写2个消费者来消费消息

Kafka 生产者API 用法

如图,创建2个消费者类,这个是消费者01,消费者02和01都是一模一样的。

在这里插入图片描述

在这里插入图片描述



3、演示消费结果

在这里插入图片描述



1、演示消费者属于同一个消费者组

如上图,可以看出,两个消费者属于同一个消费者组 ConsumerGroupTest_01 ,所以两个消费者消费到的消息是不重复的。因为每个消费者消费的分区都是不同的。

演示前预期结果:因为两个消费者属于同一个消费者组,所以每个消费者消费的分区都是不同的,也就是不会重复消费消息

在这里插入图片描述

演示结果:

演示步骤:启动两个消费者实例,然后启动生产者,往test2主题中发送20条消息,10条消息带key,10条消息不带key,大概率这各10条的消息就会被分配在不同的2个分区中。
根据kafka默认的分区消费规则,应该是一个消费者消费一个分区的消息

生产者发送消息:
生产者代码在这篇:
Kafka 生产者API 用法

在这里插入图片描述

在这里插入图片描述

消费者消费:

如图:消费者01 获取到了带key的消息并消费,消费者02 获取到了不带key的消息并消费,这里的消费消息先弄成打印就可以了。

注意:演示中,多次重启消费者,然后再启动生产者发送消息,总是消费者01消费到所有消息,消费者02没有消费到消息,然后生产者重新发送消息,才能有以下的演示结果。

类似集群模式,就是消息只能被一个消费者消费

在这里插入图片描述



2、演示消费者不属于同一个消费者组

因为两个消费者不属于同一个消费者组,所以两个消费者都能消费到test2主题下的所有分区的消息。

演示步骤:其他代码没变,只是修改了他们所属的消费者组
在这里插入图片描述

在这里插入图片描述

演示结果如图:两个消费者不属于同一个消费者组,每个消费者都能消费到所有消息,

类似于广播模式、或者是发布/订阅模式,
发布/订阅模型可以让一条消息能被多个消费者消费

在这里插入图片描述



3、停止线程不适用

这个停止消费者的线程好像没有用,如图,我生产者再发送消息后,这个消费者还是能消费到消息,并没有想象中的被停止。
现阶段要关闭消费者的话,直接关闭项目就可以了

在这里插入图片描述



4、一些参数解释

在这里插入图片描述

auto.offset.reset

设置从哪里读取消息

在这里插入图片描述



代码

生产者:MessageProducer

package cn.ljh;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** Properties: Kafka 设计了 Properties 来封装所有的配置属性* <p>* KafkaProducer:用来创建消息生产者,是 生产者API 的核心类,* 它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象* <p>* ProducerRecord:代表了一条消息,Kafka 的消息是包含了key、value、timestamp*/
public class MessageProducer
{//主题常量public static final String TEST_TOPIC = "test2";public static void main(String[] args){//Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性Properties props = new Properties();//指定连接Kafka的地址,多个地址之间用逗号隔开props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//指定Kafka的消息确认机制//0:不等待消息确认;1:只等待领导者分区的消息写入之后确认;all:等待所有分区的消息都写入之后才确认props.put("acks", "all");//指定消息发送失败的重试多少次props.put("retries", 0);//控制生产者在发送消息之前等待的时间//props.put("linger.ms", 3);//设置序列化器props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//1、创建 KafkaProducer 时,需要传入 Properties 对象来配置消息生产者Producer<String, String> producer = new KafkaProducer<>(props);//2、发送消息for (int i = 0; i < 20; i++){var msg = "这是第【 " + (i + 1) + " 】条消息!";if (i < 10){//发送带 key 的消息producer.send(new ProducerRecord<String, String>(TEST_TOPIC, "ljh", msg));} else{//发送不带 key 的消息producer.send(new ProducerRecord<String, String>(TEST_TOPIC, msg));}}System.out.println("消息发送成功!");//3、关闭资源producer.close();}
}

消费者 Consumer01

package cn.ljh;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;public class Consumer01
{//组id:设置这个消费者实例是属于 ConsumerGroupTest_01 这个消费者组的public static final String GROUP_ID = "ConsumerGroupTest_01";//1、创建 KafkaConsumer 消费者对象 ,把这个消费者定义成成员变量public static KafkaConsumer<String, String> consumer = null;public static void main(String[] args){//Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性Properties props = new Properties();//指定连接Kafka的地址,多个地址之间用逗号隔开props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//设置这个消费者实例属于哪个消费者组props.setProperty("group.id", GROUP_ID);//自动提交offset,就是类似之前的自动消息确认props.setProperty("enable.auto.commit", "true");//多个消息之间,自动提交消息的时间间隔props.setProperty("auto.commit.interval.ms", "1000");//设置session的超时时长,默认是10秒,这里设置15秒props.setProperty("session.timeout.ms", "15000");//设置每次都从最新的消息开始读取props.setProperty("auto.offset.reset","latest");//设置序列化器props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//1、创建 KafkaConsumer 消费者对象consumer = new KafkaConsumer<>(props);//2、订阅主题,订阅kafka集群中的test2主题consumer.subscribe(Arrays.asList(MessageProducer.TEST_TOPIC));//因为获取消息的循环是一个死循环,没法退出,所以我在这里再加一个线程来关闭这个消费者//启动一个线程来关闭这个 KafkaConsumernew Thread(() ->{//创建一个Scanner 类来读取控制台数据Scanner sc = new Scanner(System.in);//如果有下一行,就读取下一行while (sc.hasNextLine()){//获取控制台下一行的内容var str = sc.nextLine();//就是这个线程一直监听控制台,如果我们在控制台输出” :exit “,则关闭这个这个 KafkaConsumerif (str.equals(":exit")){//取消订阅consumer.unsubscribe();//关闭消费者对象consumer.close();}}}).start();//这是一个死循环,一直在获取主题中的消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("收到消息: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}
}

消费者 Consumer02

package cn.ljh;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;public class Consumer02
{//组id:设置这个消费者实例是属于 ConsumerGroupTest_02 这个消费者组的public static final String GROUP_ID = "ConsumerGroupTest_02";//1、创建 KafkaConsumer 消费者对象 ,把这个消费者定义成成员变量public static KafkaConsumer<String, String> consumer = null;public static void main(String[] args){//Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性Properties props = new Properties();//指定连接Kafka的地址,多个地址之间用逗号隔开props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//设置这个消费者实例属于哪个消费者组props.setProperty("group.id", GROUP_ID);//自动提交offset,就是类似之前的自动消息确认props.setProperty("enable.auto.commit", "true");//多个消息之间,自动提交消息的时间间隔props.setProperty("auto.commit.interval.ms", "1000");//设置session的超时时长,默认是10秒,这里设置15秒props.setProperty("session.timeout.ms", "15000");//设置每次都从最新的消息开始读取props.setProperty("auto.offset.reset","latest");//设置序列化器props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//1、创建 KafkaConsumer 消费者对象consumer = new KafkaConsumer<>(props);//2、订阅主题,订阅kafka集群中的test2主题consumer.subscribe(Arrays.asList(MessageProducer.TEST_TOPIC));//因为获取消息的循环是一个死循环,没法退出,所以我在这里再加一个线程来关闭这个消费者//启动一个线程来关闭这个 KafkaConsumernew Thread(() ->{//创建一个Scanner 类来读取控制台数据Scanner sc = new Scanner(System.in);//如果有下一行,就读取下一行while (sc.hasNextLine()){//获取控制台下一行的内容var str = sc.nextLine();//就是这个线程一直监听控制台,如果我们在控制台输出” :exit “,则关闭这个这个 KafkaConsumerif (str.equals(":exit")){//取消订阅consumer.unsubscribe();//关闭消费者对象consumer.close();}}}).start();//这是一个死循环,一直在获取主题中的消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("收到消息: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>kafkaproducertest</artifactId><version>1.0.0</version><!-- 项目名,和 artifactId 保持一致 --><name>kafkaproducertest</name><properties><!-- 在这里指定编译器的版本 --><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- 导入 Kafka 客户端APIJAR--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency></dependencies></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

雍禾医疗携手国内三甲医院展开合作 雍禾植发助力行业健康发展

根据《都市人群毛发健康消费白皮书》调研显示,国内毛发医疗服务行业市场规模从2021年至2030年将以21.8%的复合年均增长率(CAGR)增长,2030年将达到1381亿元。市场前景广阔的同时,消费需求也在升级迭代。 此前&#xff0c;北京大学人民医院与雍禾医疗共建“北京大学人民医院X雍禾…

有关Quick BI中lod_fixed函数中以MAX()作为过滤条件报错

一、Quick BI中的lod_fixed函数 lod_fixed{维度1[,维度2]...:聚合表达式[:过滤条件]} 作用&#xff1a;使用指定维度进行计算而不引用任何其他维度。其中&#xff0c; 维度1[,维度2]...&#xff1a;声明维度&#xff0c;指定聚合表达式要连接到的一个或多个维度。使用逗号分…

Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

前面介绍过NetworkClient的实现&#xff0c;它依赖于KSelector、InFlightRequests、Metadata等组件&#xff0c;负责管理客户端与Kafka集群中各个Node节点之间的连接&#xff0c;通过KSelector法实现了发送请求的功能&#xff0c;并通过一系列handle*方法处理请求响应、超时请求…

0122-2-JavaScript正则表达式

《JavaScript正则表达式》 第一章 正则表达式 字符匹配 正则表达式是匹配模式&#xff0c;要么匹配字符&#xff0c;要么匹配位置&#xff01; 横向匹配 /ab[2,5]/c/g 匹配 abc, abbc,abbbc,abbbbc,abbbbbc&#xff0c;数字连续出现 2 到 5 次&#xff0c;会匹配 2 位、3 位、…

C++ STL之string的使用及模拟实现

文章目录 1. 前言2. 介绍3. string类的使用3.1 string类的构造函数3.2 string类对象的容量操作3.3 string类对象的访问及遍历操作3.4 string类对象的修改操作3.5 string类对象的字符串操作3.6 string类的非成员函数 4. string类的模拟实现 1. 前言 C语言中&#xff0c;字符串是…

地图 - 实现有多条定位,显示多条定位,并且使用一个圆形遮罩层将多条定位进行覆盖

首先&#xff0c;需要在你的index.html模板页面头部加载百度地图JavaScript API代码&#xff0c;密钥可去百度地图开放平台官网申请 <script type"text/javascript" src"//api.map.baidu.com/api?typewebgl&v1.0&ak您的密钥"></script&…

代码随想录 Leetcode1047. 删除字符串中的所有相邻重复项

题目&#xff1a; 代码(首刷自解 2024年1月21日&#xff09;&#xff1a; class Solution { public:string removeDuplicates(string s) {if (s.size() < 2) return s;stack<char> t;for (int i 0; i < s.size(); i) {if (t.empty()) t.push(s[i]);else {if (s[i…

InnoDB的Buffer Pool

前置概念&#xff1a;一个数据页16KB&#xff0c;一个数据页可能有多个记录&#xff0c;即使我们只需要访问一条记录&#xff0c;需要把整个数据页加载到内存中&#xff0c;加载到内存后不是直接释放&#xff0c;而是缓存到内存当中&#xff08;当然对于buffer pool的缓存是在存…

若依管理系统搭建教程,ruoyi-vue环境搭建

环境部署 准备工作 JDK > 1.8 (推荐1.8版本) Mysql > 5.7.0 (推荐5.7版本) Maven > 3.0 运行系统 1、前往Gitee下载页面([https://gitee.com/y_project/RuoYi (opens new window)](https://gitee.com/y_project/RuoYi))下载解压到工作目录 2、导入到Eclipse&#…

ESP32-TCP服务端(Arduino)

将ESP32设置为TCP服务器 介绍 TCP&#xff08;Transmission Control Protocol&#xff09;传输控制协议&#xff0c;是一种面向连接的&#xff08;一个客户端对应一个服务端&#xff09;、可靠的传输层协议。在TCP的工作原理中&#xff0c;它会将消息或文件分解为更小的片段&a…

Day16 linuxC高级(存储类型 linux命令 shell命令)

文章目录 C补充标识常量存储类型1.auto // 自动型2.static&#xff1a;修饰变量和函数 // 静态型3.extern&#xff1a;外部引用4.register&#xff1a;寄存器类型 LinuxC高级简介&#xff1a;嵌入式系统(将软件嵌入到硬件里面)Linux起源查看操作系统版本内核系统架构系统关机或…

CSDN COC西安城市开发者社区2023年度线下聚会

1. 活动背景 CSDN始终致力于促进城市区域内尖端新型技术开发者交流&#xff0c;提供开放自由的切磋平台。在这个充满挑战和机遇的一年即将结束之际&#xff0c;通过本次聚会&#xff0c;汇聚西安本地各行各业的开发者朋友&#xff0c;回顾过去一年城市社区的成就和收获&#x…

Spring5系列学习文章分享---第一篇(概述+特点+IOC原理+IOC并操作之bean的XML管理操作)

目录 Spring&#xff08;概述特点IOC原理IOC并操作之bean的XML管理操作&#xff09;概述Spring是轻量级的开源的JavaEE框架Spring可以解决企业应用开发的复杂性Spring有两个核心部分ioc,aopSpring特点 loc(概念和原理)什么是 IOCIOC 底层原理IOC 过程图 IOC&#xff08;接口&am…

MySQL的一些综合运用

一些基本的语句&#xff1a; USE dept_emp; CREATE TABLE dept ( deptno INT(2) NOT NULL COMMENT 部门编号, dname VARCHAR (15) COMMENT 部门名称, loc VARCHAR (20) COMMENT 地理位置 ); -- 添加主键 ALTER TABLE dept ADD PRIMARY KEY (deptno); -- 添加数据 INSE…

前端转鸿蒙的就业前景如何?有必要学鸿蒙么?

学习鸿蒙开发是否有必要&#xff0c;取决于多个因素&#xff1a; 一、个人兴趣与职业规划&#xff1a; 如果你对华为鸿蒙操作系统&#xff08;HarmonyOS&#xff09;感兴趣&#xff0c;并且希望将这个平台作为你的职业发展的方向&#xff0c;那么学习鸿蒙开发是非常有意义的。…

MSG3D

论文在stgcn与sta-lstm基础上做的。下面讲一下里面的方法&#xff1a; 1.准备工作 符号。这里是对符号进行解释。 一个人体骨骼图被记为G(v,E) 图卷积&#xff1a; 图卷积定义 考虑一种常用于处理图像的标准卷积神经网络 (CNN)。输入是像素网格。每个像素都有一个数据值向…

x-cmd pkg | speedtest-cli - 网络速度测试工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 speedtest-cli 是一个网络速度测试工具&#xff0c;用于测试计算机或服务器与速度测试服务器之间的网络连接速度。 它使用 speedtest.net 测试互联网带宽&#xff0c;可以帮助用户获取网络的上传和下载速度、延迟等参…

【复现】SpringBlade SQL 注入漏洞_22

目录 一.概述 二 .漏洞影响 三.漏洞复现 1. 漏洞一&#xff1a; 四.修复建议&#xff1a; 五. 搜索语法&#xff1a; 六.免责声明 一.概述 SpringBlade 是由一个商业级项目升级优化而来的SpringCloud微服务架构&#xff0c;采用Java8 API重构了业务代码&#xff0c;完全…

【C++初阶】第二站:类与对象(上) -- 下部分

前言&#xff1a; 本章知识点&#xff1a; 类对象模型、 this 指针 专栏&#xff1a; C初阶 目录 类对象模型 如何计算类对象的大小 类对象的存储方式猜测 结构体内存对齐规则 this指针 this指针的引出 this指针的特性 C语言和C实现Stack的对比 C语言实现 C实现 类对象模型 …

动态规划——炮兵回城【集训笔记】

题目描述 游戏盘面是一个m行n列的方格矩阵&#xff0c;将每个方格用坐标表示&#xff0c;行坐标从下到上依次递增&#xff0c;列坐标从左至右依次递增&#xff0c;左下角方格的坐标为(1,1)&#xff0c;则右上角方格的坐标为(m,n)。 游戏结束盘上只剩下一枚炮兵没有回到城池中&a…