SpringBoot集成kafka-生产者发送消息

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别
  • 4、发送对象消息

在这里插入图片描述在这里插入图片描述

1、kafkaTemplate.send()方法

1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}
}

2、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testMessage(){eventProducer.sendMessage();}@Testvoid testMessage(){eventProducer.sendMessage();}
}

1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}public void sendProducerRecord(){//Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息Headers headers = new RecordHeaders();headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));//String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String,String> record =new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);kafkaTemplate.send(record);}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testProducerRecord(){eventProducer.sendProducerRecord();}}

1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent4(){//String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent4(){eventProducer.sendEvent4();}
}

2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendDefault(){//Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendDefault(){eventProducer.sendDefault();}
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述

4、发送对象消息

在这里插入图片描述

生产者:

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void getResult3(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.sendDefault(null,System.currentTimeMillis(),"k3",user);}}

实体类:

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid getResult2(){eventProducer.getResult3();}
}

配置文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/878131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WIN/MAC 图像处理软件Adobe Photoshop PS2024软件下载安装

目录 一、软件概述 1.1 基本信息 1.2 主要功能 二、系统要求 2.1 Windows 系统要求 2.2 macOS 系统要求 三、下载 四、使用教程 4.1 基本界面介绍 4.2 常用工具使用 4.3 进阶操作 一、软件概述 1.1 基本信息 Adobe Photoshop&#xff08;简称PS&#xff09;是一款…

springboot嵌入式数据库实践-H2内嵌数据库(文件、内存)

本文章记录笔者的嵌入式数据库简单实现&#xff0c; 记录简要的配置过程。自用文章&#xff0c;仅作参考。 目录 本文章记录笔者的嵌入式数据库简单实现&#xff0c; 记录简要的配置过程。自用文章&#xff0c;仅作参考。 嵌入式数据库 -------------------------------具…

前端手写源码系列(三)——手写_deepClone深浅拷贝

目录 一、基本类型和引用类型二、深浅拷贝概念三、浅拷贝实现方式1、Object.assign()2、Array.prototype.concat() 修改新对象会改到原对象3、Array.prototype.slice() 四、深拷贝实现方式1、JSON.parse(JSON.stringify())2、手写递归方法3、函数库lodash 五、手写深拷贝 一、基…

Linux系统(centos7)增加一个开机自启任务

任务背景 已经上线了一个java的springboot项目&#xff0c;使用start.sh脚本进行启动&#xff0c;脚本内容为&#xff1a; #!/bin/bashnohup java -jar /opt/javaProject/PracticeSpring-0.0.1-SNAPSHOT.jar > /opt/javaProject/run.log 2>&1 & 现在&#xff…

16岁激活交学费银行卡需要本人实名电话卡,线下营业厅不给办,怎么办?

16岁激活交学费银行卡需要本人实名电话卡&#xff0c;线下营业厅不给办&#xff0c;怎么办&#xff1f; 话卡办理规定&#xff1a; 根据《民法典》和《电话用户真实身份信息登记规定》的相关要求&#xff0c;未满16周岁的用户通常需要在监护人的陪同下办理电话卡&#xff0c;并…

uniapp微信小程序 分享功能

uniapp https://zh.uniapp.dcloud.io/api/plugins/share.html#onshareappmessage export default {onShareAppMessage(res) {if (res.from button) {// 来自页面内分享按钮console.log(res.target)}return {title: 自定义分享标题,path: /pages/test/test?id123}} }需要再真机…

IntelliJ IDEA智能代码补全​和集成AI助手说明及操作

IntelliJ IDEA的智能代码补全和集成AI助手是开发者提高编码效率和代码质量的重要工具。以下是对这些功能的详细说明及操作指南&#xff1a; 一、智能代码补全 1. 功能说明 IntelliJ IDEA的智能代码补全功能利用先进的算法和上下文分析&#xff0c;为开发者提供准确、快速的代…

程序猿必备技能-Bat脚本

Batch 脚本&#xff08;批处理脚本&#xff09;是在 Windows 操作系统中使用的一种脚本语言&#xff0c;用于自动化执行一系列命令。Batch 脚本是由 .bat 或 .cmd 文件扩展名标识的文本文件&#xff0c;这些文件可以被 Windows 的命令行解释器&#xff08;如 cmd.exe&#xff0…

衡石科技BI的API如何授权文档解析

授权说明​ 授权模式​ 使用凭证式&#xff08;client credentials&#xff09;授权模式。 授权模式流程说明​ 第一步&#xff0c;A 应用在命令行向 B 发出请求。 第二步&#xff0c;B 网站验证通过以后&#xff0c;直接返回令牌。 授权模式结构说明​ 接口说明​ 获取a…

shell之getopts

getopts 是一个常用于解析命令行选项的bash内建命令。它的基本语法是&#xff1a; getopts optstring name [arg...]optstring列出了对应的Shell Script可以识别的所有参数。比如&#xff1a; 如果 Shell Script可以识别-a&#xff0c;-f以及-s参数&#xff0c;则optstring就是…

【贪心 决策包容性 】757. 设置交集大小至少为2

本文涉及知识点 贪心 决策包容性 LeetCode757. 设置交集大小至少为2 给你一个二维整数数组 intervals &#xff0c;其中 intervals[i] [starti, endi] 表示从 starti 到 endi 的所有整数&#xff0c;包括 starti 和 endi 。 包含集合 是一个名为 nums 的数组&#xff0c;并…

Quasar V2.16.4 新版发布,基于 Vue 3 的前端开发框架,一套代码发布到多端

Quasar 又发布新版本了&#xff0c;性能优秀的 Vue 组件开发框架&#xff0c;时隔3年再次推荐给大家。 早在2021年&#xff0c;我就写了一篇简单的文章向大家推荐了 Quasar 这款 Vue.js 开发框架&#xff0c;如今3年过去了&#xff0c;Quasar 发展得很好&#xff0c;更新频率依…

H5开发有哪些技巧?

随着现代社会的飞速发展&#xff0c;网页开发已经从传统的HTML、CSS、JavaScript往H5发展。H5也称为HTML5&#xff0c;可以理解为是HTML的升级版&#xff0c;具有更加优秀的性能、更加完善的功能和更加多样的体验。因其灵活性和跨平台特性&#xff0c;成为了各类移动应用和网页…

数学建模学习(128):使用Python结合CILOS与熵法的多准则决策权重确定

本文介绍方法为:结合CILOS与熵法的多准则决策权重,请理解为主,代码可以当作模板使用。 文章目录 1 引言2 问题背景2.1. 熵法 (Entropy Method)2.2 准则影响损失法 (CILOS Method)2.3 Python代码实现2.4 结果的决策指导意义2.4 结论参考文献1 引言 多准则决策(Multi-Criter…

面试常问! transformer中dk的大小,以及为什么设成这样,维度,原文分析。

目录&#xff1a; 原文 &#xff1a;翻译&#xff1a;流程&#xff1a;原因&#xff1a; 原文(多头注意力部分) &#xff1a; 李沐b站论文精读 论文网盘下载&#xff1a;链接 提取码: vm3d 翻译&#xff1a; 在这项工作中&#xff0c;我们采用了 h8 个并行注意力层&#xff…

flutter之image_picker上传图片

原文地址 image_picker 安装 image_picker: ^1.1.2使用 我们获取到上传的照片后,将其转为base64编码的格式,方便后续使用 // source: 接收两种模式,相册和拍照final pickedImage =await

No module named ‘comtypes‘

No module named comtypes 报错解释&#xff1a; 这个错误表明Python解释器无法找到名为comtypes的模块。comtypes是一个Python库&#xff0c;用于处理COM类型库&#xff0c;特别是用于与Windows COM组件交互。如果你的代码或者你尝试运行的程序需要使用comtypes库&#xff0…

Linux远程管理—SSH协议

SSH协议是远程连接的安全性协议&#xff0c;该协议可以有效防止远程管理过程中的信息泄漏&#xff0c;是西安传输数据加密&#xff0c;能够防止DNS和IP欺骗&#xff0c;传输数据压缩&#xff0c;加快传输速度。 安全验证方法有口令验证和密钥验证两种实现手段&#xff0c;该协…

线上剧本杀小程序,线上游戏新体验

剧本杀作为当下热门的社交型游戏方式&#xff0c;成为了大众社交娱乐的选择&#xff0c;为大众带来新的游戏体验。在数字化发展时期&#xff0c;线上剧本杀为大众带来了新鲜的游戏体验&#xff0c;它打破了时间空间限制&#xff0c;让玩家在手机上体验虚拟游戏&#xff0c;通过…

Python的可选导入 optional import

Python的可选导入&#xff1a;提升代码可读性与简洁性的利器 在Python编程中&#xff0c;我们常常会使用到各种库和模块。为了方便使用这些库和模块&#xff0c;我们需要通过导入语句将其引入到我们的程序中。然而&#xff0c;随着项目规模的增大&#xff0c;导入语句可能会变…