SpringBoot配置kafka

server:port: 8080
spring:kafka:bootstrap-servers: 192.168.79.104:9092producer: # 生产者retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATEredis:host: 192.168.79.104port: 6379password: 123321lettuce:pool:max-active: 10max-idle: 10min-idle: 1time-between-eviction-runs: 10s
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}

 

@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}
@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "default-group")public void consume(String message) {System.out.println("Received message: " + message);}}

 在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为default-group。
现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/send 端点,以将消息发送到 Kafka。然后,我们可以在控制台上查看消费者接收到的消息。
这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。
 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Nginx学习】—Nginx基本知识

【Nginx学习】—Nginx基本知识 一、什么是Nginx Nginx是一个高性能的HTTP和反向代理的web服务器&#xff0c;Nginx是一款轻量级的Web服务器/反向代理服务器处理高并发能力是十分强大的&#xff0c;并且支持热部署&#xff0c;启动简单&#xff0c;可以做到7*24不间断运行。 …

【ringbuff share mem】

ringbuff 和share mem 结合实现PV操作 参考链接 https://juejin.cn/post/7113550346835722276 https://zhuanlan.zhihu.com/p/147826545 代码如下&#xff1a; #include "rb.h"int g_shmid 0;shm_buff * create_shm(int *smid) {int id;shm_buff *share_mem NU…

SketchyCOCO数据集进行前景图像、背景图像和全景图像的分类

SketchyCOCO数据集进行前景图像、背景图像和全景图像的分类 import os import shutildef CopyFile(src, dst, filename):if not os.path.exists(dst):os.makedirs(dst)print(create dir: dst)try:shutil.copy(src\\filename, dst\\filename)except Exception as e:print(cop…

计算机网络-计算机网络体系结构-物理层

目录 一、通信基础 通信方式 传输方式 码元 传输率 *二 准则 2.1奈氏准则(奈奎斯特定理) 2.2香农定理 三、信号的编码和调制 *数字数据->数字信号 数字数据->模拟信号 模拟数据->数字信号 模拟数据->模拟信号 *四、数据交换方式 电路交换 报文交换…

算法练习(11):牛客在线编程07 动态规划

package jz.bm;import javax.crypto.MacSpi; import java.util.ArrayList; import java.util.Arrays;public class bm7 {/*** BM62 斐波那契数列*/public int Fibonacci(int n) {if (n < 2) {return 1;}int[] dp new int[n 1];dp[1] 1;dp[2] 1;for (int i 3; i < n;…

kafka客户端应用参数详解

一、基本客户端收发消息 Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可&#xff1a; <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></depend…

将python项目部署在一台服务器上

将python项目部署在一台服务器上 1.服务器2.部署方法2.1 手动部署2.2 容器化技术部署2.3 服务器less技术部署 1.服务器 服务器一般为&#xff1a;物理服务器和云服务器。 我的是物理服务器&#xff1a;这是将服务器硬件直接放置在您自己的数据中心或机房的传统方法。这种方法需…

力扣 -- 516. 最长回文子序列

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:int longestPalindromeSubseq(string s) {int ns.size();vector<vector<int>> dp(n,vector<int>(n));//记得从下往上填表for(int in-1;i>0;i--){//记得i是小于等于j的for(int ji;j&l…

山体滑坡监测系统——高效、便捷的新选择

在当今社会&#xff0c;科技的进步为我们的生活和工作带来了诸多便利。而在山体滑坡监测领域&#xff0c;全球导航卫星系统&#xff08;GNSS&#xff09;的引入更是增加了数据监测的高效性和便捷性。 一、山体滑坡监测系统的基本原理 山体滑坡监测系统是由监控平台和GNSS位移…

2.6 宽带接入技术

思维导图&#xff1a; 前言&#xff1a; 我的理解&#xff1a; 1. **早期互联网接入技术的局限性**&#xff1a; - 作者首先回顾了早期用户通过电话线和调制解调器连接到互联网服务提供商&#xff08;ISP&#xff09;的方式&#xff0c;指出这种方式的速度上限是56 kbit/…

my_print_defaults 及perror

参考文档&#xff1a; https://mysql.net.cn/doc/refman/8.0/en/my-print-defaults.html https://mysql.net.cn/doc/refman/8.0/en/perror.html -- my.cnf的内容 [rootredhat762100 mysql3306]# more my.cnf [mysqld] datadir/mysql/mysql3306/data #socket/tmp/mysql3306.so…

UE5.1编辑器拓展【三、脚本化资产行为,删除无引用资产】

目录 需要考虑的问题 重定向的修复函数 代码&#xff1a; 删除无引用资产 代码 需要添加的头文件和模块 在我们删除资产的时候&#xff0c;会发现&#xff0c;有些资产在删除的时候会出现有被什么什么引用&#xff0c;还有的是没有被引用。 而我们如果直接选择一片去进行…

FFmpeg横竖版视频互换背景模糊一键生成

视频处理是现代多媒体应用中常见的需求。其中横竖版视频互换和背景模糊是视频编辑中常见的操作。FFmpeg是一个功能强大的工具,适用于这些任务。 本文将详细介绍如何使用FFmpeg进行横竖版视频互换和背景模糊。 文章目录 操作命令与命令说明横版转竖版竖版转横版背景模糊处理横…

PHP 伪协议:使用 php://input 访问原始 POST 数据

文章目录 参考环境PHP 伪协议概念为什么需要 PHP 伪协议&#xff1f; php://input为什么需要 php://input&#xff1f;更灵活的数据处理减小性能压力 发送 POST 数据HackBarHackBar 插件的获取 $_POST打开 HackBar 插件通过 HackBar 插件发起 POST 请求 基操 enable_post_data_…

ROS机械臂开发-开发环境搭建【一】

目录 前言环境配置docker搭建Ubuntu环境安装ROS 基础ROS文件系统 bugs 前言 想系统学习ROS&#xff0c;做一些机器人开发。因为有些基础了&#xff0c;这里随便写写记录一下。 环境配置 docker搭建Ubuntu环境 Dockerfile # 基础镜像 FROM ubuntu:18.04 # 设置变量 ENV ETC…

Split index API

Split index API | Elasticsearch Guide [8.10] | Elastic 当您使用Elasticsearch集群出现索引分片设置不合理&#xff08;例如索引主分片设置不合理、每个分片存在大量数据等&#xff09;引发集群性能问题时&#xff0c;可通过_split API在线扩大主分片数&#xff0c;将现有索…

[开源]基于Vue的拖拽式数据报表设计器,为简化开发提高效率而生

一、开源项目简介 Cola-Designer 是一个 基于VUE&#xff0c;实现拖拽 配置方式生成数据大屏&#xff0c;为简化开发、提高效率而生。 二、开源协议 使用GPL-2.0开源协议 三、界面展示 概览 部分截图&#xff1a; 四、功能概述 特性 0 代码 实现完全拖拽 配置式生成…

SpringBoot中常用注解的含义

一、方法参数注解 1. PathVariable 通过RequestMapping注解中的 { } 占位符来标识URL中的变量部分 在控制器中的处理方法的形参中使用PathVariable注解去获取RequestMapping中 { } 中传进来的值&#xff0c;并绑定到处理方法定一的形参上。 //请求路径&#xff1a;http://3333…

【好玩】如何在github主页放一条贪吃蛇

前言 &#x1f34a;缘由 github放小蛇&#xff0c;就问你烧不烧 起因看到大佬github上有一条贪吃蛇扭来扭去&#xff0c;觉得好玩&#xff0c;遂给大家分享一下本狗的玩蛇历程 &#x1f95d;成果初展 贪吃蛇 &#x1f3af;主要目标 实现3大重点 1. github设置主页 2. git…

C#学习系列相关之多线程(二)----Thread类介绍

一、线程初始化 1.无参数 static void Main(string[] args) {//第一种写法Thread thread new Thread(test);thread.Start();//第二种写法 delegateThread thread1 new Thread(new ThreadStart(test));thread1.Start();//第三种写法 lambdaThread thread2 new Thread(() >…