Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成 truststore.jks 的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts

  2. 将根证书或证书链保存为 .pem 文件。

  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

    keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

  <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>

nacos配置:

spring:kafka:bootstrap-servers: SSL://connectedca.com:443  ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event  ##换成你自己的topic

核心配置:


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}

注入配置:


import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}

能够看到这个配置就成功了表示:

然后在监听处理消息即可

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/734596.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类预测 | Matlab基于TTAO-CNN-LSTM-Attention三角拓扑聚合优化算法优化卷积神经网络-长短期记忆网络-注意力机制的数据分类预测

分类预测 | Matlab基于TTAO-CNN-LSTM-Attention三角拓扑聚合优化算法优化卷积神经网络-长短期记忆网络-注意力机制的数据分类预测 目录 分类预测 | Matlab基于TTAO-CNN-LSTM-Attention三角拓扑聚合优化算法优化卷积神经网络-长短期记忆网络-注意力机制的数据分类预测分类效果基…

【Python】进阶学习:OpenCV--一文详解cv2.namedWindow()

【Python】进阶学习&#xff1a;OpenCV–一文详解cv2.namedWindow() &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望…

C++椭圆检测论文复现 Ubuntu 22.04+Vscode+opencv3.4

复现的代码 本博客旨在复现论文《An Efficient High-quality Ellipse Detection》&#xff0c;该文章本来只有Matlab的代码实现&#xff0c;后来被islands翻译成了c 库&#xff0c;大家可以参考islands发在知乎上的文章高质量椭圆检测库&#xff0c;C的代码链接。 使用环境 U…

ROS2动作通信的实现

文章目录 1.动作通信的概念及应用场景1.1 概念1.2 应用场景 2.准备工作3.动作通信的实现3.1 动作通信接口消息3.2 服务端实现3.3 客户端实现3.4 编译及运行 1.动作通信的概念及应用场景 1.1 概念 动作通信适用于长时间运行的任务。就结构而言动作通信由目标、反馈和结果三部分…

吴恩达机器学习-可选实验室:可选实验:使用逻辑回归进行分类(Classification using Logistic Regression)

在本实验中&#xff0c;您将对比回归和分类。 import numpy as np %matplotlib widget import matplotlib.pyplot as plt from lab_utils_common import dlc, plot_data from plt_one_addpt_onclick import plt_one_addpt_onclick plt.style.use(./deeplearning.mplstyle)jupy…

第三百九十二回

文章目录 1. 概念介绍2. 方法与细节2.1 实现方法2.2 具体细节 3. 示例代码4. 内容总结 我们在上一章回中介绍了"如何混合选择多个图片和视频文件"相关的内容&#xff0c;本章回中将介绍如何通过相机获取图片文件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. …

JavaWeb--Maven

一&#xff1a;概述 1.简介 Maven 是专门用于管理和构建 Java 项目的工具&#xff0c;它的主要功能有&#xff1a; 提供了一套标准化的项目结构 提供了一套标准化的构建流程&#xff08;编译&#xff0c;测试&#xff0c;打包&#xff0c;发布 …… &#xff09; 提供了一套…

Minio搭建文件服务器

目录 一、Minio使用&#x1f355;1.1 Minio介绍1.2 Minio安装1.3 Minio入门 二、创建后端服务&#x1f953;2.1创建一个SpringBoot项目2.2 代码实现2.2.1 FileUploadController2.3.2 FileUploadService2.3.3 MinioProperties2.3.4 MinioServerApplication2.2.4 配置文件内容 三…

如何使用固定公网地址SFTP远程传输文件至安卓Termux本地目录?

文章目录 1. 安装openSSH2. 安装cpolar3. 远程SFTP连接配置4. 远程SFTP访问4. 配置固定远程连接地址 SFTP&#xff08;SSH File Transfer Protocol&#xff09;是一种基于SSH&#xff08;Secure Shell&#xff09;安全协议的文件传输协议。与FTP协议相比&#xff0c;SFTP使用了…

misc40

下载附件&#xff0c;发现只有第三个wav文件需要密码&#xff0c;其他都可以看 打开 conversion.txt 二进制转十进制得到202013 开 一张普通的二维码.png&#xff0c;直接扫不出结果。 010查看图片尾部发现 Brainfuck 编码 解码得到&#xff1a; 和谐民主和谐文明和谐和谐和谐…

数据分析-Pandas数据分组箱线图

数据分析-Pandas数据分组箱线图 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表&#x…

在垃圾回收时哪些可以作为垃圾回收的根对象?

1.System.class 由启动类加载器加载的类&#xff0c;一些核心的类&#xff0c;不如说 2.Native Stack java虚拟机在执行方法调用时必须执行操作系统方法&#xff0c;操作系统方法执行时所引用的一些java对象。 3.Thread 活动线程所引用的一些对象。 4.Busy monitor 被同…

深度学习-Softmax 回归 + 损失函数 + 图片分类数据集

Softmax 回归 损失函数 图片分类数据集 1 softmax2 损失函数1均方L1LossHuber Loss 3 图像分类数据集4 softmax回归的从零开始实现 1 softmax Softmax是一个常用于机器学习和深度学习中的激活函数。它通常用于多分类问题&#xff0c;将一个实数向量转换为概率分布。Softmax函…

Spring Boot 自动装配的原理!!!

SpringBootApplication SpringBootConfiguration&#xff1a;标识启动类是一个IOC容器的配置类 EnableAutoConfiguration&#xff1a; AutoConfigurationPackage&#xff1a;扫描启动类所在包及子包中所有的组件&#xff0c;生…

C++特殊类设计【特殊类 || 单例对象 || 饿汉模式 || 懒汉模式】

目录 一&#xff0c;特殊类设计 1. 只在堆上创建的类 2. 只允许在栈上创建的类 3. 不能被继承的类 4. 不能被拷贝的类 5. 设计一个类&#xff0c;只能创建一个对象&#xff08;单例对象&#xff09; 饿汉模式 懒汉模式 C11静态成员初始化多线程安全问题 二&#xff…

linux安装ngnix完整步骤(支持centos/银河麒麟操作系统)

linux安装ngnix&#xff08;支持centos/银河麒麟操作系统&#xff09; 本次操作系统安装ngnix采用离线或在线安装方式&#xff0c;离线就是不联网环境&#xff0c;在线则是联网环境&#xff1b;支持centos7或centos8或国产操作系统&#xff08;银河麒麟高级服务器操作系统&…

element-ui radio 组件源码分享

今日简单分享 radio 组件的实现原理&#xff0c;主要从以下三个方面来分享&#xff1a; 1、radio 页面结构 2、radio 组件属性 3、radio 组件方法 一、radio 页面结构 1.1 页面结构如下&#xff1a; 二、radio 属性 2.1 value / v-model 属性&#xff0c;类型为 string / …

鲜为人知的闰年判定大坑

【题目描述】 输入年份&#xff0c;判断是否为闰年。如果是&#xff0c;则输出yes&#xff0c;否则输出no。 提示&#xff1a;简单地判断除以4的余数是不够的。 【题目来源】 刘汝佳《算法竞赛入门经典 第2版》习题1-7 年份&#xff08;year&#xff09; 【解析】 一、闰…

Decontam去污染:一个尝试

为了程序运行的便利性&#xff0c;不想将Decontam放到windows的Rstudio里面运行&#xff0c;需要直接在Ubuntu中运行&#xff0c;并且为了在Decontam时进行其他操作&#xff0c;使用python去运行R 首先你需要有一个conda环境&#xff0c;安装了R&#xff0c;Decontam&#xff0…

云计算的部署方式(公有云、私有云、混合云、社区云)

云计算的部署方式(公有云、私有云、混合云、社区云) 目录 零、00时光宝盒 一、云计算的部署方式 1.1、公有云&#xff08;Public Cloud&#xff09; 1.2、私有云&#xff08;Private Cloud&#xff09;  1.3、混合云&#xff08;Hybrid Cloud&#xff09; 1.4、社区云&am…