SpringBoot Kafka生产者 多kafka配置

一、配置文件

xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092producer: # 设置大于0的值,则客户端会将发送失败的记录重新发送retries: 3 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16Mbatch-size: 16384linger: 1# 设置生产者内存缓冲区的大小。#32Mbuffer-memory: 33554432# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1# 指定消息key和消息体的编解码方式 值的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000

二、KafkaConfig

package com.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String servers;@Value("${xxxxxx.kafka.producer.retries}")private int retries;@Value("${xxxxxx.kafka.producer.batch-size}")private int batchSize;@Value("${xxxxxx-afka.producer.linger}")private int linger;@Value("${xxxxxx.kafka.producer.buffer-memory}")private int bufferMemory;@Value("${xxxxxx.kafka.producer.acks}")private String acks;@Value("${xxxxxx.kafka.producer.key-serializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.producer.value-serializer}")private String valueDeserializer;// 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);//设置重试次数props.put(ProducerConfig.RETRIES_CONFIG, retries);//达到batchSize大小的时候会发送消息props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息props.put(ProducerConfig.LINGER_MS_CONFIG, linger);//缓冲区的值props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//序列化手段props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ProducerConfig.ACKS_CONFIG, acks);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Bean(name = "xxxxxxKafkaTemplate")public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}}

三、生产者

@Resource(name = "xxxxxxKafkaTemplate")private KafkaTemplate kafkaTemplate;

kafkaTemplate.send(topic, message);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/135867.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何成为优秀的产品经理?新人必看!

从一名普通产品经理到一名优秀的产品经理要经历多少步&#xff1f;哪些能力是稀缺的能力&#xff1f;哪些能力又是公司急需的能力&#xff1f;善于推动项目研发重要&#xff0c;还是搞定业务更重要&#xff1f; 这些关于优秀产品经理的能力&#xff0c;本文将全面讨论&#xf…

10-27 maven概念

maven maven的概念模型: 项目对象模型(POM: Project object Model)&#xff0c;一组标准集合: pom.xml 依赖管理系统(Dependency Management System) 项目生命周期(Project Lifecycle) 项目对象模型&#xff1a; 把项目当成一个对象&#xff0c;描述这个项目&#xff0c;使用p…

sql注入学习笔记

sql注入原理 掌握sql注入漏洞的原理掌握sql注入漏洞的分类 万能用户名 777 or 11 #原句 select userid from cms_users where username ".$username." and password".md5 ( $password ) ."输入过后为 select userid from cms_users where username …

Nginx网关配置

安装Nginx 下载最新版本Nginx nginx: download 解压 双击nginx.exe启动 浏览器访问 localhost 看到如下界面 微服务准备 准备两个服务&#xff0c;例如&#xff1a;product微服务和order微服务 分别启动后&#xff0c;访问相应服务接口 product服务 http://localhost:9001/…

【Codeforces】Codeforces Round 905 (Div. 3)

Problem - 1883C - Codeforces 这题当时想复杂了。 题目大意&#xff1a; 给一串数组和一个数字k&#xff0c;求对数组进行多少次操作能是他们的乘积是k的倍数。 操作是选定一个数加上1。 这题需要抓住一个点k属于[2,5]&#xff0c;2&#xff0c;3&#xff0c;4&#xff0c;5中…

将对象与返回的数据所对应的键相同时一一赋值

问题描述 对象与返回的数据直接赋值&#xff0c;会将多余的键与值也添加上 那么赋值时值要 目标对象的键所对应的值 解决方案&#xff1a; 利用双重遍历 来比对 当 键相同时再赋值 duiYingFuZhi(obj,data){for (let key in obj) {for (let index in data) {if (keyindex) {obj…

开发知识点-Pygame

Pygame Pygame最小开发框架与最小游戏游戏开发入门单元开篇 Pygame简介安装游戏开发入门语言开发工具的选择 Pygame最小开发框架与最小游戏 游戏开发入门单元开篇 Pygame简介安装 游戏开发入门语言开发工具的选择

C语言C位出道心法(三):共用体|枚举

C语言C位出道心法(一):基础语法 C语言C位出道心法(二):结构体|结构体指针|链表 一: C语言共用体数据类型认知 二:C语言枚举基本数据类型认知 忙着去耍帅,后期补充完整.............

大厂面试题-MVCC的理解

目录 第一种&#xff1a;读读 第二种&#xff1a;读写 第三种&#xff1a;写写 对于MVCC的理解&#xff0c;可以先从数据库的三种并发场景说起&#xff1a; 第一种&#xff1a;读读 就是线程A与线程B同时在进行读操作&#xff0c;这种情况下不会出现任何并发问题。 第二种…

11 抽象向量空间

抽象向量空间 向量是什么函数什么是线性推论向量空间 这是关于3Blue1Brown "线性代数的本质"的学习笔记。 向量是什么 可以是一个箭头&#xff0c;可以是一组实数&#xff0c;即一个坐标对。 箭头在高维&#xff08;4维&#xff0c;甚至更高&#xff09;空间&…

yum命令中的gcc是什么含义?答:是一种包,并不是参数。

gcc 是 GNU Compiler Collection 的缩写&#xff0c;它是一种广泛用于编译和构建C、C和其他编程语言的编译器套件。 比如对于安装Python前的准备工作命令&#xff1a; sudo yum install gcc openssl-devel bzip2-devel libffi-devel具体而言&#xff0c;在这条命令中&#xf…

软考 系统架构设计师系列知识点之数字孪生体(4)

接前一篇文章&#xff1a;软考 系统架构设计师系列知识点之数字孪生体&#xff08;3&#xff09; 所属章节&#xff1a; 第11章. 未来信息综合技术 第5节. 数字孪生体技术概述 4. 数字孪生体的应用 数字孪生体主要应用于制造、产业、城市和战场。 &#xff08;1&#xff09;…

【OpenCV】 拟合直线 与 霍夫直线 对比 , fitLine()与 HoughLinesP()对比

文章目录 1 fitLine 与 HoughLinesP 函数原型2 拟合直线 与 霍夫直线 对比拟合线和圆,是通过已知点拟合出对应的方程,拟合方法如最小二乘法,RANSAC算法等。如果拟合点的离散成都较高,拟合方法的正确选择,是提高识别精度的一大要点。 1 fitLine 与 HoughLinesP 函数原型 …

ReentrantLock

文章目录 1. 简介2. 可重入3. 可中断4. 锁超时5. 使用可重入锁解决哲学家就餐问题6. 公平锁7. 条件变量 1. 简介 ReentrantLock也称为可重入锁&#xff0c;相对于synchronized它有如下特点&#xff1a; 可中断&#xff1a;synchronized获取了锁&#xff0c;除非线程自己结束&…

本周Github有趣的项目、工具和库:Radius等

Github有趣的项目、工具和库 1、Radius Radius 是一个开源云原生应用程序平台&#xff0c;使开发人员和支持他们的运营商能够跨公共云和私有基础设施定义、部署和协作云原生应用程序 不仅仅是 Kubernetes Radius 通过支持 Kubernetes 等成熟技术、Terraform 和 Bicep 等现有基…

自然语言处理中的文本聚类:揭示模式和见解

一、介绍 在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;文本聚类是一种基本且通用的技术&#xff0c;在信息检索、推荐系统、内容组织和情感分析等各种应用中发挥着关键作用。文本聚类是将相似文档或文本片段分组为簇或类别的过程。这项技术使我们能够发现隐藏的…

JavaWeb篇_02——服务器简介及Tomcat服务器简介

服务器简介 硬件服务器的构成与一般的PC比较相似&#xff0c;但是服务器在稳定性、安全性、性能等方面都要求更高&#xff0c;因为CPU、芯片组、内存、磁盘系统、网络等硬件和普通PC有所不同。软件服务器&#xff08;英文名称Server&#xff09;&#xff0c;也称伺服器。指一个…

字符加密A--E,B-F,W--A

文章目录 前言一、题目描述 二、题目分析 三、解题 程序运行代码 前言 本系列为选择结构编程题&#xff0c;点滴成长&#xff0c;一起逆袭。 一、题目描述 二、题目分析 三、解题 程序运行代码 #include<stdio.h> int main(){char c;cgetchar();if(c>a&&…

国潮力量:中国年轻一代如何通过跨境电商推广中国文化

中国国潮&#xff0c;或称国民潮流&#xff0c;是中国年轻一代通过各种方式&#xff0c;如时尚、音乐、文化和艺术&#xff0c;展示他们的文化身份和创新的表达方式。国潮不仅在国内走红&#xff0c;还在国际市场上崭露头角。 其中&#xff0c;跨境电商在国潮的传播和推广中发…

服务器数据恢复—云服务器mysql数据库表被truncate的数据恢复案例

云服务器数据恢复环境&#xff1a; 阿里云ECS网站服务器&#xff0c;linux操作系统mysql数据库。 云服务器故障&#xff1a; 在执行数据库版本更新测试时&#xff0c;在生产库误执行了本来应该在测试库执行的sql脚本&#xff0c;导致生产库部分表被truncate&#xff0c;还有部…