Docker搭建kafka集群
kafka概念
broker:消息中间件处理节点,一个broker就是一个kafka节点,一个或者多个broker就组成了一个kafka集群 topic:kafka根据topic对消息进行归类,发布到kafka集群的每个消息,都要指定一个topic producer:消息生产者,向broker发送消息的客户端 consumer:消息消费者,从broker读取消息的客户端
kafka特性描述
生产者将消息发送给broker,broker会将消息保存在本地的日志文件中 消息的保存是有序的,通过offset偏移量来描述消息的有序性 消费者消费消息时,也是通过offset来描述当前要消费的那条消息的位置
消息相关
如果多个消费者在同一个消费者组中,那么只有一个消费者可以收到订阅topic中的消息,换言之,同一个消费组中只有一个消费者能收到一个topic中的消息 多播消息:不同的消费组订阅同一个topic,不同的消费组中只有一个消费者能收到消息,实际上也是多个消费组中的多个消费者收到了消息
Controller、Rebalance、HW
Controller
Kafka集群中的broker在zk中创建节点的时候,会有一个临时节点序号,序号最小的节点,会被当做集群的controller,负责管理集群中的所有分区和副本的状态 当某个分区的leader副本出现故障,由控制器负责为该分区选举新的leader副本 当检测到某个分区的ISR集合发生变化的时候,由控制器负责通知所有的broker更新其元数据信息 当使用kafka-topic.sh脚本为某个topic增加分区数量的时候,同样还是由控制器负责让新分区被其它节点感知到
Rebalance
前提是消费者没有指定分区进行消费,当消费组中的消费者或者分区关系发生变化的时候,就会触发rebalance机制,这个机制会调整消费者消费哪个分区 在触发rebalance机制之前,消费者消费哪个分区有三种策略: range:通过公示来计算某个消费者消费哪个分区 轮询:所有消费者轮着消费 sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整
HW和LEO
LEO是某个副本最后消息的消息位置(log-end-offset) HW是已完成同步的位置,消息在写入broker时,且每个broker都完成了这条消息的同步后,hw才会变化,这之前,消费者是消费不到这条消息的,同步完成后,HW调整后,消费者才能消费这条消息,这样做是为了方式消息丢失
kafka消息积压问题
消息积压问题的出现:消息的消费者的消费速度远远赶不上生产者生产消息的速度,导致kafka中有大量的数据没有被消费,随着没有被消费的消息越来越多,消费者寻址的性能越来越差,最后导致整个kafka对外提供的服务的性能越来越差,从而造成其它服务的访问速度很慢,造成服务雪崩。 消息积压的解决方案: 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息; 创建多个消费组,多个消费者,部署到其它机器上,一起消费,提高消费者消费消息的速度; 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者,该消费者poll下来的消息,直接转发到新的主题上,使用多个消费者消费新主题的消息–该方法不常用
Docker 搭建kafka集群
docker search kafka
docker pull bitnami/kafka
docker run -d --name kafka1 --network mynetwork \ -p 9092 :9092 \ --env KAFKA_BROKER_ID = 0 \ --env KAFKA_ZOOKEEPER_CONNECT = 192.168 .228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \ --env KAFKA_ADVERTISED_LISTENERS = PLAINTEXT://192.168.228.5:9092 \ --env KAFKA_LISTENERS = PLAINTEXT://0.0.0.0:9092 bitnami/kafka docker run -d --name kafka2 --network mynetwork \ -p 9093 :9092 \ --env KAFKA_BROKER_ID = 1 \ --env KAFKA_ZOOKEEPER_CONNECT = 192.168 .228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \ --env KAFKA_ADVERTISED_LISTENERS = PLAINTEXT://192.168.228.6:9092 \ --env KAFKA_LISTENERS = PLAINTEXT://0.0.0.0:9092 bitnami/kafka docker run -d --name kafka3 --network mynetwork \ -p 9094 :9092 \ --env KAFKA_BROKER_ID = 2 \ --env KAFKA_ZOOKEEPER_CONNECT = 192.168 .228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \ --env KAFKA_ADVERTISED_LISTENERS = PLAINTEXT://192.168.228.7:9092 \ --env KAFKA_LISTENERS = PLAINTEXT://0.0.0.0:9092 bitnami/kafka
docker start kafka1
docker start kafka2
docker start kafka3
springboot引用kafka的生产者和消费者
server : port : 8080 servlet : context-path : /
spring : application : name : mvcLearn
web : resources : static-locations : - classpath: /hwc/kafka : bootstrap-servers : 127.0.0.1: 9092 , 127.0.0.1: 9093 , 127.0.0.1: 9094 producer : acks : 1 retries : 3 batch-size : 16384 buffer-memory : 33554432 key-serializer : org.apache.kafka.common.serialization.StringSerializervalue-serializer : org.apache.kafka.common.serialization.StringSerializerconsumer : group-id : default- groupenable-auto-commit : false auto-offset-reset : earliestkey-deserializer : org.apache.kafka.common.serialization.StringDeserializervalue-deserializer : org.apache.kafka.common.serialization.StringDeserializerlistener : ack-mode : manual_immediate
package com. huwc. mvclearn. controller ; import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. web. bind. annotation. GetMapping ;
import org. springframework. web. bind. annotation. PathVariable ;
import org. springframework. web. bind. annotation. RestController ; @RestController
public class MyKafkaControlller { private final static String TOPIC_NAME = "my_two_partition_topic" ; @Autowired private KafkaTemplate < String , String > template ; @GetMapping ( "/send/{msg}" ) public String sendMessage ( @PathVariable ( "msg" ) String msg) { template. send ( TOPIC_NAME , 0 , "key" , msg) ; return "send success" ; }
}
package com. huwc. mvclearn. consumer ; import org. apache. kafka. clients. consumer. ConsumerRecord ;
import org. apache. kafka. clients. consumer. ConsumerRecords ;
import org. springframework. kafka. annotation. KafkaListener ;
import org. springframework. kafka. support. Acknowledgment ;
import org. springframework. stereotype. Component ; @Component
public class MyKafkaConsumer { @KafkaListener ( topics = "my_two_partition_topic" , groupId = "MyGroup1" ) public void listenGroup ( ConsumerRecord < String , String > record, Acknowledgment ack) { String key = record. key ( ) ; String value = record. value ( ) ; System . out. println ( "key = " + key) ; System . out. println ( "value = " + value) ; System . out. println ( "record = " + record) ; ack. acknowledge ( ) ; }
}