一个简单的spring+kafka生产者

1. pom

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2. 生产者

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.xxx.npi.module.common.msg.dto.MsgBase;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class MyMessageProducerService {@Value("${npi.default-url}")private String domain;private final KafkaTemplate<String, String> kafkaTemplate;public MyMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public <T extends MsgBase> void sendMessage(String topicName, T msgObj) {List<T> list = new ArrayList<>();list.add(msgObj);if("https://npi.xxx.com".equals(domain)){kafkaTemplate.send(topicName, toJsonString(list));}}public <T extends MsgBase> void sendMessage(String topicName, List<T> list) {if("https://npi.xxx.com".equals(domain)){kafkaTemplate.send(topicName, toJsonString(list));}}private String toJsonString(Object obj) {return JSON.toJSONString(obj,SerializerFeature.WriteDateUseDateFormat,SerializerFeature.WriteMapNullValue,SerializerFeature.WriteNullListAsEmpty,SerializerFeature.WriteNullStringAsEmpty,SerializerFeature.DisableCircularReferenceDetect);}}

3. 配置

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.retries}")private int retries;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.batch-size}")private int batchSize;@Value("${spring.kafka.producer.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.producer.security.protocol}")private String securityProtocol;@Value("${spring.kafka.producer.ssl.truststore.location}")private Resource sslTruststoreLocationResource;@Value("${spring.kafka.producer.ssl.truststore.password}")private String sslTruststorePassword;@Value("${spring.kafka.producer.sasl.mechanism}")private String saslMechanism;@Value("${spring.kafka.producer.sasl.jaas.config}")private String saslJaasConfig;@SuppressWarnings({"unchecked", "rawtypes"})@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate(producerFactory());}@SuppressWarnings("unchecked")@Beanpublic ProducerFactory<String, String> producerFactory() {@SuppressWarnings("rawtypes")DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());// factory.transactionCapable();// factory.setTransactionIdPrefix("transaction-");return factory;}public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put("bootstrap.servers", servers);props.put("acks", acks);props.put("retries", retries);props.put("batch.size", batchSize);props.put("linger.ms", lingerMs);props.put("buffer.memory", bufferMemory);props.put("key.serializer", keySerializer);props.put("value.serializer", valueSerializer);props.put("security.protocol", securityProtocol);props.put("sasl.mechanism", saslMechanism);props.put("sasl.jaas.config", saslJaasConfig);// 如果需要更低级别的消息丢失防护,可以启用幂等性props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// SSL配置props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");try {// 将类路径资源转换为临时文件路径InputStream inputStream = sslTruststoreLocationResource.getInputStream();File tempFile = File.createTempFile("client_truststore", ".jks");Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tempFile.getAbsolutePath());props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);} catch (IOException e) {throw new RuntimeException("Failed to locate truststore file", e);}return props;}
}

4. application

spring:kafka:producer:bootstrap-servers: n2.ikt.xxx.com:9092, n3.ikt.xxx.com:9092, n4.ikt.xxx.com:9092, n5.ikt.xxx.com:9092, n6.ikt.xxx.com:9092acks: allretries: 3batch-size: 16384linger-ms: 1buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializersecurity.protocol: SASL_SSLssl.truststore.location: classpath:client_truststore.jksssl.truststore.password: pwdsasl.mechanism: SCRAM-SHA-512sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd';topic:br: mdscinpi.mdscinpi-data.tstmem: mdscinpi.msdcinpi-data.tstfbr: mdscinpi.inpi-data.tstcr: mdscinpi.npi-data.tst

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/42097.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

https 自签证书相关生成csr文件、p12文件、crt文件、jks文件、key文件、pem文件

文章目录 前言https 自签证书相关生成csr文件、p12文件、crt文件、jks文件、key文件、pem文件1, 检查openssl的版本2. 生成私钥和证书签署请求 (CSR)3. 生成自签名证书4. 将证书和私钥转换为 PKCS12 格式的密钥库5. 创建信任库 (Truststore)6. 将 PKCS12 文件转换为 JKS 文件7.…

IDEA安装IDE Eval Reset插件,30天自动续期,无限激活

第一步&#xff1a; 下载idea 注意&#xff1a;版本要是2021.2.2以下 第二步&#xff1a;快捷键CtrlAlts打开设置 第三步&#xff1a;打开下图中蓝色按钮 第四步&#xff1a;点击弹窗的 “” &#xff0c;并输入 plugins.zhile.io 点击 “ok” 第五步&#xff1a;搜索IDE Ea…

前端必修技能:高手进阶核心知识分享 - CSS mix-blend-mode 图片混合模式详解

标签定义及使用说明 mix-blend-mode 属性描述了元素的内容应该与元素的直系父元素的内容和元素的背景如何混合。 语法 mix-blend-mod: 使用mix-blend-mode 各种混合模式实例 注意: Internet Explorer 或 Edge 浏览器不支持 mix-blend-mode 属性。 &#xff08;还是那个熟…

AJAX-个人版-思路步骤整理版

前置知识&#xff1a;老式的web创建工程方法就是创建项目然后添加web工件&#xff0c;然后添加lib依赖如&#xff1a;tomcat,servlet&#xff0c;等。 传统请求 对于传统请求操作&#xff1a;整体流程也就是创建静态页面&#xff0c; <!DOCTYPE html> <html lang&q…

CSS技巧:用CSS绘制超写实的酷炫徽章缎带效果,超漂亮,超酷炫

为什么要用CSS来画个徽章&#xff1f;这货脑子进水了吧&#xff01; 今天在电脑前设计&#xff0c;要做徽章效果。突然觉得可以尝试用css实现近似的效果。说干就干&#xff0c;打开编辑器&#xff0c;让我的手指头活跃起来&#xff01; 技术要点 通过多个圆形嵌套和渐变属性…

【Rust练习】1.变量绑定与解构

地址&#xff1a;https://practice-zh.course.rs/variables.html &#x1f31f; 变量只有在初始化后才能被使用 // 修复下面代码的错误并尽可能少的修改 fn main() {let x: i32; // 未初始化&#xff0c;但被使用let y: i32; // 未初始化&#xff0c;也未被使用println!(&quo…

WIN32核心编程 - 线程操作(一) 线程信息 - 线程控制

公开视频 -> 链接点击跳转公开课程博客首页 -> 链接点击跳转博客主页 目录 Thread Thread Control 创建 - Create 执行 - Execute 挂起 - Suspend 恢复 - Resume 终止 - Terminate 远程 - Remote Thread Info GetCurrentThread/Id GetThreadContext CreateToo…

Vue iview-ui 被tooltip包裹的标题,点击跳转后,提示框不消失

tooltip包裹的标题&#xff0c;点击跳转后&#xff0c;提示框不消失 就会有这种显示问题 下面这种错误方法不可行&#xff0c;解决办法往下翻 css写得没错&#xff0c;问题出在Javascript当中的 getElementsByClassName(“xxabc”)&#xff0c; 这个方法得到的是一个由class&q…

【Android】【WIFI】检查 SDIO 设备的状态

检查 SDIO 设备的状态 要检查 Android 设备上 SDIO 设备的状态&#xff0c;可以使用 ADB 命令来获取系统信息。以下是一些示例命令&#xff1a; 列出 SDIO 设备 adb shell cat /proc/devices | grep sdio检查 SDIO 模块是否加载 adb shell lsmod | grep sdio获取 SDIO 相关的…

IDEA中使用Maven打包及碰到的问题

1. 项目打包 IDEA中&#xff0c;maven打包的方式有两种&#xff0c;分别是 install 和 package &#xff0c;他们的区别如下&#xff1a; install 方式 install 打包时做了两件事&#xff0c;① 将项目打包成 jar 或者 war&#xff0c;打包结果存放在项目的 target 目录下。…

自闭症在生活中的典型表现

自闭症&#xff0c;这个看似遥远却又悄然存在于我们周围的疾病&#xff0c;其影响深远且复杂。在日常生活中&#xff0c;自闭症患者的典型表现往往让人印象深刻&#xff0c;这些表现不仅揭示了他们内心的世界&#xff0c;也提醒我们要以更加包容和理解的心态去面对他们。 首先…

R语言4.3.0保姆级安装教程,包含安装包

[软件名称]&#xff1a;R语言4.3.0 R是用于统计分析、绘图的语言和操作环境。R是属于GNU系统的一个自由、免费、源代码开放的软件&#xff0c;它是一个用于统计计算和统计制图的优秀工具。 获取链接: https://pan.quark.cn/s/180306f47179 安装步骤: 1.解压压缩包。 2.进入…

EtherCAT转Profinet网关配置说明第二讲:上位机软件配置

EtherCAT协议转Profinet协议网关模块&#xff08;XD-ECPNS20&#xff09;&#xff0c;不仅可以实现数据之间的通信&#xff0c;还可以实现不同系统之间的数据共享。EtherCAT协议转Profinet协议网关模块&#xff08;XD-ECPNS20&#xff09;具有高速传输的特点&#xff0c;因此通…

iOS开发语言基础与Xcode工具初探

在iOS开发的世界里&#xff0c;Swift语言和Xcode开发工具是每个开发者旅程的起点。Swift&#xff0c;一种由Apple设计的编程语言&#xff0c;以其简洁的语法和强大的性能&#xff0c;成为了iOS开发的首选语言。而Xcode&#xff0c;则是Apple官方提供的集成开发环境&#xff08;…

Spring的核心概念理解案列

IDEA开发的简单“登陆成功”小项目 IDEA项目结构&#xff1a; 每一部分代码和相应的解读&#xff1a; com.itTony文件下有dao&#xff08;实体&#xff09;层&#xff0c;service&#xff08;服务&#xff09;层&#xff0c;编写的2个类&#xff08;HelloSpring和TestSpring&…

docker容器相关命令1(小记)

docker run 只在第一次运行时使用&#xff0c;将镜像放到容器中&#xff0c;以后再次启动这个容器时&#xff0c;只需要使用命令docker start即可。 docker run -it … /bin/bash &#xff1a;表示创建并启动容器直接进入容器的命令行&#xff0c;命令行中exit就是退出容器&…

运维锅总详解CPU

本文从CPU简介、衡量CPU性能指标、单核及多核CPU工作流程、如何平衡 CPU 性能和防止CPU过载、为什么计算密集型任务要选择高频率CPU、超线程技术、CPU历史演进及摩尔定律等方面对CPU进行详细分析。希望对您有所帮助&#xff01; 一、CPU简介 CPU&#xff08;中央处理器&#…

要想贵人相助,首先自己得先成为贵人!

点击上方△腾阳 关注 转载请联系授权 在金庸江湖里&#xff0c;有两位大侠&#xff0c;一个是萧峰&#xff0c;一个是郭靖。 郭靖在《射雕英雄传》里是绝对的主角&#xff0c;在《神雕侠侣》当中也是重要的配角&#xff0c;甚至可以说是第二主角。 谈起郭靖&#xff0c;很多…

昇思MindSpore学习入门-评价指标

当训练任务结束&#xff0c;常常需要评价函数&#xff08;Metrics&#xff09;来评估模型的好坏。不同的训练任务往往需要不同的Metrics函数。例如&#xff0c;对于二分类问题&#xff0c;常用的评价指标有precision&#xff08;准确率&#xff09;、recall&#xff08;召回率&…

20240706 每日AI必读资讯

&#x1f680;Meta 发布 AI 重磅炸弹&#xff1a;多标记预测模型现已开放研究 - 新技术采用多标记预测方法&#xff0c;有望提高性能并缩短训练时间。 - 模型同时预测多个未来单词&#xff0c;可能改善语言结构和上下文理解。 - multi-token prediction模型是Facebook基于大…