Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成 truststore.jks 的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts

  2. 将根证书或证书链保存为 .pem 文件。

  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

    keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

  <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>

nacos配置:

spring:kafka:bootstrap-servers: SSL://connectedca.com:443  ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event  ##换成你自己的topic

核心配置:


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}

注入配置:


import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}

能够看到这个配置就成功了表示:

然后在监听处理消息即可

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/734596.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类预测 | Matlab基于TTAO-CNN-LSTM-Attention三角拓扑聚合优化算法优化卷积神经网络-长短期记忆网络-注意力机制的数据分类预测

分类预测 | Matlab基于TTAO-CNN-LSTM-Attention三角拓扑聚合优化算法优化卷积神经网络-长短期记忆网络-注意力机制的数据分类预测 目录 分类预测 | Matlab基于TTAO-CNN-LSTM-Attention三角拓扑聚合优化算法优化卷积神经网络-长短期记忆网络-注意力机制的数据分类预测分类效果基…

ABC344 A-E题解

文章目录 A题目AC Code&#xff1a; B题目AC Code&#xff1a; C题目AC Code&#xff1a; D题目AC Code&#xff1a; E题目AC Code&#xff1a; 不易不难&#xff0c;写到5题很简单&#xff0c;但是要有足够的思维能力。 A 题目 我们用一个 flag 变量记录我们是不是在两个竖…

python根据文件路径获取文件名

如果你想要获取文件路径中的文件名但不包括后缀名&#xff0c;你可以使用os.path.splitext()函数来分割文件名和后缀名&#xff0c;然后只取第一个部分。下面是一个例子&#xff1a; import os# 假设你有一个文件路径 file_path "/path/to/your/file.txt"# 使用os.…

人工智能、深度学习、机器学习书目推荐

AI入门书籍 人工智能基础 《Python神经网络编程》[英]塔里克拉希德(TariqRashid)中国工信部出版社入门强推&#xff0c;非常清晰的描述基于神经网络的人工智能基本原理&#xff0c;入门必看书目《统计学习方法》李航清华大学出版社人工智能必备数学基础&#xff0c;需要一定的…

【MacOS 上安装 Homebrew 】讲解

macOS 安装 Homebrew macOS 上安装 Homebrew 是一个相对简单的过程&#xff0c;Homebrew 是一款开源的软件包管理工具&#xff0c;它可以让你在 Mac 上轻松安装、更新和管理软件包。以下是安装 Homebrew 的步骤&#xff1a; 1. 打开终端&#xff1a; 你可以通过在 Finder 中…

CCF-CSP真题201403-2《窗口》(结构体+数组)

问题描述 在某图形操作系统中,有 N 个窗口,每个窗口都是一个两边与坐标轴分别平行的矩形区域。窗口的边界上的点也属于该窗口。窗口之间有层次的区别,在多于一个窗口重叠的区域里,只会显示位于顶层的窗口里的内容。   当你点击屏幕上一个点的时候,你就选择了处于被点击位置的…

前端发展史与优秀编程语言

前端开发是互联网技术领域中的一个重要分支&#xff0c;负责构建用户直接交互的网页和应用程序界面。随着互联网的发展&#xff0c;前端技术经历了多个阶段的演变&#xff0c;从最初的简单静态页面到如今的复杂交互式应用&#xff0c;不断推动着用户体验的提升和网页功能的丰富…

Vue3 重置覆盖 reactive 数组数据的方法

核心要点&#xff1a; 通过splice删除原数组内的所有数据&#xff0c;并添加新的数据进去。潜在影响&#xff1a;大数据量下&#xff0c;splice重置数组和 ref 的.value重新赋值重置数组&#xff0c;哪个耗时短还需自行测试。 通过 splice 传入0 和 Infinity 来删除原数组从头…

【Python】进阶学习:OpenCV--一文详解cv2.namedWindow()

【Python】进阶学习&#xff1a;OpenCV–一文详解cv2.namedWindow() &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望…

C++椭圆检测论文复现 Ubuntu 22.04+Vscode+opencv3.4

复现的代码 本博客旨在复现论文《An Efficient High-quality Ellipse Detection》&#xff0c;该文章本来只有Matlab的代码实现&#xff0c;后来被islands翻译成了c 库&#xff0c;大家可以参考islands发在知乎上的文章高质量椭圆检测库&#xff0c;C的代码链接。 使用环境 U…

整合生成型AI战略:从宏观思维到小步实践

“整合生成型AI战略&#xff1a;从宏观思维到小步实践” 在这篇文章中&#xff0c;我们探讨了将生成型AI和大型语言模型融入企业核心业务的战略开发方法。我们的方法基于敏捷开发原则&#xff0c;技术专家和数据科学家需要采纳商业思维&#xff0c;而执行官则需理解生成型AI和…

ROS2动作通信的实现

文章目录 1.动作通信的概念及应用场景1.1 概念1.2 应用场景 2.准备工作3.动作通信的实现3.1 动作通信接口消息3.2 服务端实现3.3 客户端实现3.4 编译及运行 1.动作通信的概念及应用场景 1.1 概念 动作通信适用于长时间运行的任务。就结构而言动作通信由目标、反馈和结果三部分…

吴恩达机器学习-可选实验室:可选实验:使用逻辑回归进行分类(Classification using Logistic Regression)

在本实验中&#xff0c;您将对比回归和分类。 import numpy as np %matplotlib widget import matplotlib.pyplot as plt from lab_utils_common import dlc, plot_data from plt_one_addpt_onclick import plt_one_addpt_onclick plt.style.use(./deeplearning.mplstyle)jupy…

Java实战:Spring Boot利用MinIO实现文件切片上传

本文将详细介绍如何在 Spring Boot 应用程序中使用 MinIO 实现文件切片极速上传技术。我们将深入探讨 MinIO 的概念、文件切片上传的原理&#xff0c;以及如何使用 Spring Boot 和 MinIO 实现文件切片上传和合并。 1. 引言 在现代的互联网应用中&#xff0c;文件上传是一个常…

第三百九十二回

文章目录 1. 概念介绍2. 方法与细节2.1 实现方法2.2 具体细节 3. 示例代码4. 内容总结 我们在上一章回中介绍了"如何混合选择多个图片和视频文件"相关的内容&#xff0c;本章回中将介绍如何通过相机获取图片文件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. …

JavaWeb--Maven

一&#xff1a;概述 1.简介 Maven 是专门用于管理和构建 Java 项目的工具&#xff0c;它的主要功能有&#xff1a; 提供了一套标准化的项目结构 提供了一套标准化的构建流程&#xff08;编译&#xff0c;测试&#xff0c;打包&#xff0c;发布 …… &#xff09; 提供了一套…

Minio搭建文件服务器

目录 一、Minio使用&#x1f355;1.1 Minio介绍1.2 Minio安装1.3 Minio入门 二、创建后端服务&#x1f953;2.1创建一个SpringBoot项目2.2 代码实现2.2.1 FileUploadController2.3.2 FileUploadService2.3.3 MinioProperties2.3.4 MinioServerApplication2.2.4 配置文件内容 三…

如何使用固定公网地址SFTP远程传输文件至安卓Termux本地目录?

文章目录 1. 安装openSSH2. 安装cpolar3. 远程SFTP连接配置4. 远程SFTP访问4. 配置固定远程连接地址 SFTP&#xff08;SSH File Transfer Protocol&#xff09;是一种基于SSH&#xff08;Secure Shell&#xff09;安全协议的文件传输协议。与FTP协议相比&#xff0c;SFTP使用了…

misc40

下载附件&#xff0c;发现只有第三个wav文件需要密码&#xff0c;其他都可以看 打开 conversion.txt 二进制转十进制得到202013 开 一张普通的二维码.png&#xff0c;直接扫不出结果。 010查看图片尾部发现 Brainfuck 编码 解码得到&#xff1a; 和谐民主和谐文明和谐和谐和谐…

数据分析-Pandas数据分组箱线图

数据分析-Pandas数据分组箱线图 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表&#x…