SpringBoot 集成 Kafka消息中间件,Docker安装Kafka环境

前述

提供kafka、zooker在docker环境下进行安装的示例,springBoot集成kafka实现producer-生产者和consumer-消费者(监听消费:single模式和batch模式)的功能实现

环境安装


# 拉取镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka# 运行zooker
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#运行kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --env KAFKA_LOG_DIRS=/kafka/KafkaLog --volume /home/vagrant/kafka/localtime:/etc/localtime --volume /home/vagrant/kafka/log:/app/kafka/log wurstmeister/kafka:latest

SpringBoot集成Kafka消息中间件

pom依赖


<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!-- kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><!-- fastjson2 -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2.version}</version>
</dependency><!--    hutool工具类    -->
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version>
</dependency>

配置

yaml文件配置

spring:kafka:# kafka地址bootstrap-servers: localhost:9092# 生产者配置producer:# 重试次数retries: 3#  批量提交batch-size: 16384# 缓存空间buffer-memory: 33554432# 消费者配置consumer:group-id: springboot-mq-kafka-demo# 手动提交enable-auto-commit: falseauto-offset-reset: latestproperties:# 超时时间session.timeout.ms: 60000# 监听listener:# 类型: single/batchtype: batchlog-container-config: false# 分区concurrency: 5# 手动提交ack-mode: MANUAL_IMMEDIATE
Config
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;import java.util.Objects;/*** kafka配置类** @author yunnuo* @since 1.0.0*/
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {private final KafkaProperties kafkaProperties;private static final Integer DEFAULT_PARTITION_NUM = 3;private static final String GROUP_ID = "springboot-mq-kafka-demo-batch-manual_immediate";@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());}/*** kafka 默认 ContainerFactory** @return {@link ConcurrentKafkaListenerContainerFactory}*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 提交模式ContainerProperties.AckMode ackMode = Objects.nonNull(kafkaProperties.getListener().getAckMode()) ? kafkaProperties.getListener().getAckMode() : ContainerProperties.AckMode.MANUAL_IMMEDIATE;factory.getContainerProperties().setAckMode(ackMode);// 分区Integer concurrency = kafkaProperties.getListener().getConcurrency();concurrency = (Objects.nonNull(concurrency) && concurrency > 0) ? concurrency : DEFAULT_PARTITION_NUM;factory.setConcurrency(concurrency);// 拉取类型:单个/批量KafkaProperties.Listener.Type type = kafkaProperties.getListener().getType();factory.setBatchListener(Objects.equals(type, KafkaProperties.Listener.Type.BATCH));return factory;}/*** 自定义 ContainerFactory** @return {@link ConcurrentKafkaListenerContainerFactory}*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> batchContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setGroupId(GROUP_ID);factory.setConcurrency(DEFAULT_PARTITION_NUM);factory.setBatchListener(true);factory.getContainerProperties().setPollTimeout(3000);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

生产者(producer)

下面示例是采用API方式进行调用发送kafka消息,进行模拟生产者

请求DTO

import lombok.Data;/*** kafka 发送 请求** @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>* @date 2023-12-27*/
@Data
public class KafkaPushDemoReq {private String topic;private String msg;}
API接口发送kafka

import com.ukayunnuo.core.Result;
import com.ukayunnuo.core.exception.ServiceException;
import com.ukayunnuo.domain.request.KafkaPushDemoReq;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;/*** kafka 测试 api 接口** @author yunnuo* @since 1.0.0*/
@RestController
@RequestMapping("/demo/kafka")
public class KafkaPushController {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka 发送消息 (无结果)** @param req 请求参数* @return 结果*/@PostMapping("pushTest")public Result<Boolean> pushMsgTest(@RequestBody KafkaPushDemoReq req) {kafkaTemplate.send(req.getTopic(), req.getMsg());return Result.success(Boolean.TRUE);}}

消费者(consumer)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;/*** kafka 消费** @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>* @date 2023-12-27*/
@Slf4j
@Component
public class KafkaConsumerTest {/*** 使用默认配置的 ContainerFactory 进行监听 (single 模式)** @param record        消息* @param acknowledgment 提交器*/@KafkaListener(topics = {"default_container_factory-test.kafka.demo.default.single"})public void receiveMessageDefaultSingle(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {log.info("receiveMessageDefault-single record key:{}, value:{}", record.key(), record.value());acknowledgment.acknowledge();}/*** 使用默认配置的 ContainerFactory 进行监听 (batch 模式)** @param records        消息* @param acknowledgment 提交器*/@KafkaListener(topics = {"default_container_factory-test.kafka.demo.default.batch"})public void receiveMessageDefaultBatch(ConsumerRecords<String, String> records, Acknowledgment acknowledgment) {log.info("receiveMessageDefault-batch records size:{}", records.count());for (ConsumerRecord<String, String> record : records) {log.info("receiveMessageDefault-batch record key:{}, value:{}", record.key(), record.value());}acknowledgment.acknowledge();}/*** 使用自定义的 batchContainerFactory 进行监听** @param records        消息* @param acknowledgment 提交器*/@KafkaListener(topics = {"batch_container_factory-test.kafka.demo.batch"}, containerFactory = "batchContainerFactory")public void receiveMessage(ConsumerRecords<String, String> records, Acknowledgment acknowledgment) {log.info("receiveMessage records size:{}", records.count());for (ConsumerRecord<String, String> record : records) {log.info("receiveMessage record key:{}, value:{}", record.key(), record.value());}acknowledgment.acknowledge();}}

IDEA-HTTP请求示例

### 消息发送-single模式
POST http://localhost:8087/demo/kafka/pushTest
Content-Type: application/json{"topic": "default_container_factory-test.kafka.demo.default.single","msg": "test single send..."
}### 消息发送-batch模式
POST http://localhost:8087/demo/kafka/pushTest
Content-Type: application/json{"topic": "default_container_factory-test.kafka.demo.default.batch","msg": "test batch send..."
}### 消息发送-自定义
POST http://localhost:8087/demo/kafka/pushTest
Content-Type: application/json{"topic": "batch_container_factory-test.kafka.demo.batch","msg": "test custom batch send..."
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/583073.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NXP实战笔记(三):S32K3xx基于RTD-SDK在S32DS上配置WDT配置

目录 1、WDT概述 2、SWT配置 2.1、超时时间&#xff0c;复位方式的配置 2.2、中断形式 1、WDT概述 SWT 编程模型只允许 32 位&#xff08;字&#xff09;访问。 以下任何尝试访问都是无效的: •非32位访问 •写入只读寄存器 •启用SWT时&#xff0c;将不正确的值写入SR…

uniapp:实现手机端APP登录强制更新,从本地服务器下载新的apk更新,并使用WebSocket,实时强制在线用户更新

实现登录即更新&#xff0c;或实时监听更新 本文介绍的是在App打开启动的时候调用更新&#xff0c;点击下方链接&#xff0c;查看使用WebSocket实现实时通知在线用户更新。 uniapp&#xff1a;全局消息是推送&#xff0c;实现app在线更新&#xff0c;WebSocket&#xff0c;ap…

java 纯代码导出pdf合并单元格

java 纯代码导出pdf合并单元格 接上篇博客 java导出pdf&#xff08;纯代码实现&#xff09; 后有一部分猿友叫我提供一下源码&#xff0c;实际上我的源码已经贴在帖子上了&#xff0c;都是同样的步骤&#xff0c;只是加多一点设置就可以了。今天我再次上传一下相对情况比较完整…

开源预约挂号平台 - 从0到上线

文章目录 开源预约挂号平台 - 从0到上线演示地址源码地址可以学到的技术前端技术后端技术部署上线开发工具其他技术业务功能 项目讲解前端创建项目 - 安装PNPM - 使用VSCODE - 安装插件首页顶部与底部 - 封装组建 - 使用scss左右布局中间内容部分路由 - vue-routerBANNER- 走马…

重装系统以后无法git跟踪

总结&#xff1a;权限问题 故障定位 解决方案&#xff1a; 复制一份新的文件夹。&#xff08;新建的文件创建和写入权限都变了&#xff09; 修改文件为新的用户 执行提示的命令

SuperMap iServer发布的ArcGIS REST 地图服务如何通过ArcGIS API进行要素查询

作者&#xff1a;yx 前言 前面我们介绍了SuperMap iServer发布的ArcGIS REST 地图服务如何通过ArcGIS API加载&#xff0c;这里呢我们再来看看如何进行要素查询呢&#xff1f; 一、服务发布 SuperMap iServer发布的ArcGIS REST 地图服务如何通过ArcGIS API加载已经介绍如何发…

55. 跳跃游戏

给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输…

日志框架简介-Slf4j+Logback入门实践 | 京东云技术团队

前言 随着互联网和大数据的迅猛发展&#xff0c;分布式日志系统和日志分析系统已广泛应用&#xff0c;几乎所有应用程序都使用各种日志框架记录程序运行信息。因此&#xff0c;作为工程师&#xff0c;了解主流的日志记录框架非常重要。虽然应用程序的运行结果不受日志的有无影…

OpenEular23.09(欧拉)操作系统为企业搭建独立的K8S集群环境,详细流程+截图

1.环境&#xff1b; win10&#xff0c;vmware16 pro&#xff0c;openeular23.09 集群模式&#xff1a;一主二从 主机硬件配置 主机名IP角色CPU内存硬盘k8s-master01192.168.91.100master4C4G40Gk8s-worker02192.168.91.101worker(node)4C4G40Gk8s-worker03192.168.91.102work…

12月27日,每日信息差

以下是2023年12月27日的8条信息差 第一、小米公司&#xff1a;小米汽车正式加入小米“人车家全生态”&#xff0c;随着小米汽车的即将发布&#xff0c;小米“人车家全生态”也实现了真正闭环 第二、吉利将于2024年初发射11颗卫星&#xff0c;吉利银河E8率先搭载卫星通信技术。…

详解全志R128 RTOS安全方案功能

介绍 R128 下安全方案的功能。安全完整的方案基于标准方案扩展&#xff0c;覆盖硬件安全、硬件加解密引擎、安全启动、安全系统、安全存储等方面。 配置文件相关 本文涉及到一些配置文件&#xff0c;在此进行说明。 env*.cfg配置文件路径&#xff1a; board/<chip>/&…

字符串转成时间的SQL,一个多种数据库通用的函数

select date 2010-10-06 from dual; date 函数&#xff0c;此函数适用于&#xff1a; 1.MySQL数据库 2.Oracle数据库 3.达梦数据库 4.人大金仓数据库

TensorFlow 的基本概念和使用场景。

TensorFlow是一个开源的机器学习框架&#xff0c;由Google开发并于2015年发布。它旨在提供一个灵活的、高效的框架来构建和部署各种机器学习和深度学习模型。 TensorFlow的核心概念是使用数据流图来表示计算任务。数据流图是由节点&#xff08;操作&#xff09;和边&#xff0…

linux用户态与内核态通过字符设备交互

linux用户态与内核态通过字符设备交互 简述 Linux设备分为三类&#xff0c;字符设备、块设备、网络接口设备。字符设备只能一个字节一个字节读取&#xff0c;常见外设基本都是字符设备。块设备一般用于存储设备&#xff0c;一块一块的读取。网络设备&#xff0c;Linux将对网络…

JAVA进化史: JDK7特性及说明

JDK 7&#xff08;Java Development Kit 7&#xff09;是Java平台的一个重要版本&#xff0c;于2011年7月发布。这个版本引入了一系列的语言、库和虚拟机的改进&#xff0c;提升了Java的开发体验和性能。以下是JDK 7的一些主要特性&#xff0c;以及带有示例说明 字符串在switc…

数的分解(100%用例)C卷 (JavaPythonNode.jsC++)

给定一个正整数n,如果能够分解为m(m >1)个连续正整数之和,请输出所有分解中,m最小的分解。 如果给定整数无法分解为连续正整数,则输出字符串"N" 输入描述 输入数据为一整数,范围为 (1,2^30] 输出描述 比如输入为: 21 输出: 21=10+11 示例1 输入输出示例…

20231228在Firefly的AIO-3399J开发板的Android11使用Firefly的DTS配置单前后摄像头ov13850

20231228在Firefly的AIO-3399J开发板的Android11使用Firefly的DTS配置单前后摄像头ov13850 2023/12/28 19:20 缘起&#xff0c;突然发现只能打开前置的ov13850&#xff0c;或者后置的ov13850。 但是不能切换&#xff01; 【SDK&#xff1a;rk3399-android-11-r20211216.tar.xz】…

c++学习:运算符重载编写字符串类实战

目录 先定义一个类 定义构造和析构函数 用out<< 用s1.clear();清空数组 用s1.size();返回字符个数 加入扩容数组函数 用s2.append("world");和s1 "nihao";追加数组数据 用if(s1 s2)比较两个对象的数组 用if(s1 "123456")比较对…

Windows搭建RTSP视频流服务(EasyDarWin服务器版)

文章目录 引言1、安装FFmpeg2、安装EasyDarWin3、实现本地\虚拟摄像头推流服务4、使用VLC或PotPlayer可视化播放器播放视频5、RTSP / RTMP系列文章 引言 RTSP和RTMP视频流的区别 RTSP &#xff08;Real-Time Streaming Protocol&#xff09;实时流媒体协议。 RTSP定义流格式&am…

[BUG] Hadoop-3.3.4集群yarn管理页面子队列不显示任务

1.问题描述 使用yarn调度任务时&#xff0c;在CapacityScheduler页面上单击叶队列&#xff08;或子队列&#xff09;时&#xff0c;不会显示应用程序任务信息&#xff0c;root队列可以显示任务。此外&#xff0c;FairScheduler页面是正常的。 No matching records found2.原…