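Prerequisites
Before we dive into creating a custom authentication mechanism for Pulsar, make sure you have the following set up:
- Java 17: Make sure Java 17 is installed and configured in your environment.
- Spring Boot 3.3.2: We will use Spring Boot to create the custom Pulsar client.
- Docker & Docker Compose: Required to run the Pulsar broker in a containerized environment.
- Maven: Used to build the project and manage its Java dependencies.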
Overview
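In this guide, we will create a custom authentication provider for Apache Pulsar. The provider handles authentication by forwarding the username and password combination it receives, in a request header, to a REST API that validates it. If authentication succeeds, access to Pulsar is granted; otherwise it is denied.
We will follow the official Pulsar documentation on extending Pulsar's security.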
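The guide does not show the REST API that validates the credentials, so here is a minimal sketch of what such a service could look like, using only the JDK's built-in HTTP server. The class name `DemoAuthService`, the in-memory user store, and the choice to return the username as the response body are all assumptions made for illustration; only the `/pulsar/send` path, the `Authorization` header, and the `Basic user:password` value come from this guide.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class DemoAuthService {
    // Hypothetical in-memory user store (username -> password), for illustration only.
    private static final Map<String, String> USERS = Map.of("testUser", "testPass");

    // Returns the role (here simply the username) for a "Basic user:password"
    // header value, or null when the header is missing or the credentials are wrong.
    static String roleFor(String header) {
        if (header == null || !header.startsWith("Basic ")) {
            return null;
        }
        String[] parts = header.substring("Basic ".length()).split(":", 2);
        if (parts.length != 2) {
            return null;
        }
        return parts[1].equals(USERS.get(parts[0])) ? parts[0] : null;
    }

    // Starts the HTTP server on the given port and registers the /pulsar/send endpoint.
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/pulsar/send", DemoAuthService::handle);
        server.start();
        return server;
    }

    // Answers 200 with the role as body for valid credentials, 401 otherwise.
    private static void handle(HttpExchange exchange) throws IOException {
        String role = roleFor(exchange.getRequestHeaders().getFirst("Authorization"));
        byte[] body = (role != null ? role : "unauthorized").getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(role != null ? 200 : 401, body.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(body);
        }
    }

    public static void main(String[] args) throws IOException {
        start(8083); // Port is an assumption; use whatever your broker is configured to call.
    }
}
```

A 2xx response means "authenticated" and any other status means "rejected", which is the only contract the broker-side provider in this guide relies on.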
GitHub repo: https://github.com/urdogan0000/PulsarCustomAuthSpringboot
LinkedIn: https://www.linkedin.com/in/haydarurdogan/
Step 1: Set Up the Environment
First, let's set up a new Maven project for the custom Pulsar broker authentication:
Project structure:
pulsar-custom-auth
│
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── liderahenkpulsar
│   │   │           └── auth
│   │   │               ├── BasicAuthProvider.java
│   │   │               ├── AuthenticationBasicAuth.java
│   │   │               └── CustomDataBasic.java
│   │   └── resources
│   │       └── application.properties
│   └── test
└── pom.xml
pom.xml: Add the following dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-broker-common</artifactId>
        <version>3.2.3</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.12.0</version>
    </dependency>
</dependencies>
Step 2: Create the Custom Broker Authentication
Now let's create the BasicAuthProvider class by implementing Pulsar's AuthenticationProvider interface:
BasicAuthProvider.java:
package com.liderahenkpulsar.auth;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.naming.AuthenticationException;
import java.io.IOException;

public class BasicAuthProvider implements AuthenticationProvider {

    static final String HTTP_HEADER_NAME = "Authorization";
    static final String AUTH_METHOD_NAME = "customAuth";
    static final String HTTP_HEADER_VALUE_PREFIX = "Basic";
    static final String REST_API_URL_NAME = "authRestApiEndpoint";

    private static final Logger log = LoggerFactory.getLogger(BasicAuthProvider.class);

    private String apiEndpoint;
    private OkHttpClient httpClient;

    @Override
    public void initialize(ServiceConfiguration config) throws PulsarServerException {
        httpClient = new OkHttpClient();
        // Read the REST endpoint from broker.conf, falling back to a local default.
        this.apiEndpoint = (String) config.getProperties()
                .getOrDefault(REST_API_URL_NAME, "http://localhost:8081/pulsar/send");
        log.info("BasicAuthProvider initialized with endpoint: {}", apiEndpoint);
    }

    @Override
    public String getAuthMethodName() {
        return AUTH_METHOD_NAME;
    }

    @Override
    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
        String credentials = getUserCredentials(authData);
        log.info("Authentication request to endpoint: {}", apiEndpoint);
        // Demo only: this writes the raw credentials to the broker log.
        log.info("Authorization header: {}", credentials);

        // Forward the credentials to the REST API in the Authorization header.
        Request request = new Request.Builder()
                .url(apiEndpoint)
                .addHeader(HTTP_HEADER_NAME, credentials)
                .build();

        try (Response response = httpClient.newCall(request).execute()) {
            if (response.isSuccessful()) {
                assert response.body() != null;
                // The response body is returned to Pulsar as the authenticated role.
                String responseBody = response.body().string();
                log.info("Authentication successful: {}", responseBody);
                return responseBody;
            } else {
                log.warn("Authentication failed. HTTP status code: {}, Response: {}",
                        response.code(),
                        response.body().string());
                throw new AuthenticationException("Authentication failed. Invalid username or password.");
            }
        } catch (IOException e) {
            log.error("Error during authentication: ", e);
            throw new AuthenticationException("Authentication process encountered an error.");
        }
    }

    // Extracts the credentials from the binary protocol command or from the HTTP header.
    private static String getUserCredentials(AuthenticationDataSource authData) throws AuthenticationException {
        if (authData.hasDataFromCommand()) {
            String commandData = authData.getCommandData();
            log.info("Extracted command data: {}", commandData);
            return validateUserCredentials(commandData);
        } else if (authData.hasDataFromHttp()) {
            String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
            if (httpHeaderValue == null) {
                throw new AuthenticationException("Invalid HTTP Authorization header");
            }
            return validateUserCredentials(httpHeaderValue);
        } else {
            throw new AuthenticationException("No user credentials passed");
        }
    }

    private static String validateUserCredentials(final String userCredentials) throws AuthenticationException {
        if (StringUtils.isNotBlank(userCredentials)) {
            return userCredentials;
        } else {
            throw new AuthenticationException("Invalid or blank user credentials found");
        }
    }

    @Override
    public void close() {
        if (httpClient != null) {
            httpClient.connectionPool().evictAll();
        }
    }
}
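Step 3: Implement the Custom Client Authentication
To implement the custom client, we will create two classes:
- AuthenticationBasicAuth: Implements the main authentication logic and supplies the credentials to the Pulsar broker.
- CustomDataBasic: Provides the data (headers) needed to authenticate requests made to Pulsar.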
1. AuthenticationBasicAuth Class
This class defines the authentication method and manages the user credentials. It implements the Authentication and EncodedAuthenticationParameterSupport interfaces.
AuthenticationBasicAuth.java:
package com.liderahenkpulsar.auth;

import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

public class AuthenticationBasicAuth implements Authentication, EncodedAuthenticationParameterSupport {

    private static final Logger log = LoggerFactory.getLogger(AuthenticationBasicAuth.class);
    private static final String AUTH_NAME = "customAuth"; // Ensure this matches your Pulsar broker's expectation

    private String userId;
    private String password;

    // Default constructor for reflection or configuration usage
    public AuthenticationBasicAuth() {
        log.info("AuthenticationBasicAuth instantiated without parameters. Awaiting configuration.");
    }

    // Constructor to directly accept userId and password
    public AuthenticationBasicAuth(String userId, String password) {
        if (userId == null || userId.isEmpty() || password == null || password.isEmpty()) {
            throw new IllegalArgumentException("User ID and password must not be null or empty");
        }
        this.userId = userId;
        this.password = password;
        log.info("AuthenticationBasicAuth instantiated with userId: {} and password: [PROTECTED]", userId);
    }

    @Override
    public void close() throws IOException {
        // No operation needed on close
    }

    @Override
    public String getAuthMethodName() {
        return AUTH_NAME;
    }

    @Override
    public AuthenticationDataProvider getAuthData() {
        return new CustomDataBasic(userId, password);
    }

    @Override
    public void configure(Map<String, String> authParams) {
        // No-op for map configuration
        log.info("Configured with authParams: {}", authParams);
    }

    @Override
    public void configure(String encodedAuthParamString) {
        // No-op for encoded string configuration
        log.info("Configured with encodedAuthParamString: {}", encodedAuthParamString);
    }

    @Override
    public void start() {
        log.info("Starting AuthenticationBasicAuth for userId: {}", userId);
    }
}
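Method descriptions:
- Constructors: There are two constructors: a default (no-argument) one, used when the configuration is not passed in directly, and one that accepts "userId" and "password". The latter verifies that both values are set before continuing.
- **getAuthMethodName**: Returns the authentication method name, which must match the method name defined in the custom broker authentication (customAuth).
- **getAuthData**: Provides an instance of CustomDataBasic, which contains the authentication headers.
- **configure(Map<String, String> authParams)** and **configure(String encodedAuthParamString)**: These methods let you configure the authentication instance with parameters. They are no-ops here because we handle configuration through the constructor, but they log their input for visibility. As a consequence, an instance created through the default constructor carries no credentials.
- **start**: Initializes or starts the authentication process; here it is used mainly for logging.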
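If you want the default constructor to be usable, configure(String encodedAuthParamString) would need to extract the credentials from the parameter string. The following is a minimal sketch of such a parser, assuming a hypothetical `key1:value1,key2:value2` format with `userId` and `password` keys; the class name `AuthParamParser` and the format itself are illustrative choices and are not part of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AuthParamParser {
    // Parses "key1:value1,key2:value2" into an ordered map.
    // Hypothetical format chosen for this sketch; values must not contain commas.
    static Map<String, String> parse(String encoded) {
        Map<String, String> params = new LinkedHashMap<>();
        if (encoded == null || encoded.isBlank()) {
            return params; // Nothing to configure.
        }
        for (String pair : encoded.split(",")) {
            // Split on the first colon only, so values may themselves contain colons.
            String[] kv = pair.split(":", 2);
            if (kv.length != 2 || kv[0].isBlank()) {
                throw new IllegalArgumentException("Malformed auth parameter: " + pair);
            }
            params.put(kv[0].trim(), kv[1].trim());
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse("userId:testUser,password:testPass");
        System.out.println(params.get("userId"));
    }
}
```

Inside configure(String), the two values could then be assigned to the userId and password fields, applying the same null and empty checks as the two-argument constructor.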
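2. CustomDataBasic Class
This class provides the actual data needed for authentication: the headers for HTTP requests and the command data for the binary protocol.
CustomDataBasic.java: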
package com.liderahenkpulsar.auth;

import org.apache.pulsar.client.api.AuthenticationDataProvider;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CustomDataBasic implements AuthenticationDataProvider {

    private static final String HTTP_HEADER_NAME = "Authorization";

    private final String commandAuthToken;
    private Map<String, String> headers = new HashMap<>();

    public CustomDataBasic(String userId, String password) {
        // Create the basic auth token
        this.commandAuthToken = "Basic " + userId + ":" + password; // Ideally, base64 encode this string
        // Initialize headers
        headers.put(HTTP_HEADER_NAME, this.commandAuthToken);
        this.headers = Collections.unmodifiableMap(this.headers);
    }

    @Override
    public boolean hasDataForHttp() {
        return true; // Indicate that HTTP headers are available
    }

    @Override
    public Set<Map.Entry<String, String>> getHttpHeaders() {
        return this.headers.entrySet(); // Return the HTTP headers for authentication
    }

    @Override
    public boolean hasDataFromCommand() {
        return true; // Indicate that command data is available
    }

    @Override
    public String getCommandData() {
        return this.commandAuthToken; // Return the command data for authentication
    }
}
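Method descriptions:
- Constructor: Initializes commandAuthToken by concatenating userId and password, prefixed with Basic. In a real-world scenario, the credentials should be Base64-encoded.
- **hasDataForHttp**: Returns true, indicating that data (headers) is available for HTTP authentication.
- **getHttpHeaders**: Provides the HTTP headers needed for Pulsar authentication, including the Authorization header.
- **hasDataFromCommand**: Returns true, indicating that command data (credentials) is available.
- **getCommandData**: Returns the authentication token (commandAuthToken) as command data.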
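The Base64 encoding mentioned for the constructor can be sketched with the JDK's java.util.Base64. The helper class `BasicCredentials` below is hypothetical and not part of the original project; it only illustrates how the token could be built on the client and unpacked again on the receiving side.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class BasicCredentials {
    private static final String PREFIX = "Basic ";

    // Builds "Basic <base64(userId:password)>".
    static String encode(String userId, String password) {
        String raw = userId + ":" + password;
        return PREFIX + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Reverses encode(): returns a two-element array {userId, password}.
    static String[] decode(String headerValue) {
        if (headerValue == null || !headerValue.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a Basic authorization value");
        }
        byte[] decoded = Base64.getDecoder().decode(headerValue.substring(PREFIX.length()).trim());
        String raw = new String(decoded, StandardCharsets.UTF_8);
        // Split on the first colon only, so the password may contain colons.
        int separator = raw.indexOf(':');
        if (separator < 0) {
            throw new IllegalArgumentException("Missing ':' between user ID and password");
        }
        return new String[] {raw.substring(0, separator), raw.substring(separator + 1)};
    }

    public static void main(String[] args) {
        String header = encode("testUser", "testPass");
        String[] parts = decode(header);
        System.out.println(header + " -> " + parts[0]);
    }
}
```

If the client switches to this encoding, the REST service that checks the credentials has to decode the value in the same way. Base64 is an encoding and not encryption, so the connection should still be protected with TLS.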
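Step 4: Build the Custom Authentication JAR File
- Create the JAR file: Make sure your Java project has all of its dependencies set up correctly, then use Maven or Gradle to compile the project and package it as a JAR file.
- For Maven:
mvn clean package
- After the build, you should have a JAR file, typically in the target directory, for example target/liderahenkpulsar.auth-1.0.jar.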
Modify the Broker Configuration File
Download the original broker configuration: Download the original broker.conf from the Pulsar GitHub repository:
wget https://raw.githubusercontent.com/apache/pulsar/branch-3.3/conf/broker.conf -O broker.conf
Update the broker configuration: Edit broker.conf to include your custom authentication settings. These are the key lines to modify:
### --- Authentication --- ###
authenticationEnabled=true

# Specify the authentication providers to use
authenticationProviders=com.liderahenkpulsar.auth.BasicAuthProvider

# Define the authentication method name that matches your AuthenticationBasicAuth class
authenticationMethods=customAuth

# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in the same or other clusters
brokerClientAuthenticationPlugin=com.liderahenkpulsar.auth.AuthenticationBasicAuth
brokerClientAuthenticationParameters=

# (Optional) Specify the endpoint for your authentication service (your login REST API)
authRestApiEndpoint=http://localhost:8083/pulsar/send
Save the updated configuration file as custom-auth-broker.conf.
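One detail to watch when editing this file: in Java properties syntax, a `#` only starts a comment at the beginning of a line, so a comment placed after a value on the same line becomes part of the value. The sketch below shows the effect with java.util.Properties, assuming the broker reads broker.conf with the same semantics; the class name `BrokerConfCheck` and the sample comment text are illustrative.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BrokerConfCheck {
    // Loads properties-formatted text and returns the value stored under the given key.
    static String read(String confText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String inlineComment =
                "authRestApiEndpoint=http://localhost:8083/pulsar/send # login service\n";
        String ownLineComment =
                "# login service\nauthRestApiEndpoint=http://localhost:8083/pulsar/send\n";

        // The first value keeps the trailing " # login service"; the second is the clean URL.
        System.out.println("[" + read(inlineComment, "authRestApiEndpoint") + "]");
        System.out.println("[" + read(ownLineComment, "authRestApiEndpoint") + "]");
    }
}
```

Keeping every comment on its own line avoids handing the provider an endpoint URL with stray text appended to it.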
Set Up Docker Compose for Pulsar
- Create the Docker Compose file:
- Below is the complete Docker Compose file. Save it as docker-compose.yml in the same directory as the JAR file and the updated broker configuration.
version: '3.8'
networks:
  pulsar:
    driver: bridge
services:
  # Start Zookeeper
  zookeeper:
    image: apachepulsar/pulsar:3.3.1
    container_name: zookeeper
    restart: on-failure
    networks:
      - pulsar
    volumes:
      - ./data/zookeeper:/pulsar/data/zookeeper
    environment:
      - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
    command: >
      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
             bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
             exec bin/pulsar zookeeper"
    healthcheck:
      test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
      interval: 10s
      timeout: 5s
      retries: 30

  # Init cluster metadata
  pulsar-init:
    image: apachepulsar/pulsar:3.3.1
    container_name: pulsar-init
    hostname: pulsar-init
    networks:
      - pulsar
    command: >
      bin/pulsar initialize-cluster-metadata \
               --cluster cluster-a \
               --zookeeper zookeeper:2181 \
               --configuration-store zookeeper:2181 \
               --web-service-url http://broker:8080 \
               --broker-service-url pulsar://broker:6650
    depends_on:
      - zookeeper

  # Start Bookie
  bookie:
    image: apachepulsar/pulsar:3.3.1
    container_name: bookie
    restart: on-failure
    networks:
      - pulsar
    environment:
      - clusterName=cluster-a
      - zkServers=zookeeper:2181
      - metadataServiceUri=metadata-store:zk:zookeeper:2181
      - advertisedAddress=bookie
      - BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
    depends_on:
      - zookeeper
      - pulsar-init
    volumes:
      - ./data/bookkeeper:/pulsar/data/bookkeeper
    command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

  # Start Broker
  broker:
    image: apachepulsar/pulsar:3.3.1
    container_name: broker
    hostname: broker
    restart: on-failure
    networks:
      - pulsar
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - zookeeperServers=zookeeper:2181
      - clusterName=cluster-a
      - managedLedgerDefaultEnsembleSize=1
      - managedLedgerDefaultWriteQuorum=1
      - managedLedgerDefaultAckQuorum=1
      - advertisedAddress=broker
      - advertisedListeners=external:pulsar://172.16.102.12:6650
      - PULSAR_MEM=-Xms1g -Xmx1g -XX:MaxDirectMemorySize=512m
    depends_on:
      - zookeeper
      - bookie
    ports:
      - "6650:6650"
      - "8080:8080"
    volumes:
      - ./custom-auth-broker.conf:/pulsar/conf/broker.conf
      - ./liderahenkpulsar.auth-1.0.jar:/pulsar/lib/liderahenkpulsar.auth-1.0.jar
    command: bash -c "bin/pulsar broker"
Run the Docker Compose Environment
Run Docker Compose: Use Docker Compose to start the Pulsar cluster with the custom authentication setup.
docker-compose up -d
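Verify the setup:
- Check the logs to make sure all services (Zookeeper, Bookie, Broker) started correctly.
- Make sure the broker loads the custom authentication provider without errors.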
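Before connecting a Pulsar client, it can also help to confirm that the authentication REST endpoint itself answers as expected. The sketch below sends the same kind of request the broker-side provider sends (a GET with an `Authorization: Basic user:password` header) using the JDK's java.net.http client. The class name `AuthEndpointCheck`, the five-second timeout, and the sample credentials are assumptions for illustration; the URL should be whatever you set as authRestApiEndpoint.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class AuthEndpointCheck {
    // Builds a GET request carrying the credentials in the Authorization header.
    static HttpRequest buildRequest(String endpoint, String userId, String password) {
        return HttpRequest.newBuilder(URI.create(endpoint))
                .timeout(Duration.ofSeconds(5)) // Assumed timeout for this sketch.
                .header("Authorization", "Basic " + userId + ":" + password)
                .GET()
                .build();
    }

    // Returns true when the endpoint answers with a 2xx status code.
    static boolean isAccepted(String endpoint, String userId, String password) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(endpoint, userId, password), HttpResponse.BodyHandlers.ofString());
        int status = response.statusCode();
        return status >= 200 && status < 300;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; replace with your own values.
        boolean accepted = isAccepted("http://localhost:8083/pulsar/send", "testUser", "testPass");
        System.out.println(accepted ? "Credentials accepted" : "Credentials rejected");
    }
}
```

Keep in mind that the broker runs inside a container, so `localhost` in its configuration refers to the broker container and not to your host machine; the endpoint has to be reachable from inside the Docker network.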
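Set Up the Spring Boot Project
You can create a new Spring Boot project with Spring Initializr or your preferred IDE, using the following dependencies:
- Spring Web: For RESTful APIs (optional, depending on your use case)
- Spring Boot Starter: Core Spring Boot dependencies
- Pulsar Client: The Pulsar client for Java
Add the Dependencies
Add the Pulsar client and your custom authentication JAR to pom.xml: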
<dependencies>
    <!-- Spring Boot Starter Dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Pulsar Client Dependency -->
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>3.3.1</version>
    </dependency>
    <!-- Custom Authentication Dependency -->
    <dependency>
        <groupId>com.liderahenkpulsar</groupId>
        <artifactId>liderahenkpulsar.auth</artifactId>
        <version>1.0</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/liderahenkpulsar.auth-1.0.jar</systemPath>
    </dependency>
</dependencies>
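Make sure the custom JAR is in the lib folder inside the project directory.
Configure the Pulsar Client with Custom Authentication
Create a configuration class for the Pulsar client setup: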
package com.example.pulsarclient.config;

import com.liderahenkpulsar.auth.AuthenticationBasicAuth;
import org.apache.pulsar.client.api.PulsarClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PulsarConfig {

    @Bean
    public PulsarClient pulsarClient() throws Exception {
        // Replace with the correct broker service URL
        String serviceUrl = "pulsar://localhost:6650";
        String userId = "your-username"; // Replace with your actual username
        String password = "your-password"; // Replace with your actual password

        // Create a Pulsar client with custom authentication
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .authentication(new AuthenticationBasicAuth(userId, password))
                .build();
    }
}
When you run the application, you will see entries like these in the broker's logs:
2024-09-11T06:38:20,515+0000 [pulsar-io-3-7] INFO com.liderahenkpulsar.auth.BasicAuthProvider - Authentication request to endpoint: http://172.16.102.215:8083/pulsar/send
2024-09-11T06:38:20,515+0000 [pulsar-io-3-7] INFO com.liderahenkpulsar.auth.BasicAuthProvider - Authorization header: Basic testUser:testPass
2024-09-11T06:38:20,519+0000 [pulsar-io-3-7] INFO com.liderahenkpulsar.auth.BasicAuthProvider - Authentication successful: test
Congratulations! You can now access Pulsar with custom authentication.