SpringBoot集成kafka-获取生产者发送的消息(阻塞式和非阻塞式获取)

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

说明

CompletableFuture对象需要的SpringBoot版本为3.X.X以上,需要的kafka依赖版本为3.X.X以上,需要的jdk版本17以上。

1、阻塞式(等待式)获取生产者发送的消息

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void getResult(){//Integer partition, Long timestamp, K key, @Nullable V dataCompletableFuture<SendResult<String, String>> result =kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello-kafka");//怎么拿结果,通过ListenableFuture类拿结果try {//1、阻塞式等待拿结果SendResult<String, String> sendResult = result.get();if(null!=sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:"+sendResult.getProducerRecord());} catch (Exception e) {e.printStackTrace();}}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid getResult(){eventProducer.getResult();}
}

测试结果:
在这里插入图片描述

消息发送成功:default-topic-0@1
2024-08-22 22:18:51.344  INFO 8976 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hello-group-1, groupId=hello-group] Adding newly assigned partitions: hello-topic-0
producerRecord:ProducerRecord(topic=default-topic, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=k3, value=hello-kafka, timestamp=1724336330821)

2、非阻塞式(非等待式)获取生产者发送的消息

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void getResult2(){//Integer partition, Long timestamp, K key, @Nullable V dataCompletableFuture<SendResult<String, String>> result =kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello-kafka");//怎么拿结果,通过CompletableFuture类拿结果try {//2、非阻塞式等待拿结果result.thenAccept((sendResult)->{if(null!=sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:"+sendResult.getProducerRecord());}).exceptionally((e)->{e.printStackTrace();//做消息发送失败的处理System.out.println("消息发送失败");return null;});} catch (Exception e) {e.printStackTrace();}}}

测试类:

@Test
void getResult2(){eventProducer.getResult2();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/877766.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【html+css 绚丽Loading】 000014 三元波动盘

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享htmlcss 绚丽Loading&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495…

JVM系列--初始JVM

根据《黑马程序员JVM虚拟机入门到实战全套视频教程》整理 1 什么是JVM JVM 全称是 Java Virtual Machine&#xff0c;中文译名 Java虚拟机。JVM 本质上是一个运行在计算机上的程序&#xff0c;他的职责是运行Java字节码文件。 Java源代码执行流程如下&#xff1a; 分为三个步…

书生大模型实战营第三期基础岛第二课——8G 显存玩转书生大模型 Demo

8G 显存玩转书生大模型 Demo 基础任务进阶作业一&#xff1a;进阶作业二&#xff1a; 基础任务 使用 Cli Demo 完成 InternLM2-Chat-1.8B 模型的部署&#xff0c;并生成 300 字小故事&#xff0c;记录复现过程并截图。 创建conda环境 # 创建环境 conda create -n demo pytho…

[Meachines] [Easy] Legacy nmap 漏洞扫描脚本深度发现+MS08-067

信息收集 IP AddressOpening Ports10.10.10.4TCP:135,139,445 $ nmap -p- 10.10.10.4 --min-rate 1000 -sC -sV -Pn PORT STATE SERVICE VERSION 135/tcp open msrpc Microsoft Windows RPC 139/tcp open netbios-ssn Microsoft Windows n…

Java二十三种设计模式-责任链模式(17/23)

责任链模式&#xff1a;实现请求处理的灵活流转 引言 在这篇博客中&#xff0c;我们深入探讨了责任链模式的精髓&#xff0c;从其定义和用途到实现方法&#xff0c;再到使用场景、优缺点、与其他模式的比较&#xff0c;以及最佳实践和替代方案&#xff0c;旨在指导开发者如何…

SAP BW:QUERY数据结果写入ADSO

作者 idan lian 如需转载备注出处 如果对你有帮助&#xff0c;请点赞收藏~~~ 需求背景 客户基于QUERY进行报表展示&#xff0c;现需迁移到永洪报表平台&#xff0c;query中的变量参数&#xff0c;公式等无法直接生成视图&#xff0c;query相对复杂&#xff0c;不想直接在视图…

笔记mybatisplus

MP入门 Mybatis-Plus&#xff08;简称MP&#xff09;是一个Mybatis的增强工具&#xff0c;在Mybatis的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 Mybatis-Plus已经封装好了大量增删改查的方法&#xff0c;程序员只需要继承BaseMapper就可以使用这些方法…

Linux阿里云服务器,利用docker安装EMQX

第一步&#xff0c;给云服务器docker进行加速 阿里云搜索“镜像加速器”&#xff0c;找到下面这个菜单&#xff0c;点进去 然后找到镜像工具下的镜像加速器 把这个加速器地址复制 然后在自己的云服务器中&#xff0c;找到docker的文件夹 点击json配置文件 把地址修改为刚刚…

如何将LaTeX数学公式嵌入到PowerPoint中

如何将LaTeX数学公式嵌入到PowerPoint中 简介 在学术演示或技术报告中&#xff0c;清晰且专业地展示数学公式是至关重要的。PowerPoint虽然提供了一些基本的公式编辑功能&#xff0c;但如果你需要更复杂或格式严格的公式&#xff0c;使用LaTeX生成公式并嵌入到PPT中是一个极佳…

Python酷库之旅-第三方库Pandas(092)

目录 一、用法精讲 391、pandas.Series.hist方法 391-1、语法 391-2、参数 391-3、功能 391-4、返回值 391-5、说明 391-6、用法 391-6-1、数据准备 391-6-2、代码示例 391-6-3、结果输出 392、pandas.Series.to_pickle方法 392-1、语法 392-2、参数 392-3、功能…

KT来袭,打造沉浸式体验的聚合性web3应用平台

随着步入 2024&#xff0c;漫长的区块链熊市即将接近尾声。纵观产业发展&#xff0c;逆流而上往往会是彰显品牌市场影响力和技术实力的最佳证明。在这次周期中&#xff0c;一个名为KT的web3.0聚合平台吸引了市场关注&#xff0c;无论在市场层面还是技术层面&#xff0c;都广泛赢…

听劝❗用AI做职场思维导图仅仅需要几秒钟啊

本文由 ChatMoney团队出品 嘿&#xff0c;各位职场朋友们 是不是常常对着密密麻麻的笔记感到焦虑呢&#xff1f; 想整理却无从下手&#xff1f; 别怕&#xff0c;ChatmoneyAI知识库来拯救你的整理困难症啦&#xff01; 咱们都知道&#xff0c;思维导图是职场中必备的神器 …

zoom 会议机器人web例子

一、需要创建zoom app&#xff0c;创建及配置参考&#xff1a;Zoom会议机器人转写例子-CSDN博客 这里直接使用zoom-recall的配置。 二、需要生成签名&#xff0c;参数为&#xff1a;zoom-recall中的Client ID和Client Secret 1、git clone https://github.com/zoom/meetings…

【PHP入门教程】PHPStudy环境搭建+composer创建项目

文章目录 PHP 的历史PHP 的用途PHP 的特点和优势PHP 环境搭建环境准备安装window 安装CentOS / Ubuntu / Debian 安装 第一个Hello World使用Apache服务运行命令行运行代码 Composer安装 Composer&#xff1a;安装途中报错解决&#xff1a;初始化项目创建文件最终文件目录Compo…

微服务:配置管理和配置热更新

参考&#xff1a;黑马程序员之微服务 &#x1f4a5; 该系列属于【SpringBoot基础】专栏&#xff0c;如您需查看其他SpringBoot相关文章&#xff0c;请您点击左边的连接 目录 一、引言 二、配置共享 1. 添加共享配置到nacos &#xff08;1&#xff09;jdbc的共享配置 shared…

设计模式之Decorator装饰者、Facade外观、Adapter适配器(Java)

装饰者模式 设计模式的基本原则&#xff0c;对内关闭修改。 Decorator Pattern&#xff0c;装饰者模式&#xff0c;也叫包装器模式(Wrapper Pattern)&#xff1a;将一个对象包装起来&#xff0c;增加新的行为和责任。一定是从外部传入&#xff0c;并且可以没有顺序&#xff0…

望繁信科技入选2024年第3批上海市高新技术成果转化项目名单

近日&#xff0c;上海望繁信科技有限公司&#xff08;以下简称“望繁信科技”&#xff09;凭借其自主研发的“数字北极星流程挖掘分析软件”项目&#xff0c;成功入选2024年第3批上海市高新技术成果转化项目名单。这一殊荣根据《上海市高新技术成果转化项目认定办法》&#xff…

Keil Error-Flash Download failed Cortex-M4 擦除芯片还不好使的方案!!!

点击魔术棒-Debug-Settings后看到SWDIO可以正常识别&#xff0c;但是点击Reset下拉只有三个选项。 此时点击Pack&#xff0c;将Enable勾去掉。 回到Reset&#xff0c;此时多了Autodetet选项&#xff0c;选择这个选项后&#xff0c;即可正常烧录。

CLI举例:通过ISAKMP方式建立GRE over IPsec隧道

配置安全策略&#xff0c;允许私网指定网段进行报文交互&#xff0c;放行IKE协商报文。配置静态路由&#xff0c;保证两端路由可达。配置GRE Tunnel接口以及Tunnel口的转发路由。配置基于ACL的IPsec策略。GRE over IPsec中IPsec需要保护的数据流以GRE的起点为源、终点为目的。 …

初始C++(类与对象)

感谢大佬的光临各位&#xff0c;希望和大家一起进步&#xff0c;望得到你的三连&#xff0c;互三支持&#xff0c;一起进步 个人主页&#xff1a;LaNzikinh-CSDN博客 文章目录 前言一.引用二.内联函数三.类和对象总结 前言 之前讲c的命令空间和第一个程序的运行&#xff0c;继…