kafka用java收发消息

用java客户端代码来对kafka收发消息
具体代码如下

package com.cool.interesting.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;public class KafkaTest {private static final String BOOTSTRAP_SERVERS = "192.168.47.145:9092";private static final String TOPIC_NAME = "test";public static void main(String[] args) {// 生产者示例produceMessage();// 消费者示例consumeMessage();//从指定偏移量消费消息consumeOffsetMessage();}//生产者代码private static void produceMessage() {Properties props = new Properties();//acks是保证消息的发送机制,有以下几个值//acks = 0:表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。//acks = 1: 表示leader副本成功写入就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。默认值即是1。//acks = all或-1: 表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。//调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all或-1;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。props.put(ProducerConfig.ACKS_CONFIG,1);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//key和value序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//其他配置参数详见org.apache.kafka.clients.producer.ProducerConfig类try (Producer<String, String> producer = new KafkaProducer<>(props)) {for (int i = 0; i < 10; i++) {String message = "Message " + i;//异步发送Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC_NAME,  message));System.out.println("Sent message: " + message);}}}//正常消费者代码private static void consumeMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test99");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {//设置kafak从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}//指定偏移量开始消费private static void consumeOffsetMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));//如果要指定偏移量,必须先poll一次,不然代码报错ConsumerRecords<String, String> poll = consumer.poll(0);System.out.println("poll:"+poll.isEmpty());//创建一个分区(参数为topic_name,和分区序号)TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);// 指定要消费的偏移量long offset = 3;//从指定偏移量开始消息消息consumer.seek(topicPartition, offset);while (true) {//设置kafka从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}}

安装kafka的可视化工具:offset explorer
offset explorer 是一个用于查看和管理 Kafka 消费者组的工具,它允许你检查消费者组的偏移量(offset),并且可以查看每个消费者组在每个分区上的偏移量情况。这对于监控和调试 Kafka 消费者组非常有用。
下载地址为:https://www.kafkatool.com/download.html
如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/13021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Microsoft 发布了 5 月份产品安全修复报告。

我们提请大家注意我们归类为趋势*的两个漏洞&#xff1a; CVE-2024-30051 和 CVE-2024-30040。 1. Windows 内核库 DWM 核心库中的 CVE-2024-30051漏洞 该漏洞与负责显示桌面窗口&#xff08;DWM&#xff09;的内核库中的权限升级有关。成功利用漏洞可讓攻擊者在沒有使用者互…

Python模块之Numpy(五)-- 排序

Sort排序 NumPy 的排序方式主要可以概括为直接排序和间接排序两种&#xff0c;直接排序是对数值直接进行排序&#xff0c;间接排序是指根据一个或者多个键对数据集进行排序&#xff0c;在 NumPy 中&#xff0c;直接排序经常使用 sort 函数&#xff0c;间接排序经常使用 argsort…

商品服务:SPUSKU规格参数销售属性

1.Object划分 1.PO&#xff08;Persistant Object&#xff09;持久对象 PO就是对应数据库中某个表中的一条记录&#xff0c;多个记录可以用PO的集合。PO中应该不报含任何对数据库的操作 2.DO(Domain Object) 领域对象 就是从现实世界中抽象出来的有形或无形的业务实体。 3…

mysql字段乱序 information_schema

select COLUMN_NAME from information_schema.columns where table_namecollect_column_info and table_schema nz; 返回ASCII排列 导致 sqoop import \ --connect "jdbc:mysql://your_host/collect" \ --username your_username \ --password your_password \ --t…

SPI通信(使用SPI读写W25Q64)

SPI通信协议 • SPI&#xff08;Serial Peripheral Interface&#xff09;是由Motorola公司开发的一种通用数据总线 • 四根通信线&#xff1a; SCLK:串行时钟线&#xff0c;用来提供时钟信号的。 MOSI:主机输出&#xff0c;从机输入 MISO:从机输出&#xff0c;主机输入 SS:…

faiss::gpu::runMatrixMult ... cublas failed (13)错误处理

我使用的是python3.8, torch1.11,cu113 解决方法是安装faiss-gpu1.7.3 我代码中出现这个错误尝试了使用pip安装faiss-gpu1.71&#xff0c;1.72。都没有用。 使用conda安装faiss-gpu的tar.bz2安装会存在找不到libfaiss.so的问题。 pypi官网还没有faiss-gpu1.7.3的版本&#xf…

Chrome查看User Agent的实战教程

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

Java中的数据类型与变量

引言&#xff1a; 哈喽&#xff0c;各位读者老爷们大家好呀,long time no see!这里是小堇Java小课堂&#xff0c;在本课堂中我们将继续分享Java中的数据类型与变量&#xff0c;标识符&#xff0c;关键字等知识&#xff0c;那我们启程咯&#xff01; 数据类型与变量 1.字面变量…

红蓝对抗 网络安全 网络安全红蓝对抗演练

什么是红蓝对抗 在军事领域&#xff0c;演习是专指军队进行大规模的实兵演习&#xff0c;演习中通常分为红军、蓝军&#xff0c;演习多以红军守、蓝军进攻为主。类似于军事领域的红蓝军对抗&#xff0c;网络安全中&#xff0c;红蓝军对抗则是一方扮演黑客&#xff08;蓝军&…

socket介绍

socket简介 我们知道两个进程如果需要进行通讯最基本的一个前提能能够唯一的标示一个进程&#xff0c;在本地进程通讯中我们可以使用PID来唯一标示一个进程&#xff0c;但PID只在本地唯一&#xff0c;网络中的两个进程PID冲突几率很大&#xff0c;这时候我们需要另辟它径了&…

pytest教程-46-钩子函数-pytest_sessionstart

领取资料&#xff0c;咨询答疑&#xff0c;请➕wei: June__Go 上一小节我们学习了pytest_report_testitemFinished钩子函数的使用方法&#xff0c;本小节我们讲解一下pytest_sessionstart钩子函数的使用方法。 pytest_sessionstart 是 Pytest 提供的一个钩子函数&#xff0c…

JSON对象相互转换

目录 String --> JsonNode对象 JsonNode对象 --> String Map --> JsonNode对象,object> JsonNode数组 --> List>集合 List> --> JsonNode 获取JsonNode中某个key的值 获取JsonNode字段下的某个数组 String --> JsonNode对象 // 创建一个 Ob…

Anaconda下载安装

看到这篇文章的同学们&#xff0c;说明你们是要下载Anaconda&#xff0c;这篇文章讲的就是下载安装教程。 Anaconda下载网址&#xff1a; Download Now | Anaconda 根据我们需要的系统版本下载&#xff0c;我的电脑是window&#xff0c;所以选择第一个&#xff0c;如下图&am…

javaEE进阶——SpringBoot与SpringMVC第一讲

文章目录 什么是springMVCSpringMVC什么是模型、视图、控制器MVC和SpringMVC的关系SpringMVC的使用第一个SpringMVC程序RestController什么是注解 那么RestController到底是干嘛的呢&#xff1f;RequestMapping 如何接收来自请求中的querystryingRequestParamRequestMapping(&q…

运用MongoDB Atlas释放开发者潜能同时把控成本

在当下的商业环境中&#xff0c;不可预测性已经成为常态&#xff0c;工程团队负责人必须在把控不可预测性和优化IT成本的双重挑战下谋求平衡。 咨询公司德勤2024 MarginPLUS调查收集了300多位企业负责人的见解&#xff0c;报告中重点介绍了面对动荡的全球经济环境&#xff0c;…

电子邮箱是什么?付费电子邮箱和免费电子邮箱有什么区别?

注册电子邮箱前&#xff0c;有付费电子邮箱和免费电子邮箱两类选择。付费的电子邮箱和免费的电子邮箱有什么区别呢&#xff1f;区别主要在于存储空间、功能丰富度和售后服务等方面&#xff0c;本文将为您详细介绍。 一、电子邮箱是什么&#xff1f; 电子邮箱就是线上的邮局&a…

labelimg删除用不到的标签(yolo格式)以及 下载使用

问题&#xff1a;当我们标注完成新的类别后后直接删除classes.txt中不需要的类别之后再次打开labelimg会闪退&#xff0c;如何删除不需要的标签并且能够正确运行呢&#xff1f;&#xff08;yolo格式&#xff09; 原因&#xff1a;当我们打开labelimg进行标注的时候&#xff0c…

LVM - Linux磁盘逻辑卷管理器概念讲解、实践及所遇到的问题

1、lvm概念 逻辑卷管理器(LogicalVolumeManager)本质上是一个虚拟设备驱动,是在内核中块设备和物理设备之间添加的一个新的抽象层次,它可以将几块磁盘(物理卷,PhysicalVolume)组合起来形成一个存储池或者卷组(VolumeGroup)。LVM可以每次从卷组中划分出不同大小的逻辑卷(Logi…

【C语言】必备Linux命令和C语言基础

&#x1f31f;博主主页&#xff1a;我是一只海绵派大星 &#x1f4da;专栏分类&#xff1a;嵌入式笔记 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、文件和目录相关命令 Linux 的文件系统结构 文件系统层次结构标准FHS pwd命令 ls 列目录内容 文件的权限 c…

STC8增强型单片机开发【热敏电阻】

目录 一、引言 二、热敏电阻概述 三、STC8增强型单片机简介 四、基于STC8单片机的热敏电阻测温系统 五、热敏电阻测温系统的优化与扩展 提高测量精度 扩展系统功能 六、 温度计算步骤 通过ADC采样计算出热敏电阻位置的电压 通过欧姆定律计算热敏电阻的阻值 通过阻值…