Contents
1. Add the Kafka dependency
2. Configure the Kafka connection in the yml file
3. Inject the KafkaTemplate
4. Create the Kafka message listener and consumer
5. Set up a Kafka cluster
5.1 Download Kafka
5.2 Configure the files in the config directory
1. Add the Kafka dependency
<!-- Spring Boot Kafka Starter -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Configure the Kafka connection in the yml file
# Spring Boot reads these settings from under the top-level spring: key
spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9095,localhost:9096
    consumer:
      group-id: myGroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
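The yml keys above are Spring Boot's spelling of the native Kafka client properties. As a rough sketch of that mapping (the class and method names here are made up for illustration, and only the JDK is used), the same settings look like this as plain key/value pairs:

```java
import java.util.Properties;

public class KafkaClientPropsSketch {

    // Consumer settings from the yml file, expressed with the native Kafka property names.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);   // bootstrap-servers
        props.setProperty("group.id", groupId);                     // consumer.group-id
        props.setProperty("auto.offset.reset", "earliest");         // consumer.auto-offset-reset
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Producer settings from the yml file, expressed with the native Kafka property names.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        String servers = "localhost:9092,localhost:9093,localhost:9095,localhost:9096";
        // Print every consumer property as key=value (iteration order is not guaranteed).
        consumerProps(servers, "myGroup")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Spring Boot does this translation for you, so you would not normally write this class; it is only meant to show which Kafka setting each yml line ends up controlling.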
3. Inject the KafkaTemplate
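import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    // Producer factory that Spring Boot builds from the yml settings
    @Autowired
    private ProducerFactory producerFactory;

    // Template used by application code to send messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory);
    }

    /* Alternative: build the producer factory by hand instead of from the yml settings
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // add other settings...
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    */
}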
4. Create the Kafka message listener and consumer
package com.example.consumer.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Called for each message on "yourTopic", consumed as part of group "myGroup"
    @KafkaListener(topics = "yourTopic", groupId = "myGroup")
    public void listen(String message) {
        System.out.println("Received Message in group 'myGroup': " + message);
    }
}
5. Set up a Kafka cluster
5.1 Download Kafka from the Apache Kafka downloads page: https://kafka.apache.org/downloads.html
5.2 Configure the files in the config directory
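Edit the settings in zookeeper.properties.
Edit the settings in server.properties. The default port is 9092; to use a specific port, add port=9092 with the port you want.
Make one copy for each node you want in the cluster, and change the ports in each copy's zookeeper.properties and server.properties.
That is how I configured mine.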
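The per-node changes can be sketched as a small generator for the values that differ between copies. This is a hypothetical illustration, not the configuration used in this post: the class name, the log directory pattern and the ZooKeeper address localhost:2181 are assumptions. It also sets the port through listeners, which newer Kafka releases use in place of the older port key, and gives each broker its own broker.id and log.dirs so that copies running on one machine do not collide.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class BrokerConfigSketch {

    // server.properties entries that must differ for each broker copy.
    static Map<String, String> brokerOverrides(int brokerId, int port, String zookeeperConnect) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("broker.id", String.valueOf(brokerId));          // unique per broker
        props.put("listeners", "PLAINTEXT://localhost:" + port);   // unique port per broker
        props.put("log.dirs", "/tmp/kafka-logs-" + brokerId);      // assumed path, one directory per broker
        props.put("zookeeper.connect", zookeeperConnect);          // same ZooKeeper for all brokers
        return props;
    }

    // Builds the matching bootstrap-servers value for the client yml file.
    static String bootstrapServers(int[] ports) {
        return Arrays.stream(ports)
                .mapToObj(port -> "localhost:" + port)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        int[] ports = {9092, 9093, 9095, 9096};  // the ports listed in the yml file
        for (int i = 0; i < ports.length; i++) {
            System.out.println("# overrides for broker copy " + i);
            brokerOverrides(i, ports[i], "localhost:2181")
                    .forEach((key, value) -> System.out.println(key + "=" + value));
        }
        System.out.println("bootstrap-servers: " + bootstrapServers(ports));
    }
}
```

Each printed group would go into one copy's server.properties, and the last line should match the bootstrap-servers entry in the application's yml file.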