KAFKA入门教程

目录

1.安装kafka

2.安装kafkamanager可视化工具

3.springboot整合kafka

1.pom导包

2.启动类和yml配置

3.代码演示 

编写生产者:

消费者:


1.安装kafka

进入kafka官网下载对应版本kafka

kafka官网地址:Apache Kafka

kafka是使用Scala开发,所以版本号是由 Scala的版本号和Kafka版本号组成的,如:kafka_2.12-3.2.0 , 2.12是scala版本, 3.2.0是kafka版本,下载完成解压得到kafka,目录结构如下:

结构介绍:
 

bin :kafka的执行脚本 ,其中包括启动kafka的脚本:kafka-server-start.bat 和 zookeeper-server-start.bat 启动zookeeper的脚本(kafka内置有zookeeper) ,bin/windows 目录中的脚本是针对windows平台。
config : 配置文件目录 ,包括server.properties :kafka的配置 ; zookeeper.properties :zookeeper的配置, producer.properties:生产者的配置 ; consumer.properties 消费者的配置等等。
libs : 依赖的三方jar包

可以进入config文件夹,修改kafka和zookeeper配置文件:

zookeeper.properties是作为zookeeper的配置文件,dataDir为数据目录,clientPort为启动端口,比如你想修改zookeeper的默认端口通过配置文件修 clientPort=2181项即可 ,如下:

server.properties作为kafka的配置文件,我们关注下面几个配置,你也可以根据情况进行修改

broker.id =0 : 如果是做个多个kafka主机集群,那么brocker.id不能重复,0 ;1 ;2 增长
zookeeper.connect : zookeeper的地址 ,如果有多个zk就用逗号隔开配置多个地址
num.partions = 1 : 默认partions 数量默认为1
log.dirs : 日志目录,不建议放到tmp临时目录,一定要修改,如:log.dirs=d:/kafka-logs

在安装目录下运行cmd,使用命令:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

启动zookeeper

使用命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

启动kafka

2.安装kafkamanager可视化工具

进入git官网下载:kafka-manager 项目地址:https://github.com/yahoo/kafka-manager

可以直接下载release版本

下载好后需要解压然后对原始文件进行编译,编译完成后会的得到一个kafka-manager-1.3.3.23.zip文件,解压这个文件之后才能启动manager

这里建议大家直接下载编译好的mangaer,地址:

链接:https://pan.baidu.com/s/1oEC2XlPtlSZmOotPYGjpOQ?pwd=jne1 
提取码:jne1

解压好后得到如下结构:

使用bin\kafka-manager命令启动kafka-manager:

启动完成之后访问:http://localhost:9000/ 可以看到kafkaManager主页:

第一次进入需要新建 Cluster

输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.8.1.1)

注意:如果没有在 Kafka 中配置过 JMX_PORT,千万不要选择第一个复选框。
Enable JMX Polling
如果选择了该复选框,Kafka-manager 可能会无法启动。

 

以下全使用默认设置:

点击进入刚刚创建的集群即可看到如下结构:

点击topics可以看到所有创建的topic主题;brokers则代表所有集群内的kafka服务,有几个服务就会显示几个broker;点击topics可以进入查看topic

进入test_topic:

相关参数和使用教程文档可以参考这个大佬的文章:Kafka可视化管理工具kafka-manager部署安装和使用_kafka manager-CSDN博客

3.springboot整合kafka

1.pom导包

创建一个maven结构的springboot项目,首先在pom中导入如下依赖:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!-- swagger --><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency></dependencies>

2.启动类和yml配置

导入依赖之后需要为SpringBoot创建启动类,在启动类中我们通过注解的方式创建一个Topic,如下:

@SpringBootApplication
public class KafKaApplication {private static final String TOPIC_NAME = "kafka_test_topic";public static void main(String[] args) {SpringApplication.run(KafKaApplication.class);}//通过定义Bean的方式创建Topic@Beanpublic NewTopic topicHello(){//创建Topic : topic名字, partition数量 , replicas副本数量return TopicBuilder.name(TOPIC_NAME).build();}
}

在yml中对kafka做一些常规配置,如下:

server:port: 12012
spring:application:name: application-kafkakafka:bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer #键序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer #值序列化retries: 1 # 消息发送重试次数# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 #批量大小properties:linger:ms: 0 #提交延迟consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #键序列化value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值序列化group-id: test-consumer-group #消费者的ID,这个对应 config/consumer.properties中的group.id

更详细的全局配置以及说明:

spring :
kafka :
bootstrap-servers : 192.168.10.70 : 9092
producer :
# 发生错误后,消息重发的次数。
retries : 0
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批
次可以使用的内存大小,按照字节数计算。
batch-size : 16384
# 设置生产者内存缓冲区的大小。
buffer-memory : 33554432
# 键的序列化方式
key-serializer : org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer : org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功
响应。
acks : 1
consumer :
# 自动提交的时间间隔 在 spring boot 2.X 版本中这里采用的是值的类型为 Duration 需要符合
特定的格式,如 1S,1M,2H,5D
auto-commit-interval : 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest (默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之
后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset : earliest
# 是否自动提交偏移量,默认值是 true, 为了避免出现重复数据和数据丢失,可以把它设置为 false,
然后手动提交偏移量
enable-auto-commit : false
# 键的反序列化方式
key-deserializer : org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer :
org.apache.kafka.common.serialization.StringDeserializer
# 配置使用默认的消费组 ID
group-id : defaultConsumerGroup
listener :
# 在侦听器容器中运行的线程数。
concurrency : 5
#listner 负责 ack ,每调用一次,就立即 commit
ack-mode : manual_immediate
missing-topics-fatal : false

3.代码演示 

编写生产者:

编写生产者案例 ,Kafka提供了 KafkaTemplate 用来向Kafka发送消息,直接在查询中注入即可使用。KafkaTemplate提供了很多个重载的send方法,方法返回ListenableFuture对象,即发送的结果对象。

同步阻塞

需要特别注意的是: future.get()方法会阻塞,他会一直尝试获取发送结果,如果Kafka迟迟没有返回发送结果那么程序会阻塞到这里。所以这种发送方式是同步的。

当然如果你的消息不重要允许丢失你也可以直接执行 : kafkaTemplate.send ,不调用get()方法获取发送结果,程序就不会阻塞,当然你也就不知道消息到底有没有发送成功。

异步非阻塞

幸好Kafka为 ListenableFuture 提供了Callback异步回调,我们可以通过异步回调来接收发送结果

@RestController("/producer")
@Api(tags = "生产者示例接口", description = "生产者示例接口 | 消息发送测试接口", hidden = false)
public class ProducerContrller {private static final String TOPIC_NAME = "kafka_test_topic";@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate;/*** 同步,阻塞消息队列* @param msg* @return* @throws ExecutionException* @throws InterruptedException*/@PostMapping("/sendSyncMsg/{msg}")@ApiOperation(value = "生产者生成数据", notes = "同步,阻塞消息队列")@ApiImplicitParams({@ApiImplicitParam(name = "msg", value = "需要发送的数据", required = true, dataType = "String"),})public String sendSyncMsg(@PathVariable("msg")String msg) throws ExecutionException, InterruptedException {ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(TOPIC_NAME, msg);System.out.println("发送结果:"+future.get().toString());return "发送成功";}/*** 异步,非阻塞消息队列* @param msg* @return* @throws ExecutionException* @throws InterruptedException*/@PostMapping("/sendAsyncMsg/{msg}")@ApiOperation(value = "生产者生成数据", notes = "异步,非阻塞消息队列")@ApiImplicitParams({@ApiImplicitParam(name = "msg", value = "需要发送的数据", required = true, dataType = "String"),})public String sendAsyncMsg(@PathVariable("msg")String msg) throws ExecutionException, InterruptedException {ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {@Overridepublic void onFailure(Throwable ex) {ex.getStackTrace();}@Overridepublic void onSuccess(SendResult<Object, Object> result) {System.out.println("发送结果:"+result);}});return "发送成功";}}

也有原生的使用KafkaProducer的方式创建生产者发送消息,这样的好处是可以灵活配置,不需要每次对kafka配置修改后就要重启服务

以下是代码示例:

/*** 原生构建KafkaProducer的生产者方法接口** @param msg* @return*/@PostMapping("/sendMsgByProducer/{msg}")@ApiOperation(value = "生产者生成数据", notes = "原生构建KafkaProducer的生产者方法接口")@ApiImplicitParams({@ApiImplicitParam(name = "msg", value = "需要发送的数据", required = true, dataType = "String"),})public String sendMsgByProducer(@PathVariable("msg") String msg){// 创建一个 Map或Properties 对象,用于构建 Kafka 生产者的配置信息
//        Properties map = new Properties();Map map = new HashMap();// 这个是kafka的地址,对应你server.properties中配置的map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 键序列化map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 值序列化map.put(ProducerConfig.RETRIES_CONFIG, 1); // 设置重试次数map.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 设置重试间隔if (false) {// 添加ssl认证String userName = "";String passWord = "";map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");map.put(SaslConfigs.SASL_MECHANISM, "PLAIN");map.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + userName + "\" password=\"" + passWord + "\";");}// 创建 KafkaProducer 对象 kafkaProducer,并传入配置信息 mapKafkaProducer kafkaProducer = new KafkaProducer<>(map);// 创建要发送的消息 ProducerRecord,同时订阅主题和需要发送的内容ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, msg);// 1.不需要回调的消息发送方式kafkaProducer.send(record);// 创建一个 CompletableFuture 对象 future 用于异步处理发送消息的结果CompletableFuture<Object> future = new CompletableFuture<>();try {// 2.需要回调的消息发送方式kafkaProducer.send(record, (data, exception) -> {if (exception == null) {System.out.println(String.format("Message sent successfully! Topic: {} Partition: {} Offset: {}", data.topic(), data.partition(), data.offset()));future.complete("消息投递成功,无异常"); // 成功时完成future} else {System.out.println(String.format("Error sending message: " + exception.getMessage(), exception));future.completeExceptionally(exception); // 错误时传递异常}});}catch (Exception e){e.printStackTrace();} finally {// 关闭生产者通道,释放资源kafkaProducer.flush();kafkaProducer.close();}return "发送成功";}

消费者:

使用@KafkaListener注释来接收消息,用法比较简单,实例如下:

@Component
public class HelloConsumer {@KafkaListener(topics = "kafka_test_topic")public void handle(ConsumerRecord consumerRecord) {System.out.println("消费者消费消息:" + consumerRecord);System.out.println(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));System.out.println("消费内容:" + consumerRecord.value());}//消费消息的时候,给方法添加 Acknowledgment 参数用来签收消息@KafkaListener(topics = "kafka_test_topic", containerFactory = "kafkaManualAckListenerContainerFactory")public void handler(String message, Acknowledgment ack){System.out.println("收到消息:"+message);//确认收到消息ack.acknowledge();}
}

也有原生的使用KafkaProducer的方式创建生产者发送消息的示例:

使用while循环来保证达到与注解的方式相同的实时接收消息的相同的功能,这样的好处是可以灵活配置,可以每次订阅多个不同的topic,不使用的topic可以直接释放掉

@RestController
@Api(tags = "消费者示例接口", description = "消费者示例接口 | 消费消息测试接口", hidden = false)
public class ConsumerContrller {@PostMapping("/useMsg")@ApiOperation(value = "消费者消费数据", notes = "消费者消费消息数据")public String useMsg(){Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafka_test_topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return "消费成功";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/739101.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【阿里云系列】-如何实现两个VPC网络资源互通

背景 由于实际项目预算有限&#xff0c;两套环境虽然分别属于不同的专有网络即不同的VPC&#xff0c;但是希望借助一台运维机器实现对两个环境的监控和日常的运维操作 网络架构 如下是需要实现的外网架构图&#xff0c;其中希望实现UAT环境的一台windows的堡垒机可以访问生产…

Spring Cloud集成nacos配置中心

1.添加Nacos Config依赖 打开nacos-config-demo的pom.xml文件并添加以下两个依赖项 项目的配置文件中通常包括数据库连接配置项、日志输出配置项、Redis连接配置项、服务注册配置项等内容&#xff0c;如spring-cloud-alibaba-nacos-config-base-demo项目中就包含数据库连接配置…

数据守护神:自动备份数据的重要性与方案

在数字化时代&#xff0c;数据已经成为我们生活与工作的核心。无论是个人用户还是企业组织&#xff0c;数据的丢失或损坏都可能带来不可估量的损失。因此&#xff0c;自动备份数据的重要性不言而喻。自动备份数据不仅能够确保数据的实时更新和安全性&#xff0c;还能在数据丢失…

坐标变换(二维、三维)

文章目录 一、四种空间1.1 定义和对比1.2 齐次坐标系 二、刚性变换2.1 定义2.2 平移2.3 旋转2.3.1 二维2.3.1.1 绕原点2.3.1.2 绕任意点 2.3.2 三维2.3.2.1 绕x轴2.3.2.2 绕y轴2.3.2.3 绕z轴 一、四种空间 1.1 定义和对比 标量空间&#xff1a;只有标量&#xff1b;向量空间&…

使用gnvm下载nodejs和npm

目录 前言 一、下载gnvm 二、利用gnvm下载nodejs 三、下载对应版本的npm 四、gnvm常用的命令 总结 前言 由于之前下载的版本过低&#xff0c;需要升级版本。但在使用gnvm升级node版本时遇到了一系列的问题&#xff0c;索性就把nodejs全部删除&#xff0c;重新用gnvm在下…

网络请求与数据解析

urllib是Python自带的标准库中用于网络请求的库 &#xff0c;无需安装&#xff0c;直接引用即可。通常用于爬虫开发、API&#xff08;应用程序编程接口&#xff09;数据获取和测试。 urllib库的几个模块&#xff1a; urllib.request :用于打开和读取URLurllib.error:包含提出…

羊大师揭秘羊奶滋养,养生的新黄金选择

羊大师揭秘羊奶滋养&#xff0c;养生的新黄金选择 羊奶&#xff0c;这个自古以来的天然营养佳品&#xff0c;近年来逐渐受到现代人的青睐&#xff0c;成为养生的新黄金选择。它以其独特的营养价值和滋养功效&#xff0c;为追求健康生活的我们提供了全新的养生视角。 羊奶的滋…

LeetCode # 1161. 最大层内元素和

1161. 最大层内元素和 题目 给你一个二叉树的根节点 root。设根节点位于二叉树的第 1 层&#xff0c;而根节点的子节点位于第 2 层&#xff0c;依此类推。 请返回层内元素之和 最大 的那几层&#xff08;可能只有一层&#xff09;的层号&#xff0c;并返回其中 最小 的那个。…

Vue:自动按需导入element-plus图标

自动导入使用 unplugin-icons 和 unplugin-auto-import 从 iconify 中自动导入任何图标集。 完整vite.config.js参考模板 https://download.csdn.net/download/ruancexiaoming/88928539 动态导入图标参考 https://blog.csdn.net/ruancexiaoming/article/details/136568219 导入…

几个redis常用命令

转载说明&#xff1a;如果您喜欢这篇文章并打算转载它&#xff0c;请私信作者取得授权。感谢您喜爱本文&#xff0c;请文明转载&#xff0c;谢谢。 ping&#xff1a;测试连接是否存活 例如&#xff1a;测试当前redis数据库是否存活 127.0.0.1:6379> ping #返回PONG&am…

python学习28

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

不会还有人判断字符是否为数字或字母还用Ascii吧

不会还有人判断字符是否为数字或字母还用Ascii吧 c > a && c < z) || (c > 0 && c < 9当然&#xff0c;也可也用&#xff0c;下面给大家分享几个方法快速判断。 Character.isLetter(ch) 判断ch是否为字母 Character.isDigit(ch) 判断ch是否为数字…

导入空管基础数据

1、首先将data.tar.gz解压到自定义目录中 注意&#xff1a;由于数据文件的压缩包比较大&#xff0c;解压过程可能会持续3~5分钟&#xff0c;请耐心等待。 [rootnode3 ~]# cd /opt/software/ [rootnode3 software]# tar -xzf data.tar.gz -C /opt/ 2、利用SQLyog或者其他数据库…

9、设计模式之组合模式(Composite)

一、什么是组合模式 组合模式也成为整体部分模式&#xff0c;是一种结构型设计模式。它将对象组合成树形的层次结构&#xff0c;用来表示“整体-部分”的关系。通过组合模式&#xff0c;我们可以使用相同的方式处理单个对象和多个对象组合。 二、角色组成 组件&#xff08;Com…

ROS机器人程序设计课程进度安排-2023-2024-2

进度安排由人工智能审核制定。 课程 教学进度表预期效果与课程内容详细描述 一、预期效果 此教学进度表旨在确保《ROS机器人程序设计》课程在2023&#xff5e;2024学年度第二学期内&#xff0c;按照预定的教学计划和进度&#xff0c;有序、高效地进行。通过本课程的教学&…

第二十一天-NumPy

目录 什么是NumPy NumPy使用 1.数组的创建 2.类型转换 3.赠删改查 4.数组运算 5.矩阵运算 什么是NumPy 1.NumPy操作的是多维数组&#xff0c;什么是纬度&#xff1f; NumPy使用 1. 安装 pip install numpy import numpy as np 2.官网&#xff1a; 中文官网&#xff1a…

蝙蝠避障:我生活中的一道光

盲人的世界&#xff0c;是无尽的黑暗。看不见光&#xff0c;看不见色彩&#xff0c;甚至看不见自己的手。但在这个黑暗的世界里&#xff0c;我找到了一个光明的出口&#xff1a;一款可以障碍物实时检测的名为蝙蝠避障的盲人软件。 这款软件就像是我的一双眼睛。它通过先进的激光…

Springboot 整合 Elasticsearch(五):使用RestHighLevelClient操作ES ②

&#x1f4c1; 前情提要&#xff1a; Springboot 整合 Elasticsearch&#xff08;三&#xff09;&#xff1a;使用RestHighLevelClient操作ES ① 目录 一、Springboot 整合 Elasticsearch 1、RestHighLevelClient API介绍 1.1、全查询 & 分页 & 排序 1.2、单条件查询…

【linux线程(一)】什么是线程?怎样操作线程?

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:Linux从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学更多操作系统知识   &#x1f51d;&#x1f51d; Linux线程 1. 前言2. 什么是线…

使用cmd命令运行java

1.普通项目(不带lib文件夹) 1.在桌面上建一个名为com的文件夹&#xff0c;在文件夹中用记事本写两个类文件&#xff0c;后缀改为.java。两个类文件的内容如下图所示&#xff1a; 2.使用javac命令编译主函数&#xff0c;命令行为javac TestMain.java。结果可以看到自动生成了两…