1. 安装
单机安装kafka
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
dockerhub网址: https://hub.docker.com
- Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:latest
docker pull bitnami/kafka:3.6.2 (用这个会有问题,因为创建容器时参数设置与wurstmeister/kafka不同)
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:latest
- 测试
终端窗口A
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181 (创建主题)
Created topic test.
bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic test (创建生产者)
>hello (发送消息)
>haha
终端窗口B
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning (创建接收者)
hello (收到了消息)
haha
- 安装kafka可视化工具(运行容器后打不开,不知道为啥)
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="192.168.200.131:2181" nickzurich/efak:latest
集群安装
- kafka.yml
version: '3.8'
services:zookeeper:image: zookeeper:3.7.0restart: alwayshostname: 192.168.200.131container_name: zookeeperprivileged: trueports:- 2181:2181volumes:- /usr/local/server/zookeeper/data/:/databuild:context: .network: hostkafka1:container_name: kafka1restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9092:9092- 19092:19092environment:KAFKA_BROKER_ID: 1HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9092 ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9092KAFKA_PORT: 9092KAFKA_delete_topic_enable: 'true'KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19092"JMX_PORT: 19092volumes:/etc/localtime:/etc/localtimedepends_on:zookeeperkafka2:container_name: kafka2restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9093:9093- 19093:19093environment:KAFKA_BROKER_ID: 2HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9093 ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9093KAFKA_PORT: 9093KAFKA_delete_topic_enable: 'true'KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19093"JMX_PORT: 19093volumes:/etc/localtime:/etc/localtimedepends_on:zookeeperkafka3:container_name: kafka3restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9094:9094- 19094:19094environment:KAFKA_BROKER_ID: 3HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9094 ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9094KAFKA_PORT: 9094KAFKA_delete_topic_enable: 'true'KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19094"JMX_PORT: 19094volumes:/etc/localtime:/etc/localtimedepends_on:zookeepereagle:image: gui66497/kafka_eaglecontainer_name: eagle_monitorrestart: alwaysdepends_on:- kafka1- kafka2- kafka3ports:- "8048:8048"environment:ZKSERVER: "192.168.200.131:2181"
- 命令
docker-compose -f kafka.yml up -d
docker-compose -f kafka.yml down
docker-compose -f kafka.yml ps
[root@192 images]# ls
kafka.yml
[root@192 images]# docker-compose -f kafka.yml up -d
[+] Running 6/6⠿ Network images_default Created 0.1s⠿ Container kafka2 Started 1.0s⠿ Container kafka3 Started 1.0s⠿ Container zookeeper Started 1.0s⠿ Container kafka1 Started 1.0s⠿ Container eagle_monitor Started 1.5s
[root@192 images]#
// 但是还是用不了eagle,不知道为啥,防火墙是已经关了
2. springboot集成
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,这里采用这种方式
2.1 创建单点kafka和topic
[root@192 images]# docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
700f01ad38e99df4a8a7979a66cb88e6b629dccc29820c18dd3213ebc60c5814
[root@192 images]# docker run -d --name kafka \
> --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
> --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
> --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
> --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
> --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
> --net=host wurstmeister/kafka:latest
5884d54092ede091c2572e6420158529de29cf8e98da3706a572e1fa1408182e
[root@192 images]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic test.
bash-5.1# kafka-topics.sh --create --topic user-topic --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic user-topic.
2.2 创建生产者
dependencies
<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
application.yml
server:port: 8080
spring:application:name: kafka-producerkafka:bootstrap-servers: 192.168.200.131:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
controller-发送消息
@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("test","springboot发的第一条消息");return "ok";}@GetMapping("/helloUser")public String helloUser(){User user = new User();user.setName("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";}
}
User
public class User {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}
2.3 创建消费者
dependencies
<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
application.yml
server:port: 8081
spring:application:name: kafka-consumerkafka:bootstrap-servers: 192.168.200.131:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
User
public class User {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}
消息监听器
@Component
public class HelloListener {@KafkaListener(topics = "test")public void onMessage1(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user.toString());}}
}
启动生产者和消费者项目,浏览器输入http://127.0.0.1:8080/hello,发现消费者收到消息
浏览器输入http://127.0.0.1:8080/helloUser,发现消费者收到消息
项目结构
3.其它
通常在监听类直接调用service方法
@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}