文章目录
- 1. 添加Maven依赖
- 2. 配置与参数分离
- 3. 工具类读取内容
- 4. Producer 消息生产者配置
- 5. Consumer 消息消费者配置
- 6. 使用注解监听消息
- 7. 请求测试
- 8. 测试结果
1. 添加Maven依赖
<!-- 添加spring-kafka支持 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
2. 配置与参数分离
使用kafka.properties文件形式,将配置与参数分离,方便管理。
kafka.properties文件如下:
################## kafkaListener Producer 发送端 ##################
# brokers 集群
kafka.producer.bootstrap.servers = 192.168.43.242:9092,192.168.43.134:9092,192.168.43.228:9092
# 发送端 id
kafka.producer.client.id = producerDemo
# 发送端确认模式
kafka.producer.acks = -1
# 发送失败重试次数
kafka.producer.retries = 3
# 批处理大小(字节),当多个记录被发送至同一分区时,producer 对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送
kafka.producer.batch.size = 4096
# 与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至broker
kafka.producer.linger.ms = 10
# 33554432 即32MB的批处理缓冲区
#kafka.producer.buffer.memory = 40960
# 默认 topic
kafka.producer.defaultTopic = testTopic
# key 序列化
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
# value 序列化
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer

################## kafkaListener Consumer 消费端 ##################
# 消费端 brokers 集群
kafka.consumer.bootstrap.servers = 192.168.43.242:9092,192.168.43.134:9092,192.168.43.228:9092
# 消费者 group.id 组ID
kafka.consumer.group.id = test-group
# 消费者消费消息后,进行自动提交
kafka.consumer.enable.auto.commit = true
# 自动提交的频率(与 enable.auto.commit = true 属性配合使用)
kafka.consumer.auto.commit.interval.ms = 1000
# 新的groupid,是否从头开始消费
kafka.consumer.auto.offset.reset = earliest
# 在使用kafka组管理时,发送心跳机制,用于检测消费者故障的超时
#kafka.consumer.session.timeout.ms = 1000
# key 反序列化
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# value 反序列化
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# 消费端消费的topic
kafka.consumer.topic = testTopic
3. 工具类读取内容
添加 PropertiesUtils 工具类,来读取 properties文件内容
package com.demo.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;public class PropertiesUtils {private static Logger log = LoggerFactory.getLogger(PropertiesUtils.class);/*** 根据文件名获取Properties对象* @param fileName* @return*/public static Properties read(String fileName){InputStream in = null;try{Properties prop = new Properties();//InputStream in = Object.class.getResourceAsStream("/"+fileName);in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);if(in == null){return null;}prop.load(in);return prop;}catch(Exception e){e.printStackTrace();}finally{try {if(in != null){in.close();}} catch (IOException e) {e.printStackTrace();}}return null;}/*** 根据文件名和键名获取值* @param fileName* @param key* @return*/public static String readKeyValue(String fileName, String key){Properties prop = read(fileName);if(prop != null){return prop.getProperty(key);}return null;}/*** 根据键名获取值* @param prop* @param key* @return*/public static String readKeyValue(Properties prop, String key){if(prop != null){return prop.getProperty(key);}return null;}/*** 写入* @param fileName* @param key* @param value*/public static void writeValueByKey(String fileName, String key, String value){Map<String, String> properties = new HashMap<String, String>();properties.put(key, value);writeValues(fileName, properties);}/*** 写入* @param fileName* @param properties*/public static void writeValues(String fileName, Map<String, String> properties){InputStream in = null;OutputStream out = null;try {in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);if(in == null){throw new RuntimeException("读取的文件("+fileName+")不存在,请确认!"); }Properties prop = new Properties();prop.load(in);String path = PropertiesUtils.class.getResource("/"+fileName).getPath();out = new FileOutputStream(path);if(properties != null){Set<String> set = properties.keySet();for (String string : set) {prop.setProperty(string, properties.get(string));log.info("更新"+fileName+"的键("+string+")值为:"+properties.get(string));}}prop.store(out, "update properties");} catch (Exception 
e) {e.printStackTrace();} finally{try {if(in != null){in.close();}if(out != null){out.flush();out.close();}} catch (Exception e2) {e2.printStackTrace();}}}public static void main(String[] args) throws Exception {//System.out.println("read="+read("config.properties"));//System.out.println("readKeyValue="+readKeyValue("config.properties","superAdmin"));//writeValueByKey(CC.WEIXI_PROPERTIES, "access_token", "ddd");Map<String, String> properties = new HashMap<String, String>();properties.put("access_token", "ddd2");properties.put("access_token1", "ee2");properties.put("bbbb", "bbbb");}
}
4. Producer 消息生产者配置
package com.demo.kafka;import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** Kafka 消息生产者配置*/
@Configurable
@Component
@EnableKafka
public class KafkaProducerConfig {Properties properties = PropertiesUtils.read("kafka.properties");public KafkaProducerConfig() {System.out.println("kafka 生产者配置加载...");}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory(producerProperties());}public Map<String, Object> producerProperties() {Map<String, Object> props = new HashMap<String, Object>();//Kafka服务地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.producer.bootstrap.servers"));//设置当前客户端idprops.put(ProducerConfig.CLIENT_ID_CONFIG, properties.getProperty("kafka.producer.client.id"));//设置消费端确认机制props.put(ProducerConfig.ACKS_CONFIG, properties.getProperty("kafka.producer.acks"));//发送失败重试次数props.put(ProducerConfig.RETRIES_CONFIG, properties.getProperty("kafka.producer.retries"));//批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送props.put(ProducerConfig.BATCH_SIZE_CONFIG, properties.getProperty("kafka.producer.batch.size"));//与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至brokerprops.put(ProducerConfig.LINGER_MS_CONFIG,properties.getProperty("kafka.producer.linger.ms"));//Key序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.key.serializer"));//Value序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.value.serializer"));return props;}@Beanpublic KafkaTemplate<String,String> kafkaTemplate(){KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory(),true);//设置默认的topic(此处可做一些具体设置)kafkaTemplate.setDefaultTopic(properties.getProperty("kafka.producer.defaultTopic"));return kafkaTemplate;}
}
5. Consumer 消息消费者配置
package com.demo.kafka;import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configurable
@Component
@EnableKafka
public class KafkaConsumerConfig {Properties properties = PropertiesUtils.read("kafka.properties");public KafkaConsumerConfig() {System.out.println("kafka消费者配置加载...");}public Map<String, Object> consumerProperties() {Map<String, Object> props = new HashMap<String, Object>();//Kafka服务地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.consumer.bootstrap.servers"));//消费组props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getProperty("kafka.consumer.group.id"));//设置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getProperty("kafka.consumer.enable.auto.commit"));//设置间隔时间props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getProperty("kafka.consumer.auto.commit.interval.ms"));//Key反序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.key.deserializer"));//Value反序列化props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.value.deserializer"));//从头开始消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getProperty("kafka.consumer.auto.offset.reset"));return props;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic KafkaConsumerListener kafkaConsumerListener() {return new KafkaConsumerListener();}
}
6. 使用注解监听消息
通过 @KafkaListener 注解的形式,消费topic中的消息
/**
 * Annotation-driven Kafka listeners that print every received record to standard output.
 */
public class KafkaConsumerListener {

    /**
     * Handles records from {@code testTopic01}: prints a banner, a header line with thread,
     * topic, partition and timestamp, and then the record value.
     *
     * @param consumerRecord the received record
     */
    @KafkaListener(topics = "testTopic01")
    public void listen01(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("开始消费testTopic01的消息");

        StringBuilder header = new StringBuilder();
        header.append("消费者线程:").append(Thread.currentThread().getName());
        header.append("[ 消息 来自kafkatopic:").append(consumerRecord.topic());
        header.append(",分区:").append(consumerRecord.partition());
        header.append(" ,委托时间:").append(consumerRecord.timestamp());
        header.append("]消息内容如下:");
        System.out.println(header.toString());

        System.out.println(consumerRecord.value());
    }

    /**
     * Handles records from {@code testTopic02}: prints a banner and the record value.
     *
     * @param consumerRecord the received record
     */
    @KafkaListener(topics = "testTopic02")
    public void listen02(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("开始消费testTopic02的消息");
        String payload = consumerRecord.value();
        System.out.println(payload);
    }

    /**
     * Handles records from partition 1 of topic {@code liuzebiao} only.
     *
     * @param record  the received record
     * @param content the text printed on the first output line
     */
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "liuzebiao", partitions = {"1"})})
    public void topicMessage(ConsumerRecord<?, ?> record, String content) {
        System.out.println("消息:" + content);

        String summary = "消息被消费------>Topic:" + record.topic()
                + ",------>Value:" + record.value()
                + ",------>Partition:" + record.partition();
        System.out.println(summary);
    }
}
7. 请求测试
通过Spring Controller的形式,开始测试
@Controller
@RequestMapping("kafka")
public class KafkaController {@AutowiredKafkaTemplate kafkaTemplate;/*** 消息发送*/@RequestMapping("producer")@ResponseBodypublic void producer(){kafkaTemplate.send("testTopic01","producer发送消息01");kafkaTemplate.send("testTopic02","producer发送消息02");}}
8. 测试结果
通过 localhost:8080/kafka/producer 调用接口,使用 kafkaTemplate 发送消息。
消息发送成功后,在控制台会收到如下信息:
开始消费testTopic01的消息
消费者线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1[ 消息 来自kafkatopic:testTopic01,分区:0 ,委托时间:1568107936693]消息内容如下:
producer发送消息01
开始消费testTopic02的消息
producer发送消息02
项目链接:
https://github.com/gb-heima/spring-annotataion-kafka