kafka安装配置及集成springboot

1. 安装

单机安装kafka
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
dockerhub网址: https://hub.docker.com

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:latest
docker pull bitnami/kafka:3.6.2 (用这个会有问题,因为创建容器时参数设置与wurstmeister/kafka不同)

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:latest
  • 测试
    终端窗口A
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181   (创建主题)
Created topic test.
bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic test   (创建生产者)
>hello    (发送消息)
>haha

终端窗口B

[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   (创建接收者)
hello    (收到了消息)
haha
  • 安装kafka可视化工具(运行容器后打不开,不知道为啥)
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="192.168.200.131:2181" nickzurich/efak:latest

集群安装

  1. kafka.yml
version: '3.8'
services:zookeeper:image: zookeeper:3.7.0restart: alwayshostname: 192.168.200.131container_name: zookeeperprivileged: trueports:- 2181:2181volumes:- /usr/local/server/zookeeper/data/:/databuild:context: .network: hostkafka1:container_name: kafka1restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9092:9092- 19092:19092environment:KAFKA_BROKER_ID: 1HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9092    ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9092KAFKA_PORT: 9092KAFKA_delete_topic_enable: 'true'KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19092"JMX_PORT: 19092volumes:/etc/localtime:/etc/localtimedepends_on:zookeeperkafka2:container_name: kafka2restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9093:9093- 19093:19093environment:KAFKA_BROKER_ID: 2HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9093    ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9093KAFKA_PORT: 9093KAFKA_delete_topic_enable: 'true'KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19093"JMX_PORT: 19093volumes:/etc/localtime:/etc/localtimedepends_on:zookeeperkafka3:container_name: kafka3restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9094:9094- 19094:19094environment:KAFKA_BROKER_ID: 3HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9094    ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9094KAFKA_PORT: 9094KAFKA_delete_topic_enable: 'true'KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19094"JMX_PORT: 19094volumes:/etc/localtime:/etc/localtimedepends_on:zookeepereagle:image: gui66497/kafka_eaglecontainer_name: eagle_monitorrestart: alwaysdepends_on:- kafka1- kafka2- kafka3ports:- "8048:8048"environment:ZKSERVER: "192.168.200.131:2181"
  1. 命令

docker-compose -f kafka.yml up -d
docker-compose -f kafka.yml down
docker-compose -f kafka.yml ps

[root@192 images]#  ls
kafka.yml
[root@192 images]# docker-compose -f kafka.yml up -d
[+] Running 6/6⠿ Network images_default   Created                                                                                        0.1s⠿ Container kafka2         Started                                                                                        1.0s⠿ Container kafka3         Started                                                                                        1.0s⠿ Container zookeeper      Started                                                                                        1.0s⠿ Container kafka1         Started                                                                                        1.0s⠿ Container eagle_monitor  Started                                                                                        1.5s
[root@192 images]# 

// 但是还是用不了eagle,不知道为啥,防火墙是已经关了

2. springboot集成

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,这里采用这种方式

2.1 创建单点kafka和topic

[root@192 images]# docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
700f01ad38e99df4a8a7979a66cb88e6b629dccc29820c18dd3213ebc60c5814
[root@192 images]# docker run -d --name kafka \
> --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
> --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
> --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
> --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
> --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
> --net=host wurstmeister/kafka:latest
5884d54092ede091c2572e6420158529de29cf8e98da3706a572e1fa1408182e
[root@192 images]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic test.
bash-5.1# kafka-topics.sh --create --topic user-topic --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic user-topic.

2.2 创建生产者

dependencies

<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>

application.yml

server:port: 8080
spring:application:name: kafka-producerkafka:bootstrap-servers: 192.168.200.131:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

controller-发送消息

@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("test","springboot发的第一条消息");return "ok";}@GetMapping("/helloUser")public String helloUser(){User user = new User();user.setName("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";}
}

User

public class User {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}

2.3 创建消费者

dependencies

<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>

application.yml

server:port: 8081
spring:application:name: kafka-consumerkafka:bootstrap-servers: 192.168.200.131:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

User

public class User {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}

消息监听器

@Component
public class HelloListener {@KafkaListener(topics = "test")public void onMessage1(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user.toString());}}
}

启动生产者和消费者项目,浏览器输入http://127.0.0.1:8080/hello,发现消费者收到消息
在这里插入图片描述
浏览器输入http://127.0.0.1:8080/helloUser,发现消费者收到消息
在这里插入图片描述
项目结构
在这里插入图片描述

3.其它

通常在监听类直接调用service方法

@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/836506.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker(五):DockerFile

文章目录 DockerFile1、Dockerfile构建过程解析2、DockerFile常用保留字命令FROMMAINTAINERRUNEXPOSEWORKDIRUSERENVADDCOPYVOLUMECMDENTRYPOINT总结 3、案例 DockerFile 1、Dockerfile构建过程解析 官网文档&#xff1a;https://docs.docker.com/reference/dockerfile/ Dock…

【论文阅读笔记】HermesSim(Code is not Natural Language) (Security 24)

个人博客地址 HermesSim [Security 24] 论文&#xff1a;《Code is not Natural Language: Unlock the Power of Semantics-Oriented Graph Representation for Binary Code Similarity Detection》 仓库&#xff1a;https://github.com/NSSL-SJTU/HermesSim 提出的问题 二…

JVM调优:JVM中的垃圾收集器详解

JVM&#xff08;Java Virtual Machine&#xff09;垃圾收集器是Java虚拟机中的一个重要组件&#xff0c;负责自动管理Java堆内存中的对象。垃圾收集器的主要任务是找出那些不再被程序使用的对象&#xff0c;并释放它们占用的内存&#xff0c;以便为新的对象分配空间。这个过程被…

C#泛型委托

在C#中&#xff0c;delegate 关键字用于声明委托&#xff08;delegates&#xff09;&#xff0c;委托是一种类型安全的函数指针&#xff0c;允许你传递方法作为参数或从方法返回方法。有时我们需要将一个函数作为另一个函数的参数&#xff0c;这时就要用到委托&#xff08;Dele…

算法题② —— 链表专栏

1. 链表数据结构 struct ListNode {int val;ListNode *next;ListNode() : val(0), next(nullptr) {}ListNode(int x) : val(x), next(nullptr) {}ListNode(int x, ListNode *next) : val(x), next(next) {}};2. 链表的删除 2.1 移除链表元素 力扣&#xff1a;https://leetco…

引擎:主程渲染

一、引擎发展 二、引擎使用 1.游戏渲染流程 2.3D场景编辑器操作与快捷键 3.节点的脚本组件 脚本介绍 引擎执行流程 物体节点、声音组件\物理组件\UI组件、脚本组件 暴露变量到面板 4.节点的查找 基本查找 this.node&#xff1a;挂载当前脚本的节点A&#xff1b; this.nod…

一、精准化测试介绍

精准化测试介绍 一、精准化测试是什么&#xff1f;二、什么是代码插桩&#xff1f;三、两种插桩方式Offine模式&#xff1a;On-the-fly插桩: 四、jacoco覆盖率报告展示五、增量代码覆盖率监控原理六、精准测试系统架构图七、全量与增量覆盖率报告包维度对比八、全量与增量覆盖率…

牛客NC343 和大于等于K的最短子数组【困难 前缀和 Java/Go】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/3e1fd3d19fb0479d94652d49c7e1ead1 思路 本答案利用前缀和解答&#xff0c;Java&#xff0c;Go答案通过&#xff0c;但是同样的代码用PHP的话有一个测试用例超时 应该还有更优秀的答案&#xff0c;后面找到更优…

2022——蓝桥杯十三届2022国赛大学B组真题

问题分析 看到这个问题的同学很容易想到用十层循环暴力计算&#xff0c;反正是道填空题&#xff0c;一直算总能算得出来的&#xff0c;还有些同学可能觉得十层循环太恐怖了&#xff0c;写成回溯更简洁一点。像下面这样 #include <bits/stdc.h> using namespace std; in…

apk反编译修改教程系列-----反编译apk 去除软件强制更新的八种方式步骤解析【十七】

安卓有的apk 软件会不断更新。但有些用户需要旧版的有些功能或者新版功能增减原因等等。需要不更新继续使用。这类问题有的可以简单修改版本号来跳过更新。或者有的软件可以忽略。但对于某些无法跳过更新界面等等的apk。就需要深度反编译来去除软件的强制更新。 通过课程可以了…

wsl安装Xfce桌面并设置系统语言和输入法

一、安装xfce &#xff08;有相关的依赖都会安装&#xff09; sudo apt -y install xfce4 二、 安装远程连接组件 sudo apt install xrdp -y 并重新启动 Xrdp 服务&#xff1a; sudo systemctl restart xrdp 本地windows系统中请按 winR 键 呼出运行 在运行中输入 mstsc…

1067: 有向图的邻接表存储强连通判断

解法&#xff1a; 定理&#xff1a;有向图G是强连通图的充分必要条件是G中存在一条经过所有节点的回路 跟上道题一样 这是错误代码 #include<iostream> #include<vector> using namespace std; int arr[100][100]; void dfs(vector<bool>& a,int u) {a…

唤醒手腕 Go 语言 并发编程、Channel通道、Context 详细教程(更新中)

并发编程概述 ​ 一个进程可以包含多个线程&#xff0c;这些线程运行的一定是同一个程序&#xff08;进程程序&#xff09;&#xff0c;且都由当前进程中已经存在的线程通过系统调用的方式创建出来。进程是资源分配的基本单位&#xff0c;线程是调度运行的基本单位&#xff0c…

【JAVA进阶篇教学】第十二篇:Java中ReentrantReadWriteLock锁讲解

博主打算从0-1讲解下java进阶篇教学&#xff0c;今天教学第十二篇&#xff1a;Java中ReentrantReadWriteLock锁讲解。 在并发编程中&#xff0c;读写锁&#xff08;ReadWriteLock&#xff09;是一种用于管理对共享资源的访问的锁机制&#xff0c;它提供了比传统的互斥锁更高的…

栈的讲解

栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。 进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底&#xff08;因为先进后出&#xff09;。栈中的数据元素遵守后进先出LIFO&#xff08;Last In Firs…

数据结构与算法===回溯法

文章目录 原理使用场景括号生成代码 小结 原理 回溯法是采用试错的思想&#xff0c;它尝试分步骤的去解决一个问题。在分步骤解决问题的过程中&#xff0c;当它通过尝试发现现有的分步答案不能得到有效的正确的解答的时候&#xff0c;它将取消上一步甚至是上几步的计算&#x…

函数模板底层本质

#include<iostream> using namespace std;template<typename T1> T1 ave(T1 a, T1 b) { return a b; }int main() {ave(100, 200);ave(short(100), short(200));return 0; }反汇编代码 模板本质是编译器帮忙生成了不同的函数 就算非类型参数值不一样编译器也重新…

HCIP(BGP综合实验)--8

一&#xff1a;实验要求 二&#xff1a;实现过程 &#xff08;一&#xff09;配置IP地址&#xff1a; AR1: [AR1]int g0/0/0 [AR1-GigabitEthernet0/0/0]ip add 12.1.1.1 24 [AR1-GigabitEthernet0/0/0]int l0 [AR1-LoopBack0]ip add 172.16.0.1 32 [AR1-LoopBack0]int l1 […

TM1650 并联在I2C 信号线的处理方法

目的是可以并联多个TM1650 在标准I2C 总线上&#xff0c;并且不影响其他标准I2C 器件。思路就是拿个额外的开关控制每一片TM1650 的使能&#xff0c;就像SPI 的CS 信号那样。 协议 TM1650 的通信协议虽说不是标准I2C&#xff0c;但也算是比较兼容的&#xff0c;比方说&#x…

springboot实现Aop

一、原理 AOP&#xff08;Aspect Oriented Programming&#xff09;的意思是&#xff1a;面向切面编程&#xff0c;通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术。利用AOP可对业务逻辑进行增强&#xff0c;在不改变原有逻辑的基础上&#xff0c;在其前后进…