SpringBoot Kafka发送消息与接收消息实例

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>SpringBootKafka</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootKafka</name><description>SpringBootKafka</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- pom.xml --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency></dependencies><repositories><repository><id>central</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:kafka:bootstrap-servers: 192.168.110.105:9092#streams:#application-id: my-streams-appconsumer:group-id: myGroupIdauto-offset-reset: latestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;@Slf4j
@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;private final ObjectMapper objectMapper;@Autowiredpublic KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {this.kafkaTemplate = kafkaTemplate;this.objectMapper = objectMapper;}public void sendMessage(String message) {log.info("KafkaProducer message:{}", message);//kafkaTemplate.send("test", message).addCallback();Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});// 使用whenComplete方法completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});/*future.whenComplete((result, ex) -> {if (ex == null) {// 成功发送RecordMetadata metadata = result.getRecordMetadata();System.out.println("Message sent successfully with offset: " + metadata.offset());} else {// 发送失败System.err.println("Failed to send message due to: " + ex.getMessage());}});*/}public void sendUser(User user) throws JsonProcessingException {//final ProducerRecord<String, String> record = createRecord(data);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);String userJson = objectMapper.writeValueAsString(user);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);/*future.addCallback(success -> System.out.println("Message sent successfully: " + userJson),failure -> System.err.println("Failed to send message: " + failure.getMessage()));*/CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});}
}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "myGroupId")public void consume(String message) {System.out.println("Received message: " + message);log.info("KafkaConsumer message:{}", message);}
}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class MessageController {private final KafkaProducer producer;@Autowiredpublic MessageController(KafkaProducer producer) {this.producer = producer;}@GetMapping("/send-message")public String sendMessage() {log.info("MessageController sendMessage start!");producer.sendMessage("hello, Kafka!");log.info("MessageController sendMessage end!");return "Message sent successfully.";}@GetMapping("/send")public String sendMessage1() {log.info("MessageController sendMessage1 start!");User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();try {producer.sendUser(user);} catch (JsonProcessingException e) {throw new RuntimeException(e);}log.info("MessageController sendMessage1 end!");return "Message sendMessage1 successfully.";}
}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/879801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT打开摄像头采集

QT打开摄像头采集 今天好不容易把opencv的环境装好&#xff0c;然后想学习一下人脸识别的功能&#xff0c;但是在图书馆坐了4个多小时了&#xff0c;屁股疼就先写个摄像头采集的功能&#xff0c;明天继续学习吧&#xff0c;废话不多&#xff0c;嚼个奶片开始发车&#xff01;&…

滚雪球学SpringCloud[5.3讲]: 配置管理中的高可用与容错

全文目录&#xff1a; 前言高可用配置中心的搭建为什么需要高可用配置中心&#xff1f;多实例与负载均衡数据一致性实战示例&#xff1a;使用Nginx实现高可用配置中心 Spring Cloud Config中的高可用性高可用性的进一步优化 配置管理中的故障处理策略分布式系统中的常见故障故障…

JVM java主流的追踪式垃圾收集器

目录 前言 分代垃圾收集理论 标记清除算法 标记复制算法 标记整理法 前言 从对象消亡的角度出发, 垃圾回收器可以分为引用计数式垃圾收集和追踪式垃圾收集两大类, 但是java主流的一般是追踪式的垃圾收集器, 因此我们重点讲解. 分代垃圾收集理论 分代收集这种理…

【vue3】vue3.3新特性真香

距离vue3.3发布已经过了一年多(2023.5.11),vue3.3提高开发体验的新特性你用了吗&#xff1f; 组件内部导入复杂类型 3.3之前想在组件内部导入复杂类型做props类型是不支持的。 <script setup lang"ts">import type { People } from /types;withDefaults(define…

python测试开发---js基础

JavaScript (JS) 是一种广泛用于前端开发的编程语言&#xff0c;其主要用于实现网页的动态交互功能。要掌握 JavaScript 的基础知识&#xff0c;主要需要理解以下几个核心概念&#xff1a; 1. 变量与数据类型 JavaScript 提供了不同的数据类型&#xff0c;并允许通过 var、le…

使用Refine构建项目(1)初始化项目

要初始化一个空的Refine项目&#xff0c;你可以使用Refine提供的CLI工具create-refine-app。以下是初始化步骤&#xff1a; 使用npx命令&#xff1a; 在命令行中运行以下命令来创建一个新的Refine项目&#xff1a; npx create-refine-applatest my-refine-project这将引导你通过…

Linux Vim编辑器常用命令

目录 一、命令模式快捷键 二、编辑/输入模式快捷键 三、编辑模式切换到命令模式 四、搜索命令 注&#xff1a;本章内容全部基于Centos7进行操作&#xff0c;查阅本章节内容前请确保您当前所在的Linux系统版本&#xff0c;且具有足够的权限执行操作。 一、命令模式快捷键 二…

企业专用智能云盘 | 帮助企业便捷管控企业文档 | 天锐绿盘云文档安全管理系统

由于当前多数企业内部的办公文件普遍散落于各员工电脑中&#xff0c;导致存在诸多潜在的文档使用风险。为优化团队协作效率&#xff0c;天 锐 绿盘是一款集文档统一管理、高效协同于一体的企业云盘&#xff0c;帮助企业解决文档管理中的诸多难题。 【地址&#xff1a;点击了解天…

STM32 MCU学习资源

STM32 MCU学习资源 文档下载需要注册登录账号 ST公司官方文档 STM32 MCU开发者资源 STM32F446 相关PDF文档 ST中文论坛 中文译文资料 ST MCU中文官网 其他学习资源 野火STM32库开发实战指南 零基础快速上手STM32开发&#xff08;手把手保姆级教程&#xff09; 直接使…

如何切换npm到淘宝的最新镜像源?

目录 前言一、查看当前npm镜像源二、切换到淘宝镜像源三、验证是否成功切换四、其他注意事项总结前言 要切换npm到淘宝的最新镜像源,您可以按照以下步骤操作: 一、查看当前npm镜像源 在更改npm镜像源之前,首先需要查看当前npm正在使用的镜像源地址。您可以通过在命令行(…

【2023工业异常检测文献】SimpleNet

SimpleNet:ASimpleNetworkforImageAnomalyDetectionandLocalization 1、Background 图像异常检测和定位主要任务是识别并定位图像中异常区域。 工业异常检测最大的难题在于异常样本少&#xff0c;一般采用无监督方法&#xff0c;在训练过程中只使用正常样本。 解决工业异常检…

TCP客户端编码和解码处理:发送和接收指定编码消息

文章目录 引言基于Netty实现TCP客户端Netty发送GBK编码指令Netty接收GBK编码基于Channel发送指令基于ChannelHandlerContext发送指令:建立连接时发送登陆指令开启日志,查看报文信息基于ChannelInboundHandlerAdapter进行业务逻辑处理原生API实现TCP客户端基于DataOutputStrea…

AI预测福彩3D采取888=3策略+和值012路或胆码测试9月19日新模型预测第92弹

经过90多期的测试&#xff0c;当然有很多彩友也一直在观察我每天发的预测结果&#xff0c;得到了一个非常有价值的信息&#xff0c;那就是9码定位的命中率非常高&#xff0c;90多期一共只错了10次&#xff0c;这给喜欢打私房菜的朋友提供了极高价值的预测结果~当然了&#xff0…

C语言中的assert断言

Assert断言 断言是程序中处理异常的一种高级形式。可以在任何时候启用和禁用断言验证&#xff0c;因此可以在测试时启用断言&#xff0c;而在部署时禁用断言。同样&#xff0c;程序投入运行后&#xff0c;最终用户在遇到问题时可以重新启用断言。 用法&#xff1a; #…

23.面试题02.07链表相交

public class Solution {public ListNode getIntersectionNode(ListNode headA, ListNode headB) {ListNode apheadA;ListNode bpheadB;int lenA0,lenB0;//求两个链表长度while(ap!null){apap.next;lenA;}while(bp!null){bpbp.next;lenB;}apheadA;bpheadB;int len0;//用来计算让…

Msf之Python分离免杀

Msf之Python分离免杀 ——XyLin. 成果展示&#xff1a; VT查杀率:8/73 (virustotal.com) 火绒和360可以过掉&#xff0c;但Windows Defender点开就寄掉了 提示&#xff1a;我用360测的时候&#xff0c;免杀过了&#xff0c;但360同时也申报了&#xff0c;估计要不了多久就寄…

CentOS5.2中安装并设置TFTP服务

在CentOS5.2中安装并设置TFTP服务,下面安装TFTP服务必须能联网,因为是使用yum安装,如果使用rpm从光盘安装则不需要。下面是步骤: 安装TFTP服务器: yum install tftp-server 安装TFTP客户端: yum install tftp 执行上面的命令后会自动搜索相应的最新版软件并自动下…

【C++ Primer Plus习题】6.9

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: #include <iostream> #include <string> #include <…

2-99 基于matlab多尺度形态学提取眼前节组织

基于matlab多尺度形态学提取眼前节组织&#xff0c;通过应用不同尺度的结构元素进行边缘检测&#xff0c;再通过加权融合的思想来整合检测到的边缘&#xff0c;降低图像噪声的影响&#xff0c;提高边缘检测的精度。程序已调通&#xff0c;可直接运行。 下载源程序请点链接&…

SalescustomerController

目录 1、 SalescustomerController 1.1、 /// 查询 1.1.1、 CustomerCode tbSales.CustomerCode,//客户编号 1.1.2、 //客户名称 1.2、 /// 修改 using QXQPS.Models; using QXQPS.Vo; using System; using System.Collections.Generic; using System.Linq; us…