芝法酱躺平攻略(21)——kafka安装和使用

本节内容比较初级,故接着躺平攻略写

一、官网的下载

1.1 下载解压

首先,去官网下载jar包,放进linux中,解压到对应位置。
我的位置放在/WORK/MIDDLEWARE/kafka/4.0

1.2 常见配置

# 每个topic默认的分片数
num.properties=4
# 数据被删除的时间
log.retention.hours=168
# 文件存储路径,注意,这不是日志,而是数据
log.dirs=/WORK/MIDDLEWARE/kafka/4.0/kraft-combined-logs
# 这个地方一定要修改,不然客户端无法连通
# 这里要写成ip
advertised.listeners=PLAINTEXT://192.168.0.64:9092,CONTROLLER://192.168.0.64:9093

1.3 自启动

创建 /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/bin/bash -c 'source /etc/profile && /WORK/MIDDLEWARE/kafka/4.0/bin/kafka-server-start.sh /WORK/MIDDLEWARE/kafka/4.0/config/server.properties'
ExecStop=/bin/bash -c 'source /etc/profile && /WORK/MIDDLEWARE/kafka/4.0/bin/kafka-server-stop.sh'
Restart=on-failure[Install]
WantedBy=multi-user.target                       

启用

systemctl daemreload
systemctl enable kafka

1.4 创建topic

bin/kafka-topics.sh --create --topic my-test-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic my-test-topic --bootstrap-server localhost:9092

描述信息展示如下:

        Topic: my-test-topic    Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:Topic: my-test-topic    Partition: 1    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:Topic: my-test-topic    Partition: 2    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:Topic: my-test-topic    Partition: 3    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:

毕竟我们是学习环境,搭的单机节点,对于每个分区没有做副本。生产环境下,注意把副本分配到不同的节点上
使用参数如下:

--replica-assignment "<partition0>:<brokerA>,<brokerB>,…;<partition1>:<brokerC>,<brokerD>,…;…"
#如:
--replica-assignment "0:1,2;1:2,3;2:1,3

解释一下,':‘前面的是分区的编号;’:'后面是这个分区的数据,分别放到哪个broker下

1.5 安装kafka-ui

cd /WORK/MIDDLEWARE/kafka
mkdir kafka-ui
cd kafka-ui
vim docker-compose.yml

编辑docker-compose文件

services:kafka-ui:container_name: kafka-uiimage: provectuslabs/kafka-ui:latestports:- 9100:8080environment:DYNAMIC_CONFIG_ENABLED: 'true'

二、SpringBoot的生产者接入

2.1 pom引用

注意,我这里的indi.zhifa.engine-cloud:common-web-starter是自己写的库,便于快速创建web项目,大家可以去 我的码云 下载

    <dependencies><dependency><groupId>indi.zhifa.engine-cloud</groupId><artifactId>common-web-starter</artifactId><version>${zhifa-engine.version}</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.1.10</version></dependency></dependencies>

2.2 生产者java核心代码:

service

@Slf4j
@Component
public class KafkaSendDataImpl implements IKafkaSendData {private final KafkaTemplate<String, String> mKafkaTemplate;private final FastJsonConfig mFastJsonConfig;public KafkaSendDataImpl(KafkaTemplate<String, String> pKafkaTemplate,@Qualifier("simple-fastJson-config") FastJsonConfig pFastJsonConfig) {mKafkaTemplate = pKafkaTemplate;mFastJsonConfig = pFastJsonConfig;}@Overridepublic void sendAsync(String topic,KafkaData pKafkaData) {String str = JSON.toJSONString(pKafkaData);try{mKafkaTemplate.send(topic,pKafkaData.getName(),str);}catch (Exception e){log.error("发送kafka时发成错误,错误信息是"+ e.getMessage());}}
}

controller

@Slf4j
@Validated
@RequiredArgsConstructor
@Tag(name = "生产者")
@ZhiFaRestController
@RequestMapping("/kafka/produce")
public class KafkaProduceController {final IKafkaSendData mKafkaSendData;@PostMapping("/{topic}")public void sendAsync(@PathVariable("topic") String pTopic, @RequestBody KafkaData pKafkaData){mKafkaSendData.sendAsync(pTopic,pKafkaData);}
}

配置:


server:# 服务端口port: 8083springdoc:swagger-ui:path: /swagger-ui.htmltags-sorter: alphaoperations-sorter: alphaapi-docs:path: /v3/api-docsgroup-configs:- group: "管理接口"paths-to-match: '/**'packages-to-scan:- indi.zhifa.study2025.test.kafka.producer.controllerzhifa:enum-memo:enabled: trueenum-packages:- indi.zhifa.**.enumsuri: /api/enumweb:enabled: truespring:profiles:active: localkafka:bootstrap-servers: 192.168.0.64:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: zstd#0 的时候,吞吐量最高,不管是否成功#1 leader收到后才响应#-1 要求所有的follow都写成功#通常iot项目,日志采集等,该值设为0.仅仅用来解耦时,比如订单处理业务,一般设成all,避免丢失,并且在回调监控。并且会自动开启幂等性。acks: all# 重试次数retries: 3

我们创建几条消息,观察现象:
在这里插入图片描述
打开swagger-ui,看到确实有消息数量了
在这里插入图片描述

2.3 key的作用

额外解释一点,发送时,指定消息的key。kafka默认会把同一个key放在一个partition(分区)中。我这里用name做key,可以保证同一个name的消息被顺序消费。

三、SpringBoot的消费者接入

消费者非常简单,这里略写

3.1 java核心代码

@Component
public class KafkaConsumerListener {private Map<String,Long> mMsgIdx;public KafkaConsumerListener() {mMsgIdx = new ConcurrentHashMap<>();}@KafkaListener(topics = "my-test-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record) {String key = record.key();           // 获取消息的 keyString value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);if(StringUtils.hasText(key)){Long idx = mMsgIdx.get(key);if(idx == null){idx = 0l;}idx = idx + 1;mMsgIdx.put(key, idx);System.out.println(key+"的第"+idx+"个消息");}}
}

3.2 配置

spring:profiles:active: localkafka:bootstrap-servers: 192.168.0.64:9092consumer:group-id: my-group   # 消费者组IDauto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/77965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AutoSAR从概念到实践系列之MCAL篇(二)——Mcu模块配置及代码详解(上)

欢迎大家学习我的《AutoSAR从概念到实践系列之MCAL篇》系列课程,我是分享人M哥,目前从事车载控制器的软件开发及测试工作。 学习过程中如有任何疑问,可底下评论! 如果觉得文章内容在工作学习中有帮助到你,麻烦点赞收藏评论+关注走一波!感谢各位的支持! 根据上一篇内容中…

easypoi 实现word模板导出

特此非常致谢&#xff1a;easypoi实现word模板 基础的可以参考上文&#xff1b; 但是我的需求有一点点不一样。 这是我的模板&#xff1a;就是我的t.imgs 是个list 但是很难过的是easy poi 我弄了一天&#xff0c;我都没有弄出来嵌套list循环怎么输出显示&#xff0c;更难过…

Unity中数据存储_LitJson

文章目录 LitJson一&#xff1a;介绍二&#xff1a;特点三&#xff1a;使用四&#xff1a;注意事项 LitJson 一&#xff1a;介绍 LitJson 是一个专为 .NET 设计的轻量级 JSON 处理库&#xff0c;支持序列化和反序列化 JSON 数据。 二&#xff1a;特点 快速且轻量 无外部依赖…

2025年首届人形机器人半程马拉松比赛(附机器人照片)

2025年4月19日&#xff0c;北京亦庄半程马拉松暨人形机器人半场马拉松正式开赛&#xff0c;作为全球首届人形机器人户外跑步成功举办&#xff0c;21.0975公里的户外路程对人形机器人来讲&#xff0c;注定将成为历史性开篇&#xff0c;如果赛事能够持续举办&#xff0c;那举办意…

网络安全职业技能大赛Server2003

通过本地PC中渗透测试平台Kali对服务器场景Windows进⾏系统服务及版本扫描渗透测 试&#xff0c;并将该操作显示结果中Telnet服务对应的端⼝号作为FLAG提交 使用nmap扫描发现目标靶机开放端口232疑似telnet直接进行连接测试成功 Flag&#xff1a;232 通过本地PC中渗透测试平台…

[java八股文][Java基础面试篇]I/O

Java怎么实现网络IO高并发编程&#xff1f; 可以用 Java NIO &#xff0c;是一种同步非阻塞的I/O模型&#xff0c;也是I/O多路复用的基础。 传统的BIO里面socket.read()&#xff0c;如果TCP RecvBuffer里没有数据&#xff0c;函数会一直阻塞&#xff0c;直到收到数据&#xf…

Python常用的第三方模块之【jieba库】支持三种分词模式:精确模式、全模式和搜索引擎模式(提高召回率)

Jieba 是一个流行的中文分词Python库&#xff0c;它提供了三种分词模式&#xff1a;精确模式、全模式和搜索引擎模式。精确模式尝试将句子最精确地切分&#xff0c;适合文本分析&#xff1b;全模式则扫描文本中所有可能的词语&#xff0c;速度快但存在冗余&#xff1b;搜索引擎…

QT6 源(37):界面组件的总基类 QWidget 的源码阅读(下,c++ 代码部分)

&#xff08;1&#xff09; QT 在 c 的基础上增加了自己的编译器&#xff0c;以支持元对象系统和 UI 界面设计&#xff0c;有 MOC 、 UIC 等 QT 自己的编译器。本节的源代码里&#xff0c;为了减少篇幅&#xff0c;易于阅读&#xff0c;去除了上篇中的属性部分&#xff0c; 上篇…

rabbitmq-spring-boot-start版本优化升级

文章目录 1.前言2.优化升级内容3.依赖4.使用4.1发送消息代码示例4.2消费监听代码示例4.3 brock中的消息 5.RabbmitMq的MessageConverter消息转换器5.1默认行为5.2JDK 序列化的缺点5.3使用 JSON 进行序列化 6.总结 1.前言 由于之前手写了一个好用的rabbitmq-spring-boot-start启…

git lfs下载大文件限额

起因是用 model.load_state_dict(torch.load())加载pt权重文件时&#xff0c;出现错误&#xff1a;_pickle.UnpicklingError: invalid load key, ‘v’. GPT告诉我&#xff1a;你的 pt 文件不是权重文件&#xff0c;而是模型整体保存&#xff08;或根本不是 PyTorch 文件&#…

什么是RAG?RAG的主要流程是什么?

**RAG(Retrieval-Augmented Generation)**是一种结合检索与生成技术的框架,旨在通过引入外部知识增强生成模型的性能。其核心思想是:在生成文本时,先从外部知识库中检索相关信息,再将检索结果与原始输入结合,作为生成模型的输入,从而提升生成内容的准确性、相关性和信息…

【Rust 精进之路之第13篇-生命周期·进阶】省略规则与静态生命周期 (`‘static`)

系列: Rust 精进之路:构建可靠、高效软件的底层逻辑 作者: 码觉客 发布日期: 2025年4月20日 引言:让编译器“读懂”你的意图——省略的艺术 在上一篇【生命周期入门】中,我们理解了生命周期的必要性——它是 Rust 编译器用来确保引用有效性、防止悬垂引用的关键机制。我…

Python爬虫实战:获取xie程网敦煌酒店数据并分析,为51出行做参考

一、引言 伴随互联网的飞速发展,在线旅游平台成为人们出行预订酒店的重要途径。xie程网作为国内颇具知名度的在线旅游平台,存有丰富的酒店信息。借助爬取xie程网的酒店数据并加以深入分析,能够为用户提供更为精准的酒店推荐,特别是在旅游旺季,如 51 出行期间。本研究致力…

第二十一讲 XGBoost 回归建模 + SHAP 可解释性分析(利用R语言内置数据集)

下面我将使用 R 语言内置的 mtcars 数据集&#xff0c;模拟一个完整的 XGBoost 回归建模 SHAP 可解释性分析 实战流程。我们将以预测汽车的油耗&#xff08;mpg&#xff09;为目标变量&#xff0c;构建 XGBoost 模型&#xff0c;并用 SHAP 来解释模型输出。 &#x1f697; 示例…

PyMC+AI提示词贝叶斯项目反应IRT理论Rasch分析篮球比赛官方数据:球员能力与位置层级结构研究

全文链接&#xff1a;tecdat.cn/?p41666 在体育数据分析领域不断发展的当下&#xff0c;数据科学家们致力于挖掘数据背后的深层价值&#xff0c;为各行业提供更具洞察力的决策依据。近期&#xff0c;我们团队完成了一项极具意义的咨询项目&#xff0c;旨在通过先进的数据分析手…

【android bluetooth 框架分析 03】【Bta 层详解 1】【Bluetooth Application Laye 介绍】

蓝牙协议栈中 Bluetooth Application Layer&#xff08;蓝牙应用层&#xff09;是协议栈核心组成部分&#xff0c;它位于协议栈中间偏上的位置&#xff0c;主要负责将底层 Bluetooth Stack&#xff08;如 L2CAP、AVDTP、RFCOMM、SDP 等&#xff09;与上层 Profile 和 Android F…

单片机获取真实时间的实现方法

单片机获取真实时间&#xff08;即当前的年月日、时分秒等&#xff09;通常需要依赖外部时间源或模块&#xff0c;因为单片机本身没有内置的实时时钟&#xff08;RTC&#xff09;功能。 在 C 语言环境下&#xff0c;单片机获取真实时间通常需要依赖 外部硬件模块&#xff08;如…

Linux——进程优先级/切换/调度

1.进程优先级 1.进程优先级是什么&#xff1a;进程获取CPU资源的先后顺序 2.为什么要有进程优先级&#xff1a;因为一般CPU只有一块&#xff0c;资源短缺&#xff0c;所以就需要优先级来确定谁先谁后的问题 3.值越低 进程的优先级越高 ps -l进行查看 UID&#xff1a;user id …

铸铁划线平板:多行业的精密测量工具(北重十字滑台加工厂家)

铸铁划线平板是一种用于精密测量和校准的工具&#xff0c;广泛应用于各个行业。它通常由铸铁制成&#xff0c;表面经过精密加工&#xff0c;能够保证较高的平整度和准确度。铸铁划线平板的主要作用是用来检验工件的平整度和垂直度&#xff0c;也常用于划线、校准和测量工件的平…

Excel/WPS表格中图片链接转换成对应的实际图片

Excel 超链图变助手&#xff08;点击下载可免费试用&#xff09; 是一款将链接转换成实际图片&#xff0c;批量下载表格中所有图片的转换工具&#xff0c;无需安装&#xff0c;双击打开即可使用。 表格中链接如下图所示&#xff1a; 操作方法&#xff1a; 1、双击以下图标&a…