官网:https://kafka.apache.org/documentation/
消息队列-场景
1. 异步
2. 解耦
3. 削峰
4. 缓冲
消息队列-Kafka
1. 消息模式
消息发布订阅模式,MessageQueue中的消息不删除,会记录消费者的偏移量
2. Kafka工作原理
同一个消费者组里的消费者是队列竞争模式:Consumer1消费Broker-0的news消息,Consumer2消费Broker-1的news消息,Consumer3消费Broker-2的news消息。如果有Consumer4,那他哪个分区都不能消费,就是消费的饥饿问题。
不同消费组中的消费者是发布/订阅模式:Consumer1和Consumer4都能消费0分区(Broker0)。
3. SpringBoot整合
参照:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
配置
spring.kafka.bootstrap-servers=172.20.128.1:9092
修改C:\Windows\System32\drivers\etc\hosts
文件,配置8.130.32.70 kafka
4. 消息发送
kafkaTemplate发送消息的内容是对象时,需要用json序列化,在配置文件中加上:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
@SpringBootTest
class Boot07KafkaApplicationTests {@AutowiredKafkaTemplate kafkaTemplate;@Testvoid contextLoads() throws ExecutionException, InterruptedException {StopWatch watch = new StopWatch();watch.start();CompletableFuture[] futures = new CompletableFuture[10000];for (int i = 0; i < 10000; i++) {CompletableFuture send = kafkaTemplate.send("order", "order.create."+i, "订单创建了:"+i);futures[i]=send;}CompletableFuture.allOf(futures).join();watch.stop();System.out.println("总耗时:"+watch.getTotalTimeMillis());}
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class MyBean {private final KafkaTemplate<String, String> kafkaTemplate;public MyBean(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void someMethod() {this.kafkaTemplate.send("someTopic", "Hello");}}
5. 消息监听
@Component
public class OrderMsgListener {@KafkaListener(topics = "order",groupId = "order-service")public void listen(ConsumerRecord record){System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到}@KafkaListener(groupId = "order-service-2",topicPartitions = {@TopicPartition(topic = "order",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "0")})})public void listenAll(ConsumerRecord record){System.out.println("收到partion-0消息:"+record);}
}
6. 参数配置
消费者
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
生产者
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
7. 自动配置原理
kafka 自动配置在KafkaAutoConfiguration
- 容器中放了
KafkaTemplate
可以进行消息收发 - 容器中放了
KafkaAdmin
可以进行 Kafka 的管理,比如创建 topic 等 - kafka 的配置在
KafkaProperties
中 @EnableKafka
可以开启基于注解的模式
toConfiguration`
- 容器中放了
KafkaTemplate
可以进行消息收发 - 容器中放了
KafkaAdmin
可以进行 Kafka 的管理,比如创建 topic 等 - kafka 的配置在
KafkaProperties
中 @EnableKafka
可以开启基于注解的模式