文章目录
- 1、kafka部署:
- (1)先创建一个网络:
- (2)安装zookeeper,kafka依赖zookeeper所以需要先安装zookeeper:
- (3)安装Kafka:
-
- (4)部署kafka图形化管理工具,选择kafka-map或kafka-manager:
- kafka-map(推荐)
- kafka-manager(不好用,不推荐)
- 2、springboot集成kafka:
- (1)pom文件里引入kafka依赖
- (2)application.yml配置文件:
- (3)在你的项目里新建文件结构如下:
- 生产者,KafkaProducer.java文件内容如下:
- 消费者,KafkaConsumer.java文件内容如下:
- controller文件内容如下:
- (4)调用 /kafka/test 接口,打印结果:
1、kafka部署:
(1)先创建一个网络:
docker network create app-tier --driver bridge
(2)安装zookeeper,kafka依赖zookeeper所以需要先安装zookeeper:
docker run -d --name zookeeper-server --network app-tier -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
(3)安装Kafka:
docker run -d --name kafka-server --network app-tier -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.202.211:9092 bitnami/kafka:latest
参数解释:
ALLOW_PLAINTEXT_LISTENER
KAFKA_CFG_ZOOKEEPER_CONNECT
KAFKA_CFG_ADVERTISED_LISTENERS
(4)部署kafka图形化管理工具,选择kafka-map或kafka-manager:
kafka-map(推荐)
docker run -d --name kafka-map --network app-tier -p 9090:8080 -v /root/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --restart always dushixiang/kafka-map:latest访问地址:192.168.202.211:9090
账号密码:admin/admin
kafka-manager(不好用,不推荐)
docker run --name kafka-manager -d --network app-tier -p 9091:9000 -e ZK_HOSTS="zookeeper-server:2181" sheepkiller/kafka-manager访问地址:192.168.202.211:9091
2、springboot集成kafka:
(1)pom文件里引入kafka依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version>
</dependency>
(2)application.yml配置文件:
spring:kafka:bootstrap-servers: 192.168.202.211:9092 consumer:group-id: my-group-id auto-offset-reset: earliest
(3)在你的项目里新建文件结构如下:
生产者,KafkaProducer.java文件内容如下:
package com.heurd.intellidigital.service.modular.data.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topicName, String message) {System.out.printf("topicName: %s, message: %s%n", topicName, message);kafkaTemplate.send(topicName, message);}}
消费者,KafkaConsumer.java文件内容如下:
package com.heurd.intellidigital.service.modular.data.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id") public void listen(String message) {System.out.println("Received message: " + message);}}
controller文件内容如下:
package com.heurd.intellidigital.service.modular.data.controller;import com.heurd.intellidigital.service.modular.data.kafka.consumer.KafkaConsumer;
import com.heurd.intellidigital.service.modular.data.kafka.producer.KafkaProducer;
import com.heurd.intellimes.core.annotion.BusinessLog;
import com.heurd.intellimes.core.enums.LogAnnotionOpTypeEnum;
import com.heurd.intellimes.core.pojo.response.ResponseData;
import com.heurd.intellimes.core.pojo.response.SuccessResponseData;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;
@RestController
@RequestMapping("/kafka")
public class KafkaController {@Resourceprivate KafkaProducer kafkaProducer;@Resourceprivate KafkaConsumer kafkaConsumer;@ApiOperation("kafka测试")@GetMapping(value = "/test")public ResponseData<Object> kafka() {String message = "Hello, Kafka!"; kafkaProducer.sendMessage("my-topic", message); return new SuccessResponseData<>(message);}}
(4)调用 /kafka/test 接口,打印结果:
生产者:topicName: my-topic, message: Hello, Kafka!消费者:Received message: Hello, Kafka!