Kafka原生API使用Java代码-消费者组-消费模式

文章目录

  • 1、消费模式
    • 1.1、创建一个3分区1副本的 主题 my_topic1
    • 1.2、创建生产者 KafkaProducer1
    • 1.2、创建消费者
      • 1.2.1、创建消费者 KafkaConsumer1Group1 并指定组 my_group1
      • 1.2.3、创建消费者 KafkaConsumer2Group1 并指定组 my_group1
      • 1.2.3、创建消费者 KafkaConsumer3Group1 并指定组 my_group1
      • 1.2.4、创建消费者 KafkaConsumer1Group2 并指定组 my_group2
    • 1.3、eagle for apache kafka
      • 1.3.1、查看分区0的数据
      • 1.3.2、查看分区1的数据
      • 1.3.3、查看分区2的数据

1、消费模式

消费模式

  1. 点对点:一个组消费消息时,只能由组内的一个消费者消费一次 避免重复消费
  2. 发布订阅:多个组消费消息时,每个组都可以消费一次消息

1.1、创建一个3分区1副本的 主题 my_topic1

在这里插入图片描述

1.2、创建生产者 KafkaProducer1

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducer1 {/*** 主函数用于演示如何向Kafka的特定主题发送消息。** @param args 命令行参数(未使用)*/public static void main(String[] args) throws ExecutionException, InterruptedException {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口props.put("acks", "all"); // 确认消息写入策略props.put("retries", 0); // 消息发送失败时的重试次数props.put("linger.ms", 1); // 发送缓冲区等待时间props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息到主题"my_topic3"// 异步发送消息:不接收kafka的响应//producer.send(new ProducerRecord<String, String>("my_topic3",  "hello,1,2,3"));// 注释掉的循环代码块展示了如何批量发送消息//for (int i = 0; i < 100; i++)//    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));for (int i=0;i<20;i++) {producer.send(new ProducerRecord<String, String>("my_topic1",i%3,"null","我是"+i),new Callback() {//消息发送成功,kafka broker ack 以后回调@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//exception:如果有异常代表消息未能正常发送到kafka,没有异常代表消息发送成功://此时kafka的消息不一定持久化成功(需要kafka生产者加配置)//RecordMetadata代表发送成功的消息的元数据System.out.println("partition = " + recordMetadata.partition());}});}// 关闭生产者实例producer.close();}
}
partition = 2
partition = 2
partition = 2
partition = 2
partition = 2
partition = 2
partition = 1
partition = 1
partition = 1
partition = 1
partition = 1
partition = 1
partition = 1
partition = 0
partition = 0
partition = 0
partition = 0
partition = 0
partition = 0
partition = 0

1.2、创建消费者

1.2.1、创建消费者 KafkaConsumer1Group1 并指定组 my_group1

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer1Group1 {/*** 主函数入口,创建并运行一个Kafka消费者来消费主题"foo"和"bar"的消息。** @param args 命令行参数(未使用)*/public static void main(String[] args) {// 初始化Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group1"); // 消费者组IDprops.setProperty("enable.auto.commit", "true"); // 自动提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置创建KafkaConsumer实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅要消费的主题consumer.subscribe(Arrays.asList("my_topic1"));// 持续消费消息while (true) {// 从Kafka服务器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍历并处理收到的消息记录for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 2,value = 我是2
offset = 1,partition: 2,value = 我是5
offset = 2,partition: 2,value = 我是8
offset = 3,partition: 2,value = 我是11
offset = 4,partition: 2,value = 我是14
offset = 5,partition: 2,value = 我是17

1.2.3、创建消费者 KafkaConsumer2Group1 并指定组 my_group1

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer2Group1 {/*** 主函数入口,创建并运行一个Kafka消费者来消费主题"foo"和"bar"的消息。** @param args 命令行参数(未使用)*/public static void main(String[] args) {// 初始化Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group1"); // 消费者组IDprops.setProperty("enable.auto.commit", "true"); // 自动提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置创建KafkaConsumer实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅要消费的主题consumer.subscribe(Arrays.asList("my_topic1"));// 持续消费消息while (true) {// 从Kafka服务器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍历并处理收到的消息记录for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 1,value = 我是1
offset = 1,partition: 1,value = 我是4
offset = 2,partition: 1,value = 我是7
offset = 3,partition: 1,value = 我是10
offset = 4,partition: 1,value = 我是13
offset = 5,partition: 1,value = 我是16
offset = 6,partition: 1,value = 我是19

1.2.3、创建消费者 KafkaConsumer3Group1 并指定组 my_group1

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer3Group1 {/*** 主函数入口,创建并运行一个Kafka消费者来消费主题"foo"和"bar"的消息。** @param args 命令行参数(未使用)*/public static void main(String[] args) {// 初始化Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group1"); // 消费者组IDprops.setProperty("enable.auto.commit", "true"); // 自动提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置创建KafkaConsumer实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅要消费的主题consumer.subscribe(Arrays.asList("my_topic1"));// 持续消费消息while (true) {// 从Kafka服务器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍历并处理收到的消息记录for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 0,value = 我是0
offset = 1,partition: 0,value = 我是3
offset = 2,partition: 0,value = 我是6
offset = 3,partition: 0,value = 我是9
offset = 4,partition: 0,value = 我是12
offset = 5,partition: 0,value = 我是15
offset = 6,partition: 0,value = 我是18

1.2.4、创建消费者 KafkaConsumer1Group2 并指定组 my_group2

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer1Group2 {/*** 主函数入口,创建并运行一个Kafka消费者来消费主题"foo"和"bar"的消息。** @param args 命令行参数(未使用)*/public static void main(String[] args) {// 初始化Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group2"); // 消费者组IDprops.setProperty("enable.auto.commit", "true"); // 自动提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置创建KafkaConsumer实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅要消费的主题consumer.subscribe(Arrays.asList("my_topic1"));// 持续消费消息while (true) {// 从Kafka服务器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍历并处理收到的消息记录for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 2,value = 我是2
offset = 1,partition: 2,value = 我是5
offset = 2,partition: 2,value = 我是8
offset = 3,partition: 2,value = 我是11
offset = 4,partition: 2,value = 我是14
offset = 5,partition: 2,value = 我是17
offset = 0,partition: 1,value = 我是1
offset = 1,partition: 1,value = 我是4
offset = 2,partition: 1,value = 我是7
offset = 3,partition: 1,value = 我是10
offset = 4,partition: 1,value = 我是13
offset = 5,partition: 1,value = 我是16
offset = 6,partition: 1,value = 我是19
offset = 0,partition: 0,value = 我是0
offset = 1,partition: 0,value = 我是3
offset = 2,partition: 0,value = 我是6
offset = 3,partition: 0,value = 我是9
offset = 4,partition: 0,value = 我是12
offset = 5,partition: 0,value = 我是15
offset = 6,partition: 0,value = 我是18

在这里插入图片描述

1.3、eagle for apache kafka

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.3.1、查看分区0的数据

在这里插入图片描述

[[{"partition": 0,"offset": 0,"msg": "我是0","timespan": 1717226677707,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 1,"msg": "我是3","timespan": 1717226677720,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 2,"msg": "我是6","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 3,"msg": "我是9","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 4,"msg": "我是12","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 5,"msg": "我是15","timespan": 1717226677722,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 6,"msg": "我是18","timespan": 1717226677722,"date": "2024-06-01 07:24:37"}]
]

1.3.2、查看分区1的数据

在这里插入图片描述

[[{"partition": 1,"offset": 0,"msg": "我是1","timespan": 1717226677720,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 1,"msg": "我是4","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 2,"msg": "我是7","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 3,"msg": "我是10","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 4,"msg": "我是13","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 5,"msg": "我是16","timespan": 1717226677722,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 6,"msg": "我是19","timespan": 1717226677722,"date": "2024-06-01 07:24:37"}]
]

1.3.3、查看分区2的数据

在这里插入图片描述

[[{"partition": 2,"offset": 0,"msg": "我是2","timespan": 1717226677720,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 1,"msg": "我是5","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 2,"msg": "我是8","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 3,"msg": "我是11","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 4,"msg": "我是14","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 5,"msg": "我是17","timespan": 1717226677722,"date": "2024-06-01 07:24:37"}]
]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20486.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法练习第25天|491. 非递减子序列

491. 非递减子序列 491. 非递减子序列https://leetcode.cn/problems/non-decreasing-subsequences/ 题目描述&#xff1a; 给你一个整数数组 nums &#xff0c;找出并返回所有该数组中不同的递增子序列&#xff0c;递增子序列中 至少有两个元素 。你可以按 任意顺序 返回答案…

Linux、Windows安装python环境(最新版及历史版本指定版本)-python

目录 一、Linux环境二、windows环境最新版本下载指定版本下载 python 官网地址&#xff1a; https://www.python.org/ 一、Linux环境 以openEuler/CentOS为例 查看可安装python源版本 dnf provides python*默认安装新版本 dnf install -y python3. 进入python python退出p…

电源小白入门学习8——电荷泵电路原理及使用注意事项

电源小白入门学习8——电荷泵电路原理及使用注意事项 电荷泵简介电荷泵原理电荷泵设计过程中需要注意的点fly电容的安秒平衡DC/DC功率转换技术对比 电荷泵简介 电荷泵&#xff08;Charge Pump&#xff09;是一种电路拓扑结构&#xff0c;用于实现电压升压或降压的功能。它通过…

sh发送邮件如何通过配置SMTP服务器来实现?

sh发送邮件的操作方法&#xff1f;如何使用Shell脚本自动发信&#xff1f; 在Shell脚本中实现邮件发送功能是一项常见需求&#xff0c;特别是在自动化任务执行或系统监控中。AokSend将介绍如何通过配置SMTP服务器来实现sh发送邮件的方法和注意事项。 sh发送邮件&#xff1a;安…

【已解决】Error in the HTTP2 framing layer

1.问题描述 在使用git将代码上传github的时候在最后一部push的时候遇到这个fatal 2.解决方案 由于我原先设置的origin是http协议下的&#xff0c;如下 git remote add origin https://github.com/Charlesbibi/Simple_Cloud.githttp协议下行不通不妨试一试ssh协议下&#xff…

跟风报考PMP,我真的后悔了

真的太香吧&#xff01; 我一开始没打算报考PMP证书的&#xff0c;但是我看身边很多朋友都因为PMP证书得到了升职加薪&#xff0c;这让我实在是一整个羡慕住了&#xff0c;所以我也去报考了PMP。 报考PMP前期我做了什么&#xff1f; 由于我是零基础&#xff0c;没有什么项目…

探索网格生成技术在AI去衣应用中的作用

引言&#xff1a; 随着人工智能技术的飞速发展&#xff0c;其在图像处理和计算机视觉领域的应用日益广泛。其中&#xff0c;AI去衣技术作为一种新兴的应用&#xff0c;引起了广泛的关注和讨论。然而&#xff0c;要实现这一功能并非易事&#xff0c;需要借助于先进的算法和技术。…

Mybatis第一讲——你会Mybatis吗?

文章目录 什么是MybatisMybatis的作用是什么 Mybatis 怎么使用注解的方式注解的多种使用Options注解ResultType注解 XML的方式update标签 #{} 和 ${}符号的区别#{}占位${}占位 ${}占位的危险性(SQL注入)数据库连接池 什么是Mybatis 首先什么是Mybatis呢&#xff1f;Mybatis是一…

latex bib引参考文献

1.bib内容 2.sn-mathphys-num是官方的参考文献格式 3.不用导cite包&#xff0c;文中这么写 4.end document前ckwx是自己命名的bib的名字

Ollama教程,本地部署大模型Ollama,docker安装方法,仅供学习使用

不可商用&#xff01;&#xff01;仅仅提供学习使用&#xff01; 先上视频教学&#xff1a; Ollama教程&#xff0c;本地部署大模型Ollama&#xff0c;docker安装方法&#xff0c;仅供学习使用&#xff01; 资料获取 &#xff1a; Ollama下载包和安装文档在这里&#xff1…

Web自动化测试-掌握selenium工具用法,使用WebDriver测试Chrome/FireFox网页(Java

目录 一、在Eclipse中构建Maven项目 1.全局配置Maven 2.配置JDK路径 3.创建Maven项目 4.引入selenium-java依赖 二、Chrome自动化脚本编写 1.创建一个ChromeTest类 2.测试ChromeDriver 3.下载chromedriver驱动 4.在脚本中通过System.setProperty方法指定chromedriver的…

Ubuntu 20.04安装CMake 3.22.6版本

Ubuntu 20.04通过apt安装的cmake版本是3.16.3&#xff0c;默认安装到/usr/bin/cmake路径。 $ cmake Command cmake not found, but can be installed with:sudo snap install cmake # version 3.29.3, or sudo apt install cmake # version 3.16.3-1ubuntu1.20.04.1See sna…

Unity + 雷达 粒子互动(待更新)

效果预览: 花海(带移动方向) VFX 实例 脚本示例 使用TouchScript,计算玩家是否移动,且计算移动方向 using System.Collections; using System.Collections.Generic; using TouchScript; using TouchScript.Pointers; using UnityEngine; using UnityEngine.VFX;public …

AI预测福彩3D采取888=3策略+和值012路一缩定乾坤测试6月1日预测第8弹

今天继续基于8883的大底&#xff0c;使用尽可能少的条件进行缩号。好了&#xff0c;直接上结果吧~ 首先&#xff0c;888定位如下&#xff1a; 百位&#xff1a;6,5,4,7,8,9,1,0 十位&#xff1a;7,8,6,5,9,3,1,0 个位&#xff1a;5,7,6,4,2,…

linux 内核哪种锁可以递归调用 ?

当数据被多线程并发访问(读/写)时&#xff0c;需要对数据加锁。linux 内核中常用的锁有两类&#xff1a;自旋锁和互斥体。在使用锁的时候&#xff0c;最常见的 bug 是死锁问题&#xff0c;死锁问题很多时候比较难定位&#xff0c;并且影响较大。本文先会介绍两种引起死锁的原因…

Java-----String类

1.String类的重要性 经过了C语言的学习&#xff0c;我们认识了字符串&#xff0c;但在C语言中&#xff0c;我们表示字符串进行操作的话需要通过字符指针或者字符数组&#xff0c;可以使用标准库中提供的一系列方法对字符串的内容进行操作&#xff0c;但这种表达和操作数据的方…

沟通程序化(1):跟着鬼谷子学沟通—“飞箝”之术

沟通的基础需要倾听&#xff0c;但如果对方听不进你的话&#xff0c;即便你说的再有道理&#xff0c;对方也很难入心。让我们看看鬼谷子的“飞箝”之术能给我们带来什么样的启发吧&#xff01; “飞箝”之术&#xff0c;源自中国古代兵法家、纵横家鼻祖鬼谷子的智慧&#xff0…

SpringBootWeb 篇-深入了解 Spring 异常处理、事务管理和配置文件参数配置化、yml 配置文件

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 配置文件 1.1 yml 配置文件 1.2 参数配置化 1.2.1 使用 Value 注解注入单个配置参数 1.2.2 使用 ConfigurationProperties 注解将一组相关配置参数注入到一个类中…

discuz论坛怎么修改备案信息

大家好&#xff0c;今天给大家分享下discuz如何填写备案信息并且展示在网站首页。大家都知道国内网站都需要备案&#xff0c;不通过备案的网站上是没办法通过域名打开的。大家也可以通过搜索网创有方&#xff0c;或者直接点击网创有方 查看悬挂备案号后的效果。 首先大家可以看…

安全测试扫描利器-Burpsuite

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…