kafka集群安装Raft 协议

​使用消息中间件,可以实现系统与系统之间的异步通信和无缝对接,也可用在模块之间的的异步通信,有效避免了同步阻塞IO。作为一个高吞吐量、可扩展、高可靠性的分布式消息系统,Kafka 能够胜任从简单的消息队列到复杂的流处理平台的多种角色。MQTT消息系统和kafka是当前最常用的消息中间件。

Kafka集群组件

  • Broker
    • Kafka 集群中的一个服务器实例,每个 Broker 负责存储一定数量的分区。
    • Broker 通过 Broker ID 进行标识。
  • Topic
    • 消息的逻辑分类,可以理解为消息的类别。
    • 每个 Topic 可以有多个分区(Partition)。
  • Partition
    • 一个 Topic 被分割成多个分区,每个分区是一个有序的消息序列。
    • 分区是 Kafka 中的基本并行单元,分布在不同的 Broker 上。
  • Producer
    • 负责向 Kafka 主题发布消息的客户端应用程序。
  • Consumer
    • 负责从 Kafka 主题消费消息的客户端应用程序。
  • Consumer Group
    • 一组消费者共同消费一个或多个主题的分区,每个分区只能被 Consumer Group 中的一个消费者消费。

分区和副本机制

  • Leader 和 Follower
    • 每个分区都有一个 Leader 和若干个 Follower。
    • Leader 处理所有的读写请求,Follower 复制 Leader 的数据。
    • 当 Leader 失效时,一个 Follower 会被选举为新的 Leader。
  • 副本(Replica)
    • 每个分区的副本数可以配置,副本分布在不同的 Broker 上,以提高容错性。
    • 副本机制保证了数据的冗余和高可用性。


复制机制:数据被复制到多个 Broker 上,提高了数据的可用性和持久性。
Leader选举:当 Leader 节点失效时,Controller 会从 ISR 中选举一个新的 Leader。
数据恢复:新加入的 Broker 或恢复的 Broker 会从其他副本同步数据,确保数据一致性。

Kafka 2.8 引入了不依赖 Zookeeper 的架构,通过新的 Raft 协议(KRaft)管理集群元数据。

实验操作系统是:ubuntu-24.04.1,kafka版本3.8.0,是裸机,在安装了java的系统,直接跳过步骤1.

1、系统基本软件安装配置

sudo cp /etc/apt/sources.list /etc/apt/sources.list.bat
sudo vim /etc/apt/sources.list.d/ubuntu.sources
# 选aliyun或中科大镜像之一
http://mirrors.aliyun.com/ubuntu/
http://mirrors.ustc.edu.cn/ubuntu/sudo apt-get -y update;
sudo apt-get -y upgrade;# 安装openssh-server配置远程登录
sudo apt-get install -y openssh-serversudo vi /etc/ssh/sshd_config
#添加
PermitRootLogin yessudo systemctl start ssh
sudo systemctl enable ssh# 安装中文字符
sudo apt install language-pack-zh-hans
sudo apt-get -y install fonts-liberation
sudo apt-get -y install libu2f-udev# 安装软件支持库和常用工具
sudo apt-get install -y gcc g++
sudo apt-get install -y make
sudo apt-get install -y curl
sudo apt-get install -y openjdk-8-jdk
sudo apt-get install -y subversion
sudo apt-get install -y git
sudo apt-get install -y python3
sudo apt-get install -y python3-pip
sudo apt-get install -y p7zip-full p7zip-rar
sudo apt-get install -y zip unzip

2、安装配置kafka

#创建安装目录 
mkdir -p /opt/kafka#进入安装目录
cd /opt/kafka/#下载安装包、或上传
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz#解压
tar -zxvf kafka_2.13-3.8.0.tgz #重命名解压的目录
mv kafka_2.13-3.8.0 kafka3.8 #进入解压目录
cd kafka3.8/#重新命名服务器节点hostname 
vi /etc/hostname
# 设置你的机器域名,如 server1
hostname server1#查看hostname
hostname# 修改配置文件,存放数据的目录:log.dirs=/opt/kafka/kafkadata/ 
# 自己设定,按i输入,修改后按esc退出修改模式,按shift+冒号后输入x保存退出编辑。
vi config/kraft/server.properties# 如果没有安装java,先装,
sudo apt-get install -y openjdk-8-jdk# 生成 UUID作为集群id,这里用临时变量KAFKA_CLUSTER_ID存储这个id: 
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"# 格式化数据目录(-c指定配置文件,-t指定集群id):
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server. Properties#格式化成功后,会根据 config/kraft/server.properties 的配置:log.dirs=/opt/kafka/kafkadata/
# 生成 bootstrap.checkpoint,meta.properties  里面记录着KAFKA_CLUSTER_IDls /opt/kafka/kafkadata/
bootstrap.checkpoint  meta.properties # 单节点安装完成。下面验证测试服务是否安装成功:# 启动服务 
bin/kafka-server-start.sh config/kraft/server.properties# 创建主题 testtopic
bin/kafka-topics.sh --create --topic testtopic --bootstrap-server server1:9092# 查看主题信息
bin/kafka-topics.sh --describe --topic testtopic --bootstrap-server server1:9092
bin/kafka-topics.sh --list --bootstrap-server server1:9092# 生产者,打开新终端会话发送消息
bin/kafka-console-producer.sh --topic testtopic --bootstrap-server server1:9092 # 消费者 打开另一个终端会话,订阅主题 testtopic
bin/kafka-console-consumer.sh --topic testtopic --from-beginning --bootstrap-server server1:9092

3、服务配置和集群配置

kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

SASL:是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密。这里配置了kafka安全认证sasl。

配置文件 /opt/kafka/kafka3.8/config/kraft/server.properties 修改或添加如下配置
节点id,用户,密码,目录,ip和端口等信息,需要根据实际修改: 

#角色
process.roles=broker,controller
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@192.168.2.212:9093,2@192.168.2.211:9093
listeners=SASL_PLAINTEXT://192.168.2.212:9092,CONTROLLER://192.168.2.212:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=SASL_PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
advertised.listeners=SASL_PLAINTEXT://192.168.2.212:9092
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#CONTROLLER:SASL_PLAINTEXT需要修改
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 设置必须授权才能用
allow.everyone.if.no.acl.found=false
# 认证方式,用了最简单的PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
sasl.mechanism=PLAIN
# 禁用了自动创建topic
auto.create.topics.enable = false
# 设置必须授权才能用
allow.everyone.if.no.acl.found=false
# 设置超级管理员
super.users=User:admin
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 集群间认证时用的认证方式
sasl.mechanism.controller.protocol=PLAIN
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/kafkadata/
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

配置kafka安全认证sasl
 
在目录 /opt/kafka/kafka3.8/config/kraft下,
创建kafka-server-sasl.properties配置文件,内容如下:

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="password"user_admin="password"user_test="test";
};

kafkaserver中定义了kafka的访问用户名密码 user_admin 必须和username password一致 
特别注意分号的位置,不多不少。

 修改kafka-server-start.sh,加入sasl认证,本机在/opt/kafka/kafka3.8/bin目录下。

加入环境变量:

-Djava.security.auth.login.config=/opt/kafka/kafka3.8/kafka/config/kafka-server-sasl.properties

注意:kafka-server-sasl.properties是前面添加的文件,根据自己的情况修改路径。也可以加到系统环境变量里面。

其他节点的安装,基本上是重复相同的内容,不同的是server.properties里面几处配置项,节点id和节点ip端口不同而已。

另外,集群id只要一个节点初始化即可,其他节点初始化存储目录时指定同一个集群id就可,这个集群id记录在一个元数据文件:

# 格式化数据目录(-c指定配置文件,-t指定集群id):
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server. Properties
添加节点就这样加入就行了。

附:

设置系统字符编码

修改编码
1、安装相关工具
sudo apt-get update
sudo apt-get install locales2、安装 UTF-8 语言支持包
sudo locale-gen en_US.UTF-83、安装完成后,修改 /etc/default/locale 文件,将原内容清空,并输入以下内容
LANG="en_US.UTF-8"
LANGUAGE="en_US.UTF-8:"4、修改 /etc/bash.bashrc 文件,在文件的最后添加以下内容:
export LANG="en_US.UTF-8"
export LANGUAGE="en_US.UTF-8:"5、运行以下命令更新环境:
source /etc/bash.bashrc需要设置系统语言环境为中文的:
sudo update-locale LANG=zh_CN.UTF-8
sudo locale-gen

ssl生成证书的脚本:gencert.sh 
变量BASE_DIR,yourpassword,域名, 根据情况修改。域名需要有效域名才行。

echo "Step1: init paramters"
# your own BASE_DIR
BASE_DIR=/opt/kafka/kafka3.8/ssl
CERT_OUTPUT_PATH="$BASE_DIR/cert"
PASSWORD=yourpassword
KEY_STORE="$CERT_OUTPUT_PATH/kafka.keystore"
TRUST_STORE="$CERT_OUTPUT_PATH/kafka.truststore"
KEY_PASSWORD=$PASSWORD
STORE_PASSWORD=$PASSWORD
TRUST_KEY_PASSWORD=$PASSWORD
TRUST_STORE_PASSWORD=$PASSWORD
CLUSTER_NAME=test-cluster-01
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert"
DAYS_VALID=365
D_NAME="CN=server1, OU=yykj, O=yykj, L=China, ST=China, C=bj"
mkdir -p $CERT_OUTPUT_PATHecho "Step2: Create certificate to keystore"
keytool -keystore $KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA -storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$D_NAME"echo "Step3: Create CA"
openssl req -new -x509 -keyout "$CERT_OUTPUT_PATH/ca-key" -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" -passin pass:"$PASSWORD" -passout pass:"$PASSWORD" -subj "/C=CN/ST=XX/L=XX/O=XX/CN=XX"echo "Step4: Import CA into truststore"
keytool -keystore "$TRUST_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASS" -nopromptecho "Step5: Export certificate from keystore"
keytool -keystore "$KEY_STORE" -alias "$CLUSTER_NAME" -certreq -file "$CLUSTER_CERT_FILE" -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -nopromptecho "Step6: Signing the certificate"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLUSTER_CERT_FILE" -out "${CLUSTER_CERT_FILE}-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"echo "Setp7: Import CA into keystore"
keytool -keystore "$KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -nopromptecho "Setp8: Import signed certificate into keystore"
keytool -keystore "$KEY_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLUSTER_CERT_FILE}-signed" -storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

 修改脚本文件为可执行:
chmod 777 gencert.sh
执行脚本生成证书
./gencert.sh 

ssl参考配置:

host.name=server1 
listeners=SASL_SSL://:8066,CONTROLLER://:9093
advertised.listeners=SASL_SSL://192.168.2.212:8066  
ssl.keystore.location=/opt/kafka/kafka3.8/ssl/cert/kafka.keystore
ssl.truststore.location=/opt/kafka/kafka3.8/ssl/cert/kafka.truststore
ssl.keystore.password=yourpassword
ssl.key.password=yourpassword
ssl.truststore.password=yourpassword
ssl.endpoint.identification.algorithm=
#inter.broker.listener.name=PLAINTEXT         
security.inter.broker.protocol=SASL_SSL
ssl.client.auth=required  
super.users=User:admin
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true

springboot中集成kafka需要的包:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.1</version></dependency>

参考maven配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.cloud</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>demo</description><properties><java.version>21</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.36</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

获取classpath路径:

String base = Thread.currentThread().getContextClassLoader().getResource("").toString().substring(6);
application.yml中kafka配置
server:port: 8888
spring:kafka:# 指定kafka 代理地址,可以多个bootstrap-servers: 192.168.2.212:9092,192.168.2.211:9092producer: # 生产者retries: 2 # 设置大于0的值,则客户端会将发送失败的记录重新发送# 每次批量发送消息的数量batch-size: 16384acks: alltransaction-id-prefix: kafkaTx-buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer#修改最大向kafka推送消息大小properties:max.request.size: 52428800consumer:#手动提交offset保证数据一定被消费enable-auto-commit: false#指定从最近地方开始消费(earliest)auto-offset-reset: latest#消费者组group-id: c-id-1234444334#一次调用 poll() 操作时返回的最大记录数 默认为 500 条max-poll-records: 2#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔auto-commit-interval: 5000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000properties:session:timeout:# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10sms: 10000max:poll:interval:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancems: 600000security:protocol: SASL_PLAINTEXTsasl:mechanism: PLAINjaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="password";'

config类,注意不需要ssl协议的,请去掉相关变量

生产者

package com.example.demo;import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;@SpringBootConfiguration
public class KafkaProviderConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.transaction-id-prefix}")private String transactionIdPrefix;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Value("${spring.kafka.admin.security.protocol}")private String securityProtocol;@Value("${spring.kafka.admin.ssl.key-store-location}")private String keystoreLocation;@Value("${spring.kafka.admin.ssl.key-store-password}")private String keystorePassword;@Value("${spring.kafka.admin.ssl.trust-store-location}")private String truststoreLocation;@Value("${spring.kafka.admin.ssl.trust-store-password}")private String truststorePassword;@Value("${spring.kafka.ssl.endpoint.identification.algorithm}")private String endpointIdentificationAlgorithm;@Value("${spring.kafka.admin.ssl.key-password}")private String keyPassword;@Value("${spring.kafka.properties.sasl.mechanism}")private String saslMechanism;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>(16);// Thread.currentThread().getContextClassLoader().getResource("/").getPath().substring(1).replace("/", java.io.File.separator);String base = Thread.currentThread().getContextClassLoader().getResource("").toString().substring(6);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。//acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。//开启事务必须设为allprops.put(ProducerConfig.ACKS_CONFIG, acks);//发生错误后,消息重发的次数,开启事务必须大于0props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,//即使数据没达到16KB,也将这个批次发送出去props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");//生产者内存缓冲区的大小props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//反序列化,和生产者的序列化方式对应props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);/*props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,securityProtocol);props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,base+ keystoreLocation);props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,keystorePassword);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,base+ truststoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,truststorePassword);props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"JKS");props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"JKS");props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,keyPassword);props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,endpointIdentificationAlgorithm);*/props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);props.put(SaslConfigs.SASL_JAAS_CONFIG,jaasConfig);return props;}@Beanpublic ProducerFactory<Object, Object> producerFactory() {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());//开启事务,会导致 LINGER_MS_CONFIG 配置失效factory.setTransactionIdPrefix(transactionIdPrefix);return factory;}@Beanpublic KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<Object, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

消费者

package com.example.demo;import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;@SpringBootConfiguration
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.properties.session.timeout.ms}")private String sessionTimeout;@Value("${spring.kafka.properties.max.poll.interval.ms}")private String maxPollIntervalTime;@Value("${spring.kafka.listener.concurrency}")private Integer concurrency;@Value("${spring.kafka.listener.missing-topics-fatal}")private boolean missingTopicsFatal;@Value("${spring.kafka.listener.poll-timeout}")private long pollTimeout;@Value("${spring.kafka.admin.security.protocol}")private String securityProtocol;@Value("${spring.kafka.admin.ssl.key-store-location}")private String keystoreLocation;@Value("${spring.kafka.admin.ssl.key-store-password}")private String keystorePassword;@Value("${spring.kafka.admin.ssl.trust-store-location}")private String truststoreLocation;@Value("${spring.kafka.admin.ssl.trust-store-password}")private String truststorePassword;@Value("${spring.kafka.ssl.endpoint.identification.algorithm}")private String endpointIdentificationAlgorithm;@Value("${spring.kafka.admin.ssl.key-password}")private String keyPassword;@Value("${spring.kafka.properties.sasl.mechanism}")private String saslMechanism;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>(16);// 需要去掉路径前缀// Thread.currentThread().getContextClassLoader().getResource("/").getPath().substring(1).replace("/", java.io.File.separator);String base = Thread.currentThread().getContextClassLoader().getResource("").toString().substring(6);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//自动提交的时间间隔,自动提交开启时生效props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalanceprops.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10sprops.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);/*props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,securityProtocol);props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,base+ keystoreLocation);props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,keystorePassword);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,base+ truststoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,truststorePassword);props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"JKS");props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"JKS");props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,keyPassword);props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,endpointIdentificationAlgorithm);*/props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);props.put(SaslConfigs.SASL_JAAS_CONFIG,jaasConfig);return props;}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {//配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {deserializer.trustedPackages("*");return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);}}@Bean(name="containerFactory1")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数factory.setConcurrency(concurrency);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误factory.setMissingTopicsFatal(missingTopicsFatal);//自动提交关闭,需要设置手动消息确认factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setPollTimeout(pollTimeout);//设置为批量监听,需要用List接收//factory.setBatchListener(true);return factory;}
}

发送消息,注意需要springboot-web依赖,看前面的maven依赖:

package com.example.demo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/kafka")
//这个注解代表这个类开启Springboot事务
@Transactional(rollbackFor = RuntimeException.class)
public class KafkaController {@Autowiredprivate KafkaTemplate<Object, Object> kafkaTemplate;@RequestMapping("/sendMultiple")public void sendMultiple() {for (int i = 0;i < 10;i++) {kafkaTemplate.send("testtopic1", "你好 world " + (i+1));}}@RequestMapping("/send")public String send() {kafkaTemplate.send("testtopic1", "你好 world.。");return "send ok";}
}
package com.example.demo;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {System.out.println("消息发送成功:" + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());}
}

消费消息

package com.example.demo;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaHandler {@KafkaListener(groupId = "c-id-1234444334", topics = "testtopic1")//这里写业务逻辑public void listen(String message) {System.out.println("收到消息: "+message);}
}

异常处理

package com.example.demo;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class KafkaErrorHandler implements KafkaListenerErrorHandler {@Override@NonNullpublic Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {return new Object();}@Override@NonNullpublic Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68507.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 接口安全指南

Java 接口安全指南 概述 在现代 Web 应用中&#xff0c;接口&#xff08;API&#xff09;是前后端交互的核心。然而&#xff0c;接口的安全性常常被忽视&#xff0c;导致数据泄露、未授权访问等安全问题。本文将详细介绍 Java 中如何保障接口安全&#xff0c;涵盖以下内容&am…

华为AI培训-NLP实验

中文分词、命名实体识别、语义词性标注、语句逻辑推理、文本摘要、机器翻译、文本情感分析、内容创作 1 实验介绍 1.1 实验背景 中文分词、命名实体识别、语义词性标注、语句逻辑推理是自然语言处理领域中的重要任务。中文分词是将连续的汉字序列切分成有意义的词语序列…

Flask学习入门笔记

Flask学习入门笔记 前言1. 安装Flask2. 创建一个简单的Flask应用3. 路由与视图函数3.1 基本路由3.2 动态路由3.3 HTTP方法 4. 请求与响应4.1 获取请求数据4.2 返回响应 5. 模板渲染5.1 基本模板渲染5.2 模板继承 6. 静态文件6.1 静态文件的目录结构6.2 在模板中引用静态文件6.2…

citrix netscaler13.1 重写负载均衡响应头(基础版)

在 Citrix NetScaler 13.1 中&#xff0c;Rewrite Actions 用于对负载均衡响应进行修改&#xff0c;包括替换、删除和插入 HTTP 响应头。这些操作可以通过自定义策略来完成&#xff0c;帮助你根据需求调整请求内容。以下是三种常见的操作&#xff1a; 1. Replace (替换响应头)…

【Web】2025西湖论剑·中国杭州网络安全安全技能大赛题解(全)

目录 Rank-l Rank-U sqli or not Rank-l username存在报错回显&#xff0c;发现可以打SSTI 本地起一个服务&#xff0c;折半查找fuzz黑名单&#xff0c;不断扔给fenjing去迭代改payload from flask import Flask, request, render_template_stringapp Flask(__name__)app…

【C】PAT 1011-1015

1011 AB和C 给定区间 [−231,231] 内的 3 个整数 A、B 和 C&#xff0c;请判断 AB 是否大于 C。 输入格式&#xff1a; 输入第 1 行给出正整数 T (≤10)&#xff0c;是测试用例的个数。随后给出 T 组测试用例&#xff0c;每组占一行&#xff0c;顺序给出 A、B 和 C。整数间以…

WEB渗透技术研究与安全防御

目录 作品简介I IntroductionII 1 网络面临的主要威胁1 1.1 技术安全1 2 分析Web渗透技术2 2.1 Web渗透技术的概念2 2.2 Web漏洞产生的原因2 2.3 注入测试3 2.3.1 注入测试的攻击流程3 2.3.2 进行一次完整的Sql注入测试4 2.3.3 Cookie注入攻击11 3 安全防御方案设计…

软考高级5个资格、中级常考4个资格简介及难易程度排序

一、软考高级5个资格 01、网络规划设计师 资格简介&#xff1a;网络规划设计师要求考生具备全面的网络规划、设计、部署和管理能力&#xff1b;该资格考试适合那些在网络规划和设计方面具有较好理论基础和较丰富从业经验的人员参加。 02、系统分析师 资格简介&#xff1a;系统分…

Centos 宝塔安装

yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh 安装成功界面 宝塔说明文档 https://www.bt.cn/admin/servers#wcu 或者可以注册宝塔账号 1 快速部署 安装docker 之后 2 需要在usr/bin下下载do…

ubuntu下安装编译cmake,grpc与protobuf

文章目录 install cmakeinstall grpcinstall protobuf注 install cmake sudo apt-get install -y g make libssl-devcd third_party/cmake-3.17.2./configuresudo make && make installcmake --version install grpc $ sudo apt-get install -y build-essential auto…

Java锁 从乐观锁和悲观锁开始讲 面试复盘

目录 面试复盘 Java 中的锁 大全 悲观锁 专业解释 自我理解 乐观锁 专业解释 自我理解 悲观锁的调用 乐观锁的调用 synchronized和 ReentrantLock的区别 相同点 区别 详细对比 总结 面试复盘 Java 中的锁 大全 悲观锁 专业解释 适合写操作多的场景 先加锁可以…

二十三种设计模式-装饰器模式

一、定义与核心思想 装饰器模式是一种结构型设计模式&#xff0c;其核心思想是动态地给一个对象添加一些额外的职责。通过这种方式&#xff0c;可以在不改变原有对象结构的基础上&#xff0c;灵活地增加新的功能&#xff0c;使得对象的行为可以得到扩展&#xff0c;同时又保持…

使用nginx搭建通用的图片代理服务器,支持http/https/重定向式图片地址

从http切换至https 许多不同ip的图片地址需要统一进行代理 部分图片地址是重定向地址 nginx配置 主站地址&#xff1a;https://192.168.123.100/ 主站nginx配置 server {listen 443 ssl;server_name localhost;#ssl证书ssl_certificate ../ssl/ca.crt; #私钥文件ssl_ce…

2.1.1 常用ST编程工具介绍

在工业自动化领域,ST语言(Structured Text)是IEC 61131-3标准中的一种重要编程语言。为了高效地编写、调试和运行ST程序,开发者通常依赖于一些专业的编程工具。以下是阿凡为大家介绍的几种常用的ST编程工具及其特点: 1. Codesys 概述 全称:Controller Development Syste…

latin1_swedish_ci(latin1 不支持存储中文、日文、韩文等多字节字符)

文章目录 1、SHOW TABLE STATUS WHERE Name batch_version;2、latin1_swedish_ci使用场景注意事项修改字符集和排序规则修改表的字符集和排序规则修改列的字符集和排序规则修改数据库的默认字符集和排序规则 3、ALTER TABLE batch_version CONVERT TO CHARACTER SET utf8mb4 C…

复健第二天之[MoeCTF 2022]baby_file

打开题目在线环境可以看到&#xff1a; 感觉要用伪协议去求&#xff0c;但是我们并不知道flag的位置&#xff0c;这里我选择用dirsearch去扫一下&#xff1a; 最像的应该就是flag.php了 于是就构建payload&#xff1a; **?filephp://filter/convert.base64-encode/resource…

机器学习之SVD奇异值分解实现图片降维

SVD奇异值分解实现图片降维 目录 SVD奇异值分解实现图片降维1 SVD奇异值分解1.1 概念1.2 基本步骤1.2.1 矩阵分解1.2.2 选择奇异值1.2.3 重建矩阵1.2.4 降维结果 1.3 优缺点1.3.1 优点1.3.2 缺点 2 函数2.1 函数导入2.2 函数参数2.3 返回值2.4 通过 k 个奇异值降维 3 实际测试3…

PyTorch使用教程(6)一文讲清楚torch.nn和torch.nn.functional的区别

torch.nn 和 torch.nn.functional 在 PyTorch 中都是用于构建神经网络的重要组件&#xff0c;但它们在设计理念、使用方式和功能上存在一些显著的区别。以下是关于这两个模块的详细区别&#xff1a; 1. 继承方式与结构 torch.nn torch.nn 中的模块大多数是通过继承 torch.nn…

移动端布局 ---- 学习分享

响应式布局实现方法 主流的实现方案有两种: 通过rem \ vw/vh \ 等单位,实现在不同设备上显示相同比例进而实现适配. 响应式布局,通过媒体查询media 实现一套HTML配合多套CSS实现适配. 在学习移动端适配之前,还需要学习移动端适配原理: 移动端适配原理(Viewport) 了解VSCo…

cuda + cudnn安装

1.安装CUDA Toolkit 在设备管理器&#xff08;此电脑–右键–属性&#xff09;的显示适配器中可以查看自己的显卡型号&#xff0c;去下载对应的CUDA Toolkit 。或者输入以下命令查看Driver Version &#xff0c;cuda Version&#xff1a;12.2代表12.2版本以下兼容可以进行安装 …