手拉手springboot整合kafka发送消息

环境介绍
技术栈springboot+mybatis-plus+mysql+rocketmq
软件版本
mysql8
IDEAIntelliJ IDEA 2022.2.1
JDK17
Spring Boot3.1.7
kafka2.13-3.7.0

创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092
consumer:
auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

Spring-kafka生产者发送消息

.send与sendDefault()方法都返回CompletableFuture<String<k,v>>;

CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

发送Message

@Test
void kafkaSendMessageTest1(){
//通过构建器模式创建Message
Message<String> message = MessageBuilder.withPayload("hello kafka send message")
.setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
.build();
kafkaTemplate.send(message);
}

SendProducerRecord

String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers

@Test
void kafkaSendProducerRecordTest1() {
//参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
Headers headers = new RecordHeaders();
headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String,String> record = new ProducerRecord(
"kafkaTopic01",
0,
System.currentTimeMillis(),
"key",
"hello kafka send message");
kafkaTemplate.send(record);
}

默认主题发送消息

yml配置默认主题

template:
default-topic: default-topic

@Test
void kafkaSendDefaultTest01(){
kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
}

发送Object消息

序列化默认为String

@Resource
private KafkaTemplate<String,Object> kafkaTemplate1;
@Test
void kafkaSendObject(){
MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
//分区是null,kafka自行决定消息发送到哪个分区
kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
}

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/19399.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索无限可能性——微软 Visio 2021 改变您的思维方式

在当今信息化时代&#xff0c;信息流动和数据处理已经成为各行各业的关键。微软 Visio 2021 作为领先的流程图和图表软件&#xff0c;帮助用户以直观、动态的方式呈现信息和数据&#xff0c;从而提高工作效率&#xff0c;优化业务流程。本文将介绍 Visio 2021 的特色功能及其在…

华为OD机试 - 游戏分组 - 递归(Java 2024 C卷 100分)

华为OD机试 2024C卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试…

精准检测,安全无忧:安全阀检测实践指南

安全阀作为一种重要的安全装置&#xff0c;在各类工业系统和设备中发挥着举足轻重的作用。 它通过自动控制内部压力&#xff0c;有效防止因压力过高而引发的设备损坏和事故风险&#xff0c;因此&#xff0c;对安全阀进行定期检测&#xff0c;确保其性能完好、工作可靠&#xf…

使用pytorch构建ResNet50模型训练猫狗数据集

数据集 1.导包 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from torchvision import datasets, transforms, models import numpy as np import matplotlib.pyplot as plt import os from tqdm.auto import t…

流媒体服务器SMS-语音对讲(一)

1.简介 在国标语音对讲对接中&#xff0c;会发现不同的厂商或不同型号的设备&#xff0c;对讲流程都不一样&#xff0c;本文主要介绍流媒体与设备之间的交互情况。 SMS流媒体服务代码库地址&#xff1a;https://gitee.com/inyeme/simple-media-server 2.流媒体与设备交互的可能…

max6675热电偶温度采集

思路来源 参考价格 概述 MAX6675具有冷端补偿和将来自K型热电偶的信号数字化。数据以12位分辨率输出&#xff0c;SPI™兼容&#xff0c; 只读格式。该转换器将温度分解为0.25C&#xff0c;允许读数高达1024C&#xff0c;并显示热电偶8LSB在0C至 700C 引脚连接 温度采样电路 …

中间件复习之-消息队列

消息队列在分布式架构的作用 消息队列&#xff1a;在消息的传输过程中保存消息的容器&#xff0c;生产者和消费者不直接通讯&#xff0c;依靠队列保证消息的可靠性&#xff0c;避免了系统间的相互影响。 主要作用&#xff1a; 业务解耦异步调用流量削峰 业务解耦 将模块间的…

MySQL之创建高性能的索引(八)

创建高性能的索引 覆盖索引 通常大家都会根据查询的WHERE条件来创建合适的索引&#xff0c;不过这只是索引优化的一个方面。设计优秀的索引应该考虑到整个查询&#xff0c;而不单单是WHERE条件部分。索引确实是一种查找数据的高效方式&#xff0c;但是MySQL也可以使用索引来直…

向量数据库引领 AI 创新——Zilliz 亮相 2024 亚马逊云科技中国峰会

2024年5月29日&#xff0c;亚马逊云科技中国峰会在上海召开&#xff0c;此次峰会聚集了来自全球各地的科技领袖、行业专家和创新企业&#xff0c;探讨云计算、大数据、人工智能等前沿技术的发展趋势和应用场景。作为领先的向量数据库技术公司&#xff0c;Zilliz 在本次峰会上展…

【漏洞复现】电信网关配置管理系统 rewrite.php 文件上传漏洞

0x01 产品简介 中国电信集团有限公司(英文名称"China Telecom”、简称“"中国电信”)成立于2000年9月&#xff0c;是中国特大型国有通信企业、上海世博会全球合作伙伴。电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员…

在线IP检测如何做?代理IP需要检查什么?

当我们的数字足迹无处不在&#xff0c;隐私保护显得愈发重要。而代理IP就像是我们的隐身斗篷&#xff0c;让我们在各项网络业务中更加顺畅。 我们常常看到别人购买了代理IP服务后&#xff0c;通在线检测网站检查IP&#xff0c;相当于一个”售前检验““售后质检”的作用。但是…

2024-5-31 石群电路-19

2024-5-31&#xff0c;星期五&#xff0c;10:53&#xff0c;天气&#xff1a;阴雨&#xff0c;心情&#xff1a;晴。今天就要回学校啦&#xff0c;当大家看到这篇推文的时候我已经要收拾收拾去赶返校的火车啦&#xff0c;和女朋友短暂分别&#xff0c;不过小别胜新婚吗&#xf…

笔记-docker基于ubuntu22.04安装Jitsi Meet

背景 利用JitsiMeet打造一个可以在线会议的环境&#xff0c;根据躺的坑&#xff0c;做个记录 参考 JitsMeet部署安装说明 开始操作 环境 docker run -it --name ubuntu22.04 ubuntu:22.04 /bin/bash问题 1、安装 openjdk-11 apt install openjdk-11-jdk配置环境变量&…

自媒体必用的50 个最佳 ChatGPT 社交媒体帖子提示prompt通用模板教程

在这个信息爆炸的时代&#xff0c;社交媒体已经成为我们生活中不可或缺的一部分。无论是品牌宣传、个人展示&#xff0c;还是日常交流&#xff0c;我们都离不开它。然而&#xff0c;要在众多信息中脱颖而出&#xff0c;吸引大家的关注并不容易。这时候&#xff0c;ChatGPT这样的…

uniapp的tooltip功能放到表单laber

在uniapp中&#xff0c;tooltip功能通常是通过view组件的hover-class属性来实现的&#xff0c;而不是直接放在form的label上。hover-class属性可以定义当元素处于hover状态时的样式类&#xff0c;通过这个属性&#xff0c;可以实现一个类似tooltip的效果。 以下是一个简单的例…

跨境经营的艺术:中资企业海外市场售后服务创新与挑战

出海&#xff0c;已不再是企业的“备胎”&#xff0c;而是必须面对的“大考”&#xff01;在这个全球化的大潮中&#xff0c;有的企业乘风破浪&#xff0c;勇攀高峰&#xff0c;也有的企业在异国他乡遭遇了“水土不服”。 面对“要么出海&#xff0c;要么出局”的抉择&#xff…

一分钟学习数据安全——自主管理身份SSI基本概念

之前我们已经介绍过数字身份的几种模式。其中&#xff0c;分布式数字身份模式逐渐普及演进的结果就是自主管理身份&#xff08;SSI&#xff0c;Self-Sovereign Identity&#xff09;。当一个人能够完全拥有和控制其数字身份&#xff0c;而无需依赖中心化机构&#xff0c;这就是…

FreeRTOS实时系统 在任务中增加数组等相关操作 导致单片机起不来或者挂掉

在调试串口任务中增加如下代码&#xff0c;发现可以用keil进行仿真&#xff0c;但是烧录程序后&#xff0c;调试串口没有打印&#xff0c;状态灯也不闪烁&#xff0c;单片机完全起不来 博主就纳了闷了&#xff0c;究竟是什么原因&#xff0c;这段代码可是公司永流传的老代码了&…

香橙派OrangePi AIpro上手笔记——之USB摄像头目标检测方案测试(三)

整期笔记索引 香橙派OrangePi AIpro上手笔记——之USB摄像头目标检测方案测试&#xff08;一&#xff09; 香橙派OrangePi AIpro上手笔记——之USB摄像头目标检测方案测试&#xff08;二&#xff09; 香橙派OrangePi AIpro上手笔记——之USB摄像头目标检测方案测试&#xff08;…

【MySQL数据库】:MySQL复合查询

目录 基本查询回顾 多表查询 自连接 子查询 单行子查询 多行子查询 多列子查询 在from子句中使用子查询 合并查询 前面我们讲解的mysql表的查询都是对一张表进行查询&#xff0c;在实际开发中这远远不够。 基本查询回顾 【MySQL数据库】&#xff1a;MySQL基本查…