#拉取镜像
docker pull wurstmeister/zookeeper
#运行容器
docker run --restart=always --name zookeeper -p 2181:2181 \
-v /etc/localtime:/etc/localtime -d wurstmeister/zookeeper

#拉取镜像
docker pull wurstmeister/kafka

#运行容器
docker run --restart=always --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.56.1:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-v /etc/localtime:/etc/localtime -d wurstmeister/kafka

# 这里将8088映射到容器的8080端口,因为ui默认是从8080启动的,否则不能访问UI界面
docker run --name=kafka-ui \
-e KAFKA_CLUSTERS_0_NAME=kafka-cluster-name \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.168.160:9092 \
-p 8088:8080 \
-d provectuslabs/kafka-ui:latest
SpringBoot整合Kafka
- 引入依赖
<!-- Kafka 相关依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
- 配置文件:
在application.yml文件中添加Kafka的相关配置(下面的配置是YAML格式,因此使用.yml文件而非.properties文件)。
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: article-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 编写生产者(这是使用Kafka实现双写一致性的示例代码,更新完数据就将实体id发给Kafka)
@Overridepublic ResponseResult updateArticleByKafka(ApArticle article) {apArticleMapper.updateById(article);kafkaTemplate.send("article-events",article.getId()+"");return ResponseResult.okResult();}
- 编写消费者(在消费者中直接对这个id的数据进行删除缓存处理,再次查询这个数据自然会重建缓存)
/**
 * Kafka consumer that keeps the Redis cache consistent with the database:
 * when an article id arrives on the "article-events" topic, the cached
 * entry for that article is deleted so the next read rebuilds the cache.
 */
@Component
@Slf4j
public class ArticleUpdateCacheKafkaConsumerListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Evicts the cached article whose id was published by the producer side.
     *
     * @param message the article id (as a string) received from the topic
     */
    @KafkaListener(topics = "article-events")
    public void receiveMessage(String message) {
        // Deleting the key forces a cache rebuild on the next query.
        // NOTE(review): assumes ARTICLE_KEY is a statically-imported cache-key
        // prefix defined elsewhere in the project — confirm.
        redisTemplate.delete(ARTICLE_KEY + message);
        // Parameterized logging: avoids eager string concatenation when the
        // DEBUG level is disabled (SLF4J best practice).
        log.debug("Received message: {}", message);
    }
}