配置文件
kafka-config:
  bootstrap-servers: server-url # 写自己的地址
  sasl-username: ********
  sasl-password: ********
  ssl-truststore: src/main/resources/only.4096.client.truststore.jks
  # sasl路径,demo中有,请拷贝到自己的某个目录下,不能被打包到jar中
  login-config: src/main/resources/kafka_client_jaas.conf
  sasl-mechanism: PLAIN # 两种加密方式: PLAIN, SCRAM-SHA-256
  # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
  auto-commit-interval: 1S
  # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
  # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
  auto-offset-reset: latest
  # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  enable-auto-commit: false
  # 键的反序列化方式
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  # 值的反序列化方式
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  topic:
    mytopic: user_info
读取配置,写consumer创建入参
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaProperties {

    @Value("${kafka-config.bootstrap-servers}")
    private String server;

    @Value("${kafka-config.sasl-username}")
    private String saslUsername;

    @Value("${kafka-config.sasl-password}")
    private String saslPassword;

    @Value("${kafka-config.ssl-truststore}")
    private String sslTruststore;

    // FIX(review): the original had a stray @Value("${env}") directly above this
    // method with no backing field. Spring would have treated getProperties as a
    // setter-style injection point and tried to inject the "env" property into the
    // topic parameter at context startup. Removed.

    /**
     * Builds the Kafka consumer {@link Properties} for the given topic.
     *
     * <p>The consumer group id is set to the topic name, so all instances consuming
     * the same topic share one group and load-balance its partitions.
     *
     * @param topic the topic to consume; also used as the consumer group id
     * @return a fully populated {@link Properties} for constructing a KafkaConsumer
     */
    public Properties getProperties(String topic) {
        Properties props = new Properties();
        // Broker endpoint, taken from configuration.
        // Consistency fix: ConsumerConfig instead of ProducerConfig (both resolve to
        // the same "bootstrap.servers" key, but these properties feed a consumer).
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        // Key/value deserializers (plain strings).
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Instances in the same group load-balance message consumption.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
        // Max records returned per poll. Keep this small enough that a batch is
        // always processed before the next poll; otherwise a rebalance is triggered.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        // Session timeout, default 30s. Tune to match actual fetch volume and
        // client version.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // With no committed offset (or an invalid one), start from the latest record.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Per-request fetch sizes; these matter a lot when accessing the cluster
        // over the public network.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 3200000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 3200000);
        // --- SASL/SSL authentication (left disabled, exactly as in the original) ---
        /*
        // Path of the SSL truststore. Like the JAAS file, it must NOT be packaged
        // inside the jar.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststore);
        // Truststore password — keep unchanged.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // Access protocol; only SASL_SSL is supported here.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL account. Two mechanisms supported: PLAIN, SCRAM-SHA-256.
        String saslMechanism = "PLAIN";
        if (StringUtils.isNotBlank(saslUsername) && StringUtils.isNotBlank(saslPassword)) {
            String prefix = "org.apache.kafka.common.security.scram.ScramLoginModule";
            if ("PLAIN".equalsIgnoreCase(saslMechanism)) {
                prefix = "org.apache.kafka.common.security.plain.PlainLoginModule";
            }
            String jaasConfig = String.format(
                    "%s required username=\"%s\" password=\"%s\";",
                    prefix, saslUsername, saslPassword);
            props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        }
        // SASL mechanism — keep unchanged.
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        // Disable hostname verification.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        */
        return props;
    }
}
消费者创建消息处理
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class KafkaMessageHandlerScheduler {@Resourceprivate KafkaPropertiesInit kafkaPropertiesInit;@Value("${kafka.topic.user_info}")private String userInfoTopic;@Scheduled(initialDelay = 10000, fixedRate = Long.MAX_VALUE)public void userInfoMsg() {new Thread(() -> {// 设置消费组订阅的Topic,可以订阅多个// 如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样List<String> subscribedTopics = new ArrayList<String>();// 如果需要订阅多个Topic,则在这里add进去即可// 每个Topic需要先在控制台进行创建subscribedTopics.add(topic);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertiesInit.getProperties(topic));consumer.subscribe(subscribedTopics);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(1000);// 必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIGList<String> msgs = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {if (StringUtils.isNotBlank(record.value())) {msgs.add(record.value());}}if (ObjectUtils.isNotEmpty(msgs)) {/****************业务逻辑todo********************/consumer.commitAsync();}} catch (Exception e) {log.error("消息处理异常 error = {}", e);} finally {try {Thread.sleep(1000);} catch (Exception e) {}}}}).start();}/***** 消息处理* @param tableName* @param topic*/private void messageHandler(String tableName, String topic) {// 设置消费组订阅的Topic,可以订阅多个// 如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样List<String> subscribedTopics = new ArrayList<String>();// 如果需要订阅多个Topic,则在这里add进去即可// 每个Topic需要先在控制台进行创建subscribedTopics.add(topic);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertiesInit.getProperties(topic));consumer.subscribe(subscribedTopics);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(1000);// 必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIGList<String> msgs = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {if (StringUtils.isNotBlank(record.value())) {msgs.add(record.value());}}if (ObjectUtils.isNotEmpty(msgs)) {kafkaMessageService.sgHandler(tableName, 
msgs);consumer.commitAsync();}} catch (Exception e) {log.error("消息处理异常 error = {}", e);} finally {try {Thread.sleep(1000);} catch (Exception e) {}}}}
}