kafka用java收发消息

用java客户端代码来对kafka收发消息
具体代码如下

package com.cool.interesting.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;public class KafkaTest {private static final String BOOTSTRAP_SERVERS = "192.168.47.145:9092";private static final String TOPIC_NAME = "test";public static void main(String[] args) {// 生产者示例produceMessage();// 消费者示例consumeMessage();//从指定偏移量消费消息consumeOffsetMessage();}//生产者代码private static void produceMessage() {Properties props = new Properties();//acks是保证消息的发送机制,有以下几个值//acks = 0:表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。//acks = 1: 表示leader副本成功写入就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。默认值即是1。//acks = all或-1: 表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。//调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all或-1;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。props.put(ProducerConfig.ACKS_CONFIG,1);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//key和value序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//其他配置参数详见org.apache.kafka.clients.producer.ProducerConfig类try (Producer<String, String> producer = new KafkaProducer<>(props)) {for (int i = 0; i < 10; i++) {String message = "Message " + i;//异步发送Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC_NAME,  message));System.out.println("Sent message: " + message);}}}//正常消费者代码private static void consumeMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test99");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {//设置kafak从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}//指定偏移量开始消费private static void consumeOffsetMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));//如果要指定偏移量,必须先poll一次,不然代码报错ConsumerRecords<String, String> poll = consumer.poll(0);System.out.println("poll:"+poll.isEmpty());//创建一个分区(参数为topic_name,和分区序号)TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);// 指定要消费的偏移量long offset = 3;//从指定偏移量开始消息消息consumer.seek(topicPartition, offset);while (true) {//设置kafka从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}}

安装kafka的可视化工具:offset explorer
offset explorer 是一个用于查看和管理 Kafka 消费者组的工具,它允许你检查消费者组的偏移量(offset),并且可以查看每个消费者组在每个分区上的偏移量情况。这对于监控和调试 Kafka 消费者组非常有用。
下载地址为:https://www.kafkatool.com/download.html
如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/13021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

商品服务:SPUSKU规格参数销售属性

1.Object划分 1.PO&#xff08;Persistant Object&#xff09;持久对象 PO就是对应数据库中某个表中的一条记录&#xff0c;多个记录可以用PO的集合。PO中应该不报含任何对数据库的操作 2.DO(Domain Object) 领域对象 就是从现实世界中抽象出来的有形或无形的业务实体。 3…

SPI通信(使用SPI读写W25Q64)

SPI通信协议 • SPI&#xff08;Serial Peripheral Interface&#xff09;是由Motorola公司开发的一种通用数据总线 • 四根通信线&#xff1a; SCLK:串行时钟线&#xff0c;用来提供时钟信号的。 MOSI:主机输出&#xff0c;从机输入 MISO:从机输出&#xff0c;主机输入 SS:…

Java中的数据类型与变量

引言&#xff1a; 哈喽&#xff0c;各位读者老爷们大家好呀,long time no see!这里是小堇Java小课堂&#xff0c;在本课堂中我们将继续分享Java中的数据类型与变量&#xff0c;标识符&#xff0c;关键字等知识&#xff0c;那我们启程咯&#xff01; 数据类型与变量 1.字面变量…

红蓝对抗 网络安全 网络安全红蓝对抗演练

什么是红蓝对抗 在军事领域&#xff0c;演习是专指军队进行大规模的实兵演习&#xff0c;演习中通常分为红军、蓝军&#xff0c;演习多以红军守、蓝军进攻为主。类似于军事领域的红蓝军对抗&#xff0c;网络安全中&#xff0c;红蓝军对抗则是一方扮演黑客&#xff08;蓝军&…

pytest教程-46-钩子函数-pytest_sessionstart

领取资料&#xff0c;咨询答疑&#xff0c;请➕wei: June__Go 上一小节我们学习了pytest_report_testitemFinished钩子函数的使用方法&#xff0c;本小节我们讲解一下pytest_sessionstart钩子函数的使用方法。 pytest_sessionstart 是 Pytest 提供的一个钩子函数&#xff0c…

Anaconda下载安装

看到这篇文章的同学们&#xff0c;说明你们是要下载Anaconda&#xff0c;这篇文章讲的就是下载安装教程。 Anaconda下载网址&#xff1a; Download Now | Anaconda 根据我们需要的系统版本下载&#xff0c;我的电脑是window&#xff0c;所以选择第一个&#xff0c;如下图&am…

javaEE进阶——SpringBoot与SpringMVC第一讲

文章目录 什么是springMVCSpringMVC什么是模型、视图、控制器MVC和SpringMVC的关系SpringMVC的使用第一个SpringMVC程序RestController什么是注解 那么RestController到底是干嘛的呢&#xff1f;RequestMapping 如何接收来自请求中的querystryingRequestParamRequestMapping(&q…

运用MongoDB Atlas释放开发者潜能同时把控成本

在当下的商业环境中&#xff0c;不可预测性已经成为常态&#xff0c;工程团队负责人必须在把控不可预测性和优化IT成本的双重挑战下谋求平衡。 咨询公司德勤2024 MarginPLUS调查收集了300多位企业负责人的见解&#xff0c;报告中重点介绍了面对动荡的全球经济环境&#xff0c;…

电子邮箱是什么?付费电子邮箱和免费电子邮箱有什么区别?

注册电子邮箱前&#xff0c;有付费电子邮箱和免费电子邮箱两类选择。付费的电子邮箱和免费的电子邮箱有什么区别呢&#xff1f;区别主要在于存储空间、功能丰富度和售后服务等方面&#xff0c;本文将为您详细介绍。 一、电子邮箱是什么&#xff1f; 电子邮箱就是线上的邮局&a…

labelimg删除用不到的标签(yolo格式)以及 下载使用

问题&#xff1a;当我们标注完成新的类别后后直接删除classes.txt中不需要的类别之后再次打开labelimg会闪退&#xff0c;如何删除不需要的标签并且能够正确运行呢&#xff1f;&#xff08;yolo格式&#xff09; 原因&#xff1a;当我们打开labelimg进行标注的时候&#xff0c…

LVM - Linux磁盘逻辑卷管理器概念讲解、实践及所遇到的问题

1、lvm概念 逻辑卷管理器(LogicalVolumeManager)本质上是一个虚拟设备驱动,是在内核中块设备和物理设备之间添加的一个新的抽象层次,它可以将几块磁盘(物理卷,PhysicalVolume)组合起来形成一个存储池或者卷组(VolumeGroup)。LVM可以每次从卷组中划分出不同大小的逻辑卷(Logi…

【C语言】必备Linux命令和C语言基础

&#x1f31f;博主主页&#xff1a;我是一只海绵派大星 &#x1f4da;专栏分类&#xff1a;嵌入式笔记 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、文件和目录相关命令 Linux 的文件系统结构 文件系统层次结构标准FHS pwd命令 ls 列目录内容 文件的权限 c…

STC8增强型单片机开发【热敏电阻】

目录 一、引言 二、热敏电阻概述 三、STC8增强型单片机简介 四、基于STC8单片机的热敏电阻测温系统 五、热敏电阻测温系统的优化与扩展 提高测量精度 扩展系统功能 六、 温度计算步骤 通过ADC采样计算出热敏电阻位置的电压 通过欧姆定律计算热敏电阻的阻值 通过阻值…

栈和队列经典面试题详解

目录 题目一&#xff1a;20. 有效的括号 - 力扣&#xff08;LeetCode&#xff09; 题目二&#xff1a;225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09; 题目三&#xff1a;232. 用栈实现队列 - 力扣&#xff08;LeetCode&#xff09; 题目四&#xff1a;622. 设…

软件压力测试怎么做

随着信息技术的迅猛发展&#xff0c;软件在各行各业的应用越来越广泛&#xff0c;其稳定性、可靠性和性能表现也受到了越来越多的关注。在这样的背景下&#xff0c;软件压力测试显得尤为重要。本文将详细介绍软件压力测试的概念、目的、方法以及实施步骤&#xff0c;帮助读者更…

浅析扩散模型与图像生成【应用篇】(二十五)——Plug-and-Play

25. Plug-and-Play: Diffusion Features for Text-Driven Image-to-Image Translation 该文提出一种文本驱动的图像转换方法&#xff0c;输入一张图像和一个目标文本描述&#xff0c;按照文本描述对输入图像进行转换&#xff0c;得到目标图像。图像转换任务其实本质上属于图像编…

对于接口的安全性测试,这几点你掌握了吗?

接口防刷 1.为什么会有人要刷接口&#xff1f; 牟利&#xff1a;黄牛在 12306 网上抢票再倒卖。 恶意攻击竞争对手&#xff1a;如短信接口被请求一次&#xff0c;会触发几分钱的运营商费用&#xff0c;当量级大了也很可观。 压测&#xff1a;用apache bench 做压力测试。 …

管仲故乡是颍川,何分颍上或颍下

第一仲父管仲&#xff0c;故乡在哪里&#xff1f;依然像许多名人故里一样存在争议&#xff0c;但是这个争议却很不一般&#xff0c;引出了一个大话题。 管子是安徽颍上县人&#xff0c;《史记》记载: “管仲&#xff0c;颍上人也。”颍上县有管鲍祠&#xff0c;是安徽省重点文物…

亚阈值电流镜

相同电流情况下,由于亚阈值区的gm较大,造成由于阈值电压Vth的失配造成的失配会更大,所以要规避过大的gm,选取较大的过驱动电压。 相同电流情况下,W/L的尺寸选的较小一点,或者说L一定时,W不要取得过大。 Q:Vgs一定的情况下,特别小,几乎小于Vth,一定是亚阈值电流镜吗。…

单位内部防泄密策略与技术实践

在信息时代&#xff0c;企业内部数据安全至关重要&#xff0c;尤其是涉及核心竞争力的重要文件&#xff0c;员工的不当操作或恶意泄露都可能给企业带来重大损失。本文将从制度建设、技术防护、以及日常管理三个方面入手&#xff0c;探讨如何构建一套行之有效的内部防泄密体系&a…