Kafka 下载安装及使用总结

1. 下载安装

官网下载地址:Apache Kafka

下载对应的文件

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

上传到服务器上,解压

tar -xzf kafka_2.13-3.7.0.tgz

目录结果如下

├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs

官方文档:Apache Kafka

kafka 有两种启动方式,ZooKeeper 和 KRaft,这里采用 KRaft 的方式,使用 kraft 目录下的配置文件

config
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-mirror-maker.properties
├── connect-standalone.properties
├── consumer.properties
├── kraft
│   ├── broker.properties
│   ├── controller.properties
│   └── server.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
├── trogdor.conf
└── zookeeper.properties

修改 server.properties 文件

  • process.roles:KRaft 模式角色
    • broker
    • controller
    • broker,controller
  • node.id:节点 ID,需要为每个节点分配一个唯一的 ID
  • controller.quorum.voters:Controller 的投票者配置
  • log.dirs:日志目录

初始化集群,先生成一个 UUID

./bin/kafka-storage.sh random-uuid

再执行命令,使用生成的 UUID 完成集群初始化

./bin/kafka-storage.sh format -t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties

然后启动 Kafka

./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

2. Spring Boot 集成

2.1 引入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.1.4</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.0</version></dependency>
</dependencies>

2.2 配置文件

spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: fable-group

2.3 生产者

使用 KafkaTemplate 实现消息的生产

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@Autowiredpublic ProducerController(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@GetMapping("/send/message")public String sendMessage() {kafkaTemplate.send("test", "Hello, Kafka!");return "Message sent successfully";}
}

2.4 消费者

添加 @KafkaListener 注解,监听消息

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class ConsumerService {@KafkaListener(topics = "test", groupId = "fable-group")public void listen(ConsumerRecord<String, String> consumerRecord) {System.out.println("Received message: " + consumerRecord.value());}
}

2.5 启动类

添加 @EnableKafka 注解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication
@EnableKafka
public class FableApplication {public static void main(String[] args) {SpringApplication.run(FableApplication.class, args);}
}

2.6 测试

访问 /send/message 接口,可以看到控制台打印出接收到的消息

Received message: Hello, Kafka!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/54467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动态数据源多种实现方式及对比详细介绍

文章目录 动态数据源实现方式1. 概述2. 动态数据源实现方式2.1 基于 AbstractRoutingDataSource 实现动态数据源2.2 基于 Spring AOP 实现动态数据源2.3 基于 TransactionManager 实现动态数据源2.4 通过数据库中间件实现动态数据源&#xff08;ShardingSphere、MyCAT&#xff…

探索 Go 语言 container 包:强大容器的魔法世界

《探索 Go 语言 container 包:强大容器的魔法世界》 在 Go 语言的世界里,container包就像是一个神奇的宝库,里面藏着各种强大的容器,为开发者提供了高效的数据存储和操作方式。让我们一起揭开这个宝库的神秘面纱,探索container包中的那些容器。 一、container包简介 co…

将成功请求的数据 放入apipost接口测试工具,发送给后端后,部分符号丢失

将成功请求的数据 放入apipost接口测试工具&#xff0c;发送给后端后&#xff0c;部分符号丢失 apipost、接口测试、符号、丢失、错乱、变成空格背景 做CA对接&#xff0c;保存CA系统的校验数据&#xff0c;需要模仿前端请求调起接口&#xff0c;以便测试功能完整性。 问题描…

MySQL:事务隔离级别

SQL 标准定义了四个隔离级别&#xff1a; READ-UNCOMMITTED(读取未提交) &#xff1a;最低的隔离级别&#xff0c;允许读取尚未提交的数据变更&#xff0c;可能会导致脏读、幻读或不可重复读。READ-COMMITTED(读取已提交) &#xff1a;允许读取并发事务已经提交的数据&#xf…

Flink Task 日志文件隔离

Flink Task 日志文件隔离 任务在启动时会先通过 MdcUtils 启动一个 slf4j 的 MDC 环境&#xff0c;然后将 jobId 添加到 slf4j 的 MDC 容器中&#xff0c;随后任务输出的日志都将附带 joid。 MDC 介绍如下&#xff1a; MDC ( Mapped Diagnostic Contexts )&#xff0c;它是一个…

深度学习:(六)激活函数的选择与介绍

激活函数 之前使用的 a σ ( z ) a\sigma(z) aσ(z) &#xff0c;其中 σ ( ) \sigma(~) σ( ) 便是激活函数。 在神经网络中&#xff0c;不同层的激活函数可以不同。 在学习中&#xff0c;一般以 g ( z ) g(z) g(z) 来表示激活函数。 为什么需要(线性)激活函数&#xff…

K8s容器运行时,移除Dockershim后存在哪些疑惑?

K8s容器运行时&#xff0c;移除Dockershim后存在哪些疑惑&#xff1f; 大家好&#xff0c;我是秋意零。 K8s版本截止目前&#xff08;24/09&#xff09;已经发布到了1.31.x版本。早在K8s版本从1.24.x起&#xff08;22/05&#xff09;&#xff0c;默认的容器运行时就不再是Doc…

暑假考研集训营游记

文章目录 摘要&#xff1a;1.对各大辅导机构考研封闭集训营的一些个人看法&#xff1a;2.对于考研原因一些感想&#xff1a;结语 摘要&#xff1a; Ashy在暑假的时候参加了所在辅导班的为期一个月的考研封闭集训营&#xff0c;有了一些全新的感悟&#xff0c;略作记录。 1.对…

【SpringBoot】97、SpringBoot中使用EasyExcel导出/导入数据

1、EasyExcel Java 解析、生成 Excel 比较有名的框架有 Apache poi、jxl。但他们都存在一个严重的问题就是非常的耗内存,poi 有一套 SAX 模式的 API 可以一定程度的解决一些内存溢出的问题,但 POI 还是有一些缺陷,比如 07 版 Excel 解压缩以及解压后存储都是在内存中完成的,…

linux-系统备份与恢复-系统恢复

Linux 系统备份与恢复&#xff1a;系统恢复 1. 概述 Linux 系统的恢复是系统管理的重要组成部分&#xff0c;它指的是在系统崩溃、硬件故障、误操作或安全问题后&#xff0c;恢复系统到可用状态的过程。良好的系统恢复计划可以有效避免数据丢失和业务中断&#xff0c;并确保系…

ESP32配网接入Wifi

1 ESP32的两种模式 AP模式:ESP32可以作为热点,手机和电脑等设备接入使用。 STA模式:ESP32可以作为作为客户端接入其他网络中。 2 流程 step1: ESP32上电后进入STA模式,尝试看能够接入网络 step2: 如何连接成功,则正常运行。如何连接超时,则自动进入AP模式,设置AP热点…

算法之搜索--最长公共子序列LCS

最长公共子序列&#xff08;longest common sequence&#xff09;:可以不连续 最长公共子串&#xff08;longest common substring&#xff09;&#xff1a;连续 demo for (int i 1;i<lena;i){for (int j 1;j<lenb;j){if(a[i-1]b[j-1]){dp[i][j]dp[i-1][j-1]1;}el…

Qt (17)【Qt 文件操作 读写保存】

阅读导航 引言一、Qt文件概述二、输入输出设备类三、文件读写类四、文件和目录信息类五、自定义“记事本” 引言 在上一篇文章中&#xff0c;我们学习了Qt的事件处理机制&#xff0c;知道了如何响应用户的操作。但应用程序常常还需要处理文件&#xff0c;比如读写数据。所以&a…

python爬虫初体验(一)

文章目录 1. 什么是爬虫&#xff1f;2. 为什么选择 Python&#xff1f;3. 爬虫小案例3.1 安装python3.2 安装依赖3.3 requests请求设置3.4 完整代码 4. 总结 1. 什么是爬虫&#xff1f; 爬虫&#xff08;Web Scraping&#xff09;是一种从网站自动提取数据的技术。简单来说&am…

指针修仙之实现qsort

文章目录 回调函数什么是回调函数回调函数的作用 库函数qsort使用qsort函数排序整形使用qsort函数排序结构体 qsort函数模拟实现说明源码and说明 回调函数 什么是回调函数 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指针&#xff08;地址&#xff09;作为参数…

Sigmoid引发的梯度消失爆炸及ReLU引起的神经元参数失效问题思考

Sigmoid和ReLU激活函数思考&#xff09; 引文Sigmoid函数梯度消失问题梯度爆炸问题解决方案 ReLU函数简化模型示例场景设定前向传播对反向传播的影响总结 内容精简版 引文 梯度消失和梯度爆炸是神经网络训练中常见的两个问题&#xff0c;特别是在使用Sigmoid激活函数时。这些问…

后端-navicat查找语句(单表与多表)

表格字段设置如图 语句&#xff1a; 1.输出 1.输出name和age列 SELECT name,age from student 1.2.全部输出 select * from student 2.where子语句 1.运算符&#xff1a; 等于 >大于 >大于等于 <小于 <小于等于 ! <>不等于 select * from stude…

torch模型量化方法总结

0.概述 模型训练完成后的参数为float或double类型,而装机(比如车载)后推理预测时,通常都会预先定点(量化)为int类型参数,相应的推理的精度会有少量下降,但不构成明显性能下降,带来的结果是板端部署的可能性,推理的latency明显降低,本文对torch常用的量化方法进行总…

JavaEE: 创造无限连接——网络编程中的套接字

文章目录 Socket套接字TCP和UDP的区别有连接/无连接可靠传输/不可靠传输面向字节流/面向数据报全双工/半双工 UDP/TCP api的使用UDPDatagramSocketDatagramPacketInetSocketAddress练习 TCPServerSocketSocket练习 Socket套接字 Socket是计算机网络中的一种通信机制&#xff0…

Python在数据科学与机器学习中的应用

Python 是数据科学与机器学习领域的首选语言之一&#xff0c;广泛应用于数据处理、分析、建模以及预测任务中。Python 拥有丰富的库和工具&#xff0c;能够帮助开发者高效处理数据&#xff0c;并构建各种机器学习模型。下面我们将详细介绍 Python 在数据科学与机器学习中的应用…