docker安装和使用kafka

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

docker run --name zookeeper \--network app-tier \-e ALLOW_ANONYMOUS_LOGIN=yes \--restart=always \-d bitnami/zookeeper:latest

2. 启动kafka

docker run --name kafka \--network app-tier \-p 9092:9092 \-e ALLOW_PLAINTEXT_LISTENER=yes \-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092	 \--restart=always \-d bitnami/kafka:latest
命令解释
ALLOW_PLAINTEXT_LISTENER=yes任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

docker run --name kafka-map \--network app-tier \-p 9001:8080 \-v /usr/local/kafka-map/data:/usr/local/kafka-map/data \-e DEFAULT_USERNAME=admin \-e DEFAULT_PASSWORD=admin \--restart=always \-d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

    <dependencies><!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>        

配置application.yml

#------------------------------------spring----------------------------------
spring:#------------------------------------消息队列kafka配置----------------------------------kafka:#  kafka server的地址,如果有多个,使用逗号分割bootstrap-servers: localhost:9092producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。32MB的批处理缓冲区buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1properties:# 自定义拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor#自定义分区器partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitionerconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 自定义消费者拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor# 默认消费者组group-id: code-safe-group# 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)# 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟max-poll-interval-ms: 600000# 批量一次最大拉取数据量max-poll-records: 1000batch:# 批消费并发量,小于或等于Topic的分区数concurrency: 3listener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: falsetopics:# 自定义主题名称twsm: webSocket_send_message_devgroup-id: group-idtopic-name:- topic1

测试发送消息到kafka

/*** Kafka测试** @version 1.0* @author: web* @date: 2024/1/18 15:07*/
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{@Autowiredprivate KafkaUtils kafkaUtils;/*** 生产者_推送消息到kafka** @param msg* @author: web* @return: AjaxResult* @date: 2024/1/18 15:16*/@PostMapping("/send")public AjaxResult send(@RequestBody Map<String, Object> msg){try{String userId = msg.get("userId").toString();Object content = msg.get("content");Message message = kafkaUtils.setMessage(userId, content);kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);}catch (Exception e){log.error("生产者_推送消息到kafka发生异常");}return success();}/*** 消费者1** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*/@KafkaListener(topics = KafkaUtils.TOPIC_TEST)public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional message = Optional.ofNullable(record.value());if (message.isPresent()){Object msg = message.get();log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}/*** 消费者2** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*///    @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)//    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,//                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)//    {////        Optional message = Optional.ofNullable(record.value());//        if (message.isPresent())//        {//            Object msg = message.get();//            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);//            ack.acknowledge();//        }//    }}

KafkaUtils类

/*** 生产者** @version: 1.0* @author: web* @date: 2024/1/18 10:37*/
@Component
@Slf4j
public class KafkaUtils
{@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 自定义topic*/public static final String TOPIC_TEST = "topic.code-safe";/*** 自定义消费组*/public static final String TOPIC_GROUP1 = "topic.group1";public static final String TOPIC_GROUP2 = "topic.group2";// 业务相关topic/*** 主题: webSocket发送消息到客户端*/public static String TOPIC_WEBSOCKET_SEND_MESSAGE;@Autowiredprivate String[] kafkaTopicName;/*** 获取配置文件中的盐值,并设置到静态变量中** @param topic 主题*/@Value("${spring.kafka.topics.twsm}")private void setTwsmTopic(String topic){TOPIC_WEBSOCKET_SEND_MESSAGE = topic;}/*** 发送消息** @param topic   主题* @param message 消息内容* @author: web* @return: void* @date: 2024/1/18 10:42*/public void send(String topic, Object message){if (StringUtils.isEmpty(topic) || StringUtils.isNull(message)){throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");}String obj2String = JsonUtils.toJsonString(message);//        log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>(){@Overridepublic void onFailure(Throwable throwable){//发送失败的处理log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult){//成功的处理
//                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}/*** 设置websocket发送的消息体** @param userId 用户ID* @param msg    消息内容* @author: web* @return: Message 消息对象* @date: 2024/1/19 11:36*/public Message setMessage(String userId, Object msg){Message message = new Message();message.setSendUserId(userId);message.setSendTime(DateUtils.getTime());message.setSendContent(String.valueOf(msg));return message;}
}

Message类

@Data
public class Message implements Serializable
{private static final long serialVersionUID = -118L;/*** 发送人ID*/private String sendUserId;/*** 发送人*///    private String sendUserName;/*** 发送时间*/private String sendTime;/*** 发送内容*/private String sendContent;
}

监听消息

/*** 消息接收监听器【分布式系统】** @version: 1.0* @author: web* @date: 2024/1/19 13:44*/
@Component
@Slf4j
public class MessageListener
{/*** 根据用户id发送消息到客户端** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/20 22:05*/@KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<String> optional = Optional.ofNullable(record.value());if (optional.isPresent()){Message message = JsonUtils.parseObject(optional.get(), Message.class);if (StringUtils.isNull(message)){log.error("消费者收到kafka消息的内容为空!");return;}
//            log.info("消费者收到kafka消息");String sendUserId = message.getSendUserId();String sendContent = message.getSendContent();// 确认收到消息ack.acknowledge();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/726161.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32各外设初始化步骤

1、GPIO初始化步骤 1、使能GPIO时钟 2、初始化GPIO的输入/输出模式 3、设置GPIO的输出值或获取GPIO的输入值 GPIO_InitTypeDef GPIO_InitStruct;RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA,ENABLE);GPIO_InitStruct.GPIO_Mode GPIO_Mode_Out_PP; GPIO_InitStruct.GPIO_Pin…

青少年如何从零开始学习Python编程?有它就够了!

文章目录 写在前面青少年为什么要学习编程 推荐图书图书特色内容简介 推荐理由粉丝福利写在最后 写在前面 本期博主给大家带来一本非常适合青少年学习编程的图书&#xff0c;快来看看吧~ 青少年为什么要学习编程 青少年学习编程&#xff0c;就好比在他们年轻时就开始掌握一种…

线程简介

线程简介 这里先说明一下&#xff0c;进程和线程是不同的 进程&#xff1a;程序的执行过程&#xff0c;是一个独立的运行环境&#xff0c;持有资源和线程&#xff0c;相当于一个应用程序&#xff0c;操作系统在分配资源时把资源分配给进程(堆和方法区是属于进程的) 线程&#x…

【YOLO v5 v7 v8 v9小目标改进】辅助超推理SAHI:分而治之,解决高分辨率图像中小物体检测的问题

辅助超推理SAHI&#xff1a;分而治之&#xff0c;解决高分辨率图像中小物体检测的问题 设计思路结构小目标涨点YOLO v5 魔改YOLO v7 魔改YOLO v8 魔改YOLO v9 魔改 论文&#xff1a;https://arxiv.org/pdf/2202.06934.pdf 代码&#xff1a;https://github.com/obss/sahi 设计思…

C++内存泄漏检测

C进阶专栏&#xff1a;http://t.csdnimg.cn/aTncz 相关系列文章 C技术要点总结, 面试必备, 收藏起来慢慢看 C惯用法之RAII思想: 资源管理 C智能指针的自定义销毁器(销毁策略) 目录 1.内存泄漏概述 1.1.内存泄漏产生原因 1.2 内存泄漏导致的后果 1.3 内存泄漏解决思路 2.宏…

基于Springboot免费搭载轻量级阿里云OSS数据存储库(将本地文本、照片、视频、音频等上传云服务保存)

一、注册阿里云账户 打开https://www.aliyun.com/&#xff0c;申请阿里云账户并完成实名认证&#xff08;个人&#xff09;。这种情况就是完成了&#xff1a; 二、开通OSS服务 点击立即开通即可。 三、创建Bucket 申请id和secert&#xff1a; 进去创建一个Accesskey就会出现以…

【Qt学习】QProgressBar的使用(进度条的实现)

文章目录 1. 介绍2. 实例2.1 按钮启动进度条2.2 更改进度条样式2.3 资源文件 1. 介绍 详细的 QProgressBar 内容可以通过 查阅Qt官方文档 &#xff0c;这里进行简要的总结&#xff1a; QProgressBar 是Qt框架中的一个控件&#xff0c;用于显示进度条&#xff1a; QProgressBar…

wordpress免费主题下载

免费wordpress模板下载 简洁大气的文化艺术类wordpress模板&#xff0c;可以免费下载&#xff0c;实用易上手&#xff0c;新手也适合。 https://www.wpniu.com/themes/304.html 免费wordpress主题下载 高端大气上档次的wordpress主题&#xff0c;也可以是免费的&#xff0c;…

修改MonkeyDev默认配置适配Xcode15

上一篇文章介绍了升级Xcode15后,适配MonkeyDev的一些操作,具体操作可以查看:Xcode 15 适配 MonkeyDev。 但是每次新建项目都要去修改那些配置,浪费时间和精力,这篇文章主要介绍如何修改MonkeyDev的默认配置,做到一次修改永久生效。 MonkeyDev的默认安装路径是在/opt/Mo…

iclone更奇怪了用自动对齐才搞得定

1前一个clip的位置 2选root的话就跑到这里了&#xff0c;跟前一个clip差很多 3换了left foot对齐之后才正常 4这时候开不开自动对齐不影响 5奇怪医生的中心似乎是途中的花坐标轴偏离人体好多呀不知何时跑这里的难道前面是应为这个&#xff1f;中心跑了我还不知道 6动画交叉的时…

【常见索引使用】⭐️Mysql中索引的类型以及使用方式和失效场景

目录 一、前言 二、数据准备 三、索引的分类 四、索引示例 示例1、主键索引&#xff08;Primary Key Index&#xff09;与 唯一索引&#xff08;Unique Index&#xff09; 示例2、前缀索引&#xff08;Prefix Index&#xff09; 示例3、联合索引&#xff08;复合索引&am…

GWO-RF|灰狼算法优化随机森林 分类预测|多变量分类预测

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、算法介绍&#xff1a; 灰狼优化算法&#xff1a; 随机森林&#xff1a; 四、完整程序下载&#xff1a; 一、程序及算法内容介绍&#xff1a; …

如何规划应用商店优化策略

应用商店是拥挤的地方。拥有超过 600 万个应用程序&#xff0c;制定应用程序商店优化 (ASO) 策略比以往任何时候都更加重要。ASO 有助于确保您的应用在搜索结果中排名更高&#xff0c;以便潜在用户可以轻松找到它。通过针对App Store和 Google Play优化App&#xff0c;能够吸引…

使用ES检索PDF或Word等格式文件方案

#大数据/ES #经验 #方案架构 ES检索PDF/Word等格式文件方案 插件安装 ES有文档预处理插件&#xff0c;但是7.x版本默认发版包不包含这个ingest attachment plugin 。 通过摄取附件插件&#xff0c;Elasticsearch 可以使用 Apache 文本提取库 Tika 提取常见格式的文件附件&a…

Tomcat介绍在IDEA中创建JavaWeb工程

文章目录 一、WEB服务器服务器概述使用Java代码手写web服务器 二、服务器软件Web服务器服务器软件的使用步骤 三、TomcatTomcat的下载Tomcat的安装与卸载Tomcat的启动与关闭常见问题 四、新建Java Web项目并将项目部署到tomcat中新建Java Web项目将项目部署到Tomcat中出现的问题…

[晓理紫]每日论文分享(有中文摘要,源码或项目地址)--大模型

专属领域论文订阅 VX关注{晓理紫}&#xff0c;每日更新论文&#xff0c;如感兴趣&#xff0c;请转发给有需要的同学&#xff0c;谢谢支持 如果你感觉对你有所帮助&#xff0c;请关注我&#xff0c;每日准时为你推送最新论文。 》》 由于精力有限&#xff0c;今后就不在CSDN上更…

去除PDF论文行号的完美解决方案

去除PDF论文行号的完美解决方案 1. 遇到的问题 我想去除论文的行号&#xff0c;但是使用网上的Adobe Acrobat裁剪保存后 如何去掉pdf的行编号&#xff1f; - 知乎 (zhihu.com) 翻译时依然会出现行号&#xff0c;或者是转成word&#xff0c;这样就大大损失了格式&#xff0c;…

第十五届蓝桥杯青少组STEMA测评SPIKE初级真题试卷 2024年1月

第十五届蓝桥杯青少组STEMA测评SPIKE初级真题试卷 2024年1月 ​​​​​​​ 来自&#xff1a;6547网 http://www.6547.cn/doc/vywur8eics

SOC设计:关于时钟门控的细节

有如下几个信号 输入信号 1、同步后的rstnsync_clk 2、时钟&#xff1a;clk 3、test_mode 4、软件控制信号&#xff1a;clk_sub_en 输出信号 1、clk_sub 功能&#xff1a;软件配置的使能信号clk_sub_en经过时钟clk 2拍同步处理后产生clk 域下的enable信号&#xff0c;然…

常用MII接口详解

开放式系统互连 (OSI) 模型 七层开放系统互连 (OSI) 模型中&#xff0c;以太网层 位于最底部两层 - 物理层和数据链路层。 从百兆以太网接口开始 首先是百兆以太网规定的两种接口 介质无关接口 (MII) Media Independent Interface 介质相关接口 (MDI) Medium Depen…