SpringBoot集成kafka-生产者发送消息

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别
  • 4、发送对象消息

在这里插入图片描述在这里插入图片描述

1、kafkaTemplate.send()方法

1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}
}

2、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testMessage(){eventProducer.sendMessage();}@Testvoid testMessage(){eventProducer.sendMessage();}
}

1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}public void sendProducerRecord(){//Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息Headers headers = new RecordHeaders();headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));//String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String,String> record =new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);kafkaTemplate.send(record);}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testProducerRecord(){eventProducer.sendProducerRecord();}}

1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent4(){//String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent4(){eventProducer.sendEvent4();}
}

2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendDefault(){//Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendDefault(){eventProducer.sendDefault();}
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述

4、发送对象消息

在这里插入图片描述

生产者:

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void getResult3(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.sendDefault(null,System.currentTimeMillis(),"k3",user);}}

实体类:

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid getResult2(){eventProducer.getResult3();}
}

配置文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/878131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WIN/MAC 图像处理软件Adobe Photoshop PS2024软件下载安装

目录 一、软件概述 1.1 基本信息 1.2 主要功能 二、系统要求 2.1 Windows 系统要求 2.2 macOS 系统要求 三、下载 四、使用教程 4.1 基本界面介绍 4.2 常用工具使用 4.3 进阶操作 一、软件概述 1.1 基本信息 Adobe Photoshop&#xff08;简称PS&#xff09;是一款…

springboot嵌入式数据库实践-H2内嵌数据库(文件、内存)

本文章记录笔者的嵌入式数据库简单实现&#xff0c; 记录简要的配置过程。自用文章&#xff0c;仅作参考。 目录 本文章记录笔者的嵌入式数据库简单实现&#xff0c; 记录简要的配置过程。自用文章&#xff0c;仅作参考。 嵌入式数据库 -------------------------------具…

16岁激活交学费银行卡需要本人实名电话卡,线下营业厅不给办,怎么办?

16岁激活交学费银行卡需要本人实名电话卡&#xff0c;线下营业厅不给办&#xff0c;怎么办&#xff1f; 话卡办理规定&#xff1a; 根据《民法典》和《电话用户真实身份信息登记规定》的相关要求&#xff0c;未满16周岁的用户通常需要在监护人的陪同下办理电话卡&#xff0c;并…

uniapp微信小程序 分享功能

uniapp https://zh.uniapp.dcloud.io/api/plugins/share.html#onshareappmessage export default {onShareAppMessage(res) {if (res.from button) {// 来自页面内分享按钮console.log(res.target)}return {title: 自定义分享标题,path: /pages/test/test?id123}} }需要再真机…

衡石科技BI的API如何授权文档解析

授权说明​ 授权模式​ 使用凭证式&#xff08;client credentials&#xff09;授权模式。 授权模式流程说明​ 第一步&#xff0c;A 应用在命令行向 B 发出请求。 第二步&#xff0c;B 网站验证通过以后&#xff0c;直接返回令牌。 授权模式结构说明​ 接口说明​ 获取a…

【贪心 决策包容性 】757. 设置交集大小至少为2

本文涉及知识点 贪心 决策包容性 LeetCode757. 设置交集大小至少为2 给你一个二维整数数组 intervals &#xff0c;其中 intervals[i] [starti, endi] 表示从 starti 到 endi 的所有整数&#xff0c;包括 starti 和 endi 。 包含集合 是一个名为 nums 的数组&#xff0c;并…

Quasar V2.16.4 新版发布,基于 Vue 3 的前端开发框架,一套代码发布到多端

Quasar 又发布新版本了&#xff0c;性能优秀的 Vue 组件开发框架&#xff0c;时隔3年再次推荐给大家。 早在2021年&#xff0c;我就写了一篇简单的文章向大家推荐了 Quasar 这款 Vue.js 开发框架&#xff0c;如今3年过去了&#xff0c;Quasar 发展得很好&#xff0c;更新频率依…

H5开发有哪些技巧?

随着现代社会的飞速发展&#xff0c;网页开发已经从传统的HTML、CSS、JavaScript往H5发展。H5也称为HTML5&#xff0c;可以理解为是HTML的升级版&#xff0c;具有更加优秀的性能、更加完善的功能和更加多样的体验。因其灵活性和跨平台特性&#xff0c;成为了各类移动应用和网页…

面试常问! transformer中dk的大小,以及为什么设成这样,维度,原文分析。

目录&#xff1a; 原文 &#xff1a;翻译&#xff1a;流程&#xff1a;原因&#xff1a; 原文(多头注意力部分) &#xff1a; 李沐b站论文精读 论文网盘下载&#xff1a;链接 提取码: vm3d 翻译&#xff1a; 在这项工作中&#xff0c;我们采用了 h8 个并行注意力层&#xff…

Linux远程管理—SSH协议

SSH协议是远程连接的安全性协议&#xff0c;该协议可以有效防止远程管理过程中的信息泄漏&#xff0c;是西安传输数据加密&#xff0c;能够防止DNS和IP欺骗&#xff0c;传输数据压缩&#xff0c;加快传输速度。 安全验证方法有口令验证和密钥验证两种实现手段&#xff0c;该协…

线上剧本杀小程序,线上游戏新体验

剧本杀作为当下热门的社交型游戏方式&#xff0c;成为了大众社交娱乐的选择&#xff0c;为大众带来新的游戏体验。在数字化发展时期&#xff0c;线上剧本杀为大众带来了新鲜的游戏体验&#xff0c;它打破了时间空间限制&#xff0c;让玩家在手机上体验虚拟游戏&#xff0c;通过…

Oracle发邮件时SMTP服务器配置方法与步骤?

Oracle发邮件功能如何配置&#xff1f;如何优化Oracle发信性能&#xff1f; 为了实现自动化报告和通知&#xff0c;Oracle发邮件功能变得尤为重要。通过配置SMTP服务器&#xff0c;Oracle可以轻松地发送电子邮件。AokSend将详细介绍如何配置Oracle发邮件时的SMTP服务器&#x…

IDEA 编译项目时卡在Parsing java,最终报out of memory

今天在项目运行的时候莫名其妙报错&#xff1a; 报错内容&#xff1a; java.lang.OutOfMemoryError: GC overhead limit exceeded原因&#xff1a; 通常是因为堆内存太小&#xff0c;无法有效管理正在创建和销毁的对象。 解决方法&#xff1a; 在 IDEA 的安装目录下找到 bi…

ABB巨资收购一家电气龙头,为当年卖给日立电气业务回血

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 战略扩张&#xff1a;ABB携SEAM集团深耕电气服务市场 在电气服务领域&#xff0c;ABB再次展现了其强大的市场扩张能力。近日&#xff0c;ABB宣布…

【CanMV K230】外接传感器

【CanMV K230】外接传感器 外接LED灯 B站视频链接 抖音链接 我们后面主要做是机器视觉。K230能帮我们捕捉到图像信息。更多小功能需要我们自己来做。 比如舵机抬杆&#xff0c;测温报警等 都需要我们外接传感器。 本篇就来分享一下如何使用K230外接传感器 首先需要知道K230…

栈OJ题——有效的括号

文章目录 一、题目链接二、解题思路三、解题代码 一、题目链接 有效的括号 题目描述&#xff1a;给定一个只包括 ‘(’&#xff0c;‘)’&#xff0c;‘{’&#xff0c;‘}’&#xff0c;‘[’&#xff0c;‘]’ 的字符串 s &#xff0c;判断字符串是否有效。括号匹配。 二、…

Pycharm module ‘serial‘ has no attribute ‘Serial‘

已经pip install serial 已经提示安装成功了&#xff0c;提示没有Serial的属性&#xff0c;经过查询 &#xff0c;发现安装的模块不对&#xff0c;应该安装pyserial模块。 解决步骤 第一步&#xff1a;卸载serial pip uninstall serial 第二步&#xff1a;安装pyserial pip u…

【2025校招】4399 NLP算法工程师笔试题

目录 1. 第一题2. 第二题3. 第三题 ⏰ 时间&#xff1a;2024/08/19 &#x1f504; 输入输出&#xff1a;ACM格式 ⏳ 时长&#xff1a;2h 本试卷分为单选&#xff0c;自我评价题&#xff0c;编程题 单选和自我评价这里不再介绍&#xff0c;4399的编程题一如既往地抽象&#xff…

xss-labs通关攻略 11-15关

第十一关&#xff1a;less-11 步骤一&#xff1a;利用burp抓包 步骤二&#xff1a;添加referer:click me!" type"button" οnmοuseοver"alert(/xss/)进行放包 第十二关&#xff1a;less-12 步骤一&#xff1a;利用burp抓包 步骤二&#xff1a;修改User A…

【机器学习】独立成分分析的基本概念、应用领域、具体实例(含python代码)以及ICA和PCA的联系和区别

引言 独立成分分析&#xff08;Independent Component Analysis&#xff0c;简称ICA&#xff09;是一种统计方法&#xff0c;用于从多个观察到的混合信号中提取出原始的独立信号源 文章目录 引言一、独立成分分析1.1 定义1.2 独立成分分析的基本原理1.3 独立成分分析的步骤1.3.…