Kafka快速入门及使用

入门

官网

简介

  • Kafka是一个分布式的流媒体平台
  • 应用:
    • 消息系统
    • 日志收集
    • 用户行为追踪
    • 流式处理

特点

  • 高吞吐量
  • 消息持久化
  • 高可靠性
  • 高扩展性

常用术语

  • Broker:集群中的服务器
  • Zookeeper:服务管理
  • Topic:主题,Kafka发送消息为发布订阅模式,用来存放消息的空间为Topic
  • Partition:分区,对Topic进行分区,可以多线程读写Topic
  • Offset:消息在分区内存放的索引序列
  • Leader Replica:主副本,对数据做备份;提高容错率,响应读取
  • Follower Replica:随从副本,从主副本做备份;主副本出问题时,从 从副本中随机选取一个作为主副本

安装

下载地址:

Kafka不分平台,虽然压缩包格式为tgz,但是解压后,一样可以在Windows运行。

配置

压缩包解压到不含中文的目录下后,在config包中进行配置;

image

配置Zookeeper.properties

对Zookeeper相关的集群做配置;配置Zookeeper数组存放位置;如下所示:

image

配置server.properties

主要配置Kafka日志文件存放位置;在配置文件中的第62行如下图所示:

image

运行kafka

进入config同层级目录的bin目录中,因为是Windows系统,所以进入windows包中,执行命令。

先启动Zookeeper

在初级目录(解压后可以看到bin以及config的目录)下;执行如下命令启动Zookeeper:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

即使用config目录下的zookeeper配置文件,启动zookeeper;启动结果如下:
image

再启动Kafka

进入kafka_2.13-3.2.3后;执行如下命令启动kafka;

bin\windows\kafka-server-start.bat config\server.properties

启动后结果如下:

image

启动成功

启动成功后可以在之前,在配置文件中,配置的Zookeeper数据目录和Kafka日志保存目录中查看自动创建的文件;如下图所示:

image

使用Kafka

进入包含命令的Windows命令包目录(kafka_2.13-3.2.3\bin\windows)下

创建主题

因为Kafka作为一个消息队列;采用的是发布订阅模式;需要将消息发布到某个主题下,首先需要创建主题;

主题:

  • 代表一个位置
  • 代表一种消息的类别

使用如下命令创建主题:

kafka-topics.bat --create --创建主题的服务器 地址:端口 --创建副本 副本数 --分区 分区数 --topic 主题名

执行成功案例如下图所示:

image

运行如下命令查看主题:

kafka-topics.bat --list --指定主题所在服务器 地址:端口

执行示例如下:

image

发送消息

主题创建成功后,需要再往主题上发送消息;发送消息是以生产者模式身份发送;执行如下命令:

kafka-console-producer.bat --服务器列表 服务器地址:端口 --topic 主题名

执行上述命令后,即可输入需要发布的消息;执行示例如下:

image

接收消息

在新的命令行窗口,重新进入目录(kafka_2.13-3.2.3\bin\windows)下;以消费者身份接收消息;执行命令如下:

kafka-console-consumer.bat --指定服务器 读取消息的服务器地址:端口 --topic 读取消息的主题名 --从头开始读取消息

执行示例如下所示:

image

Spring整合Kafka

引入依赖

在项目pom文件中引入以下依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

配置Kafka

application.properties文件中,进行如下配置:


# 配置服务器列表
spring.kafka.bootstrap-servers=localhost:9092
# 消费者分组id   在Kafka的消费者配置文件中有
# 可在配置文件中 更改分组id 更改后需要重新启动kafka
spring.kafka.consumer.group-id=test-consumer-group
# 是否自动提交 消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
# 自动提交频率 此处配置3000ms
spring.kafka.consumer.auto-commit-interval=3000

访问Kafka

主要是通过生产者发送消息;消费者监听消息,测试在Spring中使用Kafka代码如下:

/*** @author 花木凋零成兰* @date 2024/3/23 23:41*/
@SpringBootTest
@ContextConfiguration(classes = Application.class)		// 使用Application类的配置
public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() {// 发送消息kafkaProducer.send("test", "你好");kafkaProducer.send("test", "在干嘛");try {Thread.sleep(1000 * 20);    // 阻塞主线程 用户观察消费者是否接收到消息} catch (InterruptedException e) {throw new RuntimeException(e);}}}/*** 生产者*/
@Component
class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 发送消息方法* @param topic 主题* @param content 发送消息的内容*/public void send(String topic, String content) {kafkaTemplate.send(topic, content);}}/*** 消费者*/
@Component
class KafkaConsumer {/*** 监听主题发送的消息* @param record    消息自动封装为ConsumerRecord*/@KafkaListener(topics = {"test"})   // 需要监听的主题public void handleMessage(ConsumerRecord record) {System.out.println(record.value()); // 读取消息}}

运行测试后,成功结果如下所示,可观察到消费者读取到生产者发送的消息

image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/766678.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux/openEuler系统部署spring boot+vue前后端分离项目(nginx均衡代理)

Linux/openEuler系统部署spring bootvue前后端分离项目&#xff08;nginx均衡代理&#xff09; 1、系统环境准备&#xff0c;安装openjdk和nginx还有MySQL&#xff0c;咱们本文先连接主机mysql进行登录&#xff08;linux上的mysql服务可以先不安装&#xff09; 可以看我前面的…

springboot精品源码

springboot精品源码 所有项目都包括&#xff1a;源码数据库文件开题LW说明文档运行视频 请看主页资料联系。 项目类型包括: 1 SpringBoot学生心理咨询评估系统 2 基于SpringBoot的网上订餐系统 3 大学生租房平台的设计与实现 4 SpringBoot房屋租赁系统 5 基于SpringBoot的课…

SpringCloud之网关组件Gateway学习

SpringCloud之网关组件Gateway学习 GateWay简介 Spring Cloud Gateway是Spring Cloud的⼀个全新项目&#xff0c;目标是取代Netflix Zuul&#xff0c;它基于Spring5.0SpringBoot2.0WebFlux&#xff08;基于高性能的Reactor模式响应式通信框架Netty&#xff0c;异步⾮阻塞模型…

STM32---DHT11温湿度传感器与BH1750FVI光照传感器(HAL库、含源码)

写在前面&#xff1a;本节我们学习使用两个常见的传感器模块&#xff0c;分别为DHT11温湿度传感器以及BH1750FVI光照传感器,这两种传感器在对于环境监测中具有十分重要的作用&#xff0c;因为其使用简单方便&#xff0c;所以经常被用于STM32的项目之中。今天将使用分享给大家&a…

QGIS编译(跨平台编译)057:FastCGI编译(Windows、Linux、MacOS环境下编译)

文章目录 1、FastCGI介绍2、FastCGI下载3、Windows下编译4、linux下编译5、MacOS下编译1、FastCGI介绍 FCGI 是 FastCGI 的缩写,是一种用于改善 CGI(Common Gateway Interface)性能的协议。在传统的 CGI 中,每次请求都需要启动一个新的进程来处理,这导致了较高的资源消耗和…

【测试思考】设计测试用例时,你在想什么

突然想写这篇文章是因为&#xff0c;前两天看到一篇文章【像用户一样测试】 然后想起事儿 .. 想到在2020年上海爆发疫情后&#xff0c;开始频繁使用买菜软件&#xff0c;在一个深夜从某团紧急挑选加购商品&#xff0c;看到提醒自己账户还有一张满减优惠券&#xff0c;挺高兴的…

claude3国内怎么用

你是否苦恼没有渠道接触最牛的AI——Claude3&#xff0c;这个已经被媒体刷屏的彻底吊打了ChatGPT-4的地表最强AI。 最近&#xff0c;一个国内的claude3镜像站出现了&#xff0c;国内的小伙伴也可以体验了。 无论你用它写文案、做PPT、写代码、调bug、还是画图&#xff0c;都不…

【Linux】调试器-gdb的安装与使用

1. 背景 程序的发布方式有两种&#xff0c;debug模式和release模式 Linux gcc/g出来的二进制程序&#xff0c;默认是release模式 要使用gdb调试&#xff0c;必须在源代码生成二进制程序的时候, 加上 -g 选项 GDB的安装 在开始之前&#xff0c;确保已经安装了GDB。如果没有安…

Android 观察者模式

在Android中&#xff0c;观察者模式&#xff08;Observer Pattern&#xff09;是一种常用的设计模式&#xff0c;用于在对象之间建立一对多的依赖关系&#xff0c;当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都会得到通知并自动更新。在Android开发中&#xff0…

HTML5和CSS3笔记

一&#xff1a;网页结构(html)&#xff1a; 1.1&#xff1a;页面结构&#xff1a; 1.2&#xff1a;标签类型&#xff1a; 1.2.1&#xff1a;块标签&#xff1a; 1.2.2&#xff1a;行内标签&#xff1a; 1.2.3&#xff1a;行内块标签&#xff1a; 1.2.4&#xff1a;块标签与行…

独孤思维:流量暴涨,却惨遭违规

最近独孤操作虚拟资料短视频&#xff0c;有个很深的感悟。 每天发10条短视频&#xff0c;积累到20天左右&#xff0c;播放量和粉丝数开始暴涨。 虽然很多牛比的比我数据好&#xff0c;但是对于刚做短视频的独孤来说&#xff0c;我已经满足了。 但是又发了10来天&#xff0c;…

如何用VSCode和Clangd与Clang-Format插件高效阅读Linux内核源码及写驱动

一、如何高效阅读Linux源码&#xff1a;基于clangd uboot/busybox等都可以用这种方式&#xff0c;理论上说所有基于Make和Cmake的源码工程都可以用这套方案 阅读Linux源码最大问题在于调用链太复杂&#xff0c;一个函数或变量引用太多&#xff0c;source insight和cscope等基于…

Pink老师Echarts教学笔记

可视化面板介绍 ​ 应对现在数据可视化的趋势&#xff0c;越来越多企业需要在很多场景(营销数据&#xff0c;生产数据&#xff0c;用户数据)下使用&#xff0c;可视化图表来展示体现数据&#xff0c;让数据更加直观&#xff0c;数据特点更加突出。 01-使用技术 完成该项目需…

阿里云服务器(Ubuntu22)上的MySQL8数据库下载,安装和远程连接

最近阿里云centos主机到期了改为使用Ubuntu操作系统&#xff0c;在上面安装mysql并远程连接出现了一系列问题并解决。 之前在centos系统上下载mysql8的教程如下&#xff1a; 阿里云服务器&#xff08;centos7&#xff09;上的MySQL8数据库下载&#xff0c;安装和远程连接 主机操…

rollup打包起手式

使用Rollup打包JavaScript rollup是一款小巧的javascript模块打包工具&#xff0c;更适合于库应用的构建工具;可以将小块代码编译成大块复杂的代码&#xff0c;基于ES6 modules,它可以让你的 bundle 最小化&#xff0c;有效减少文件请求大小,vue在开发的时候用的是webpack,但是…

【小沐学Python】Python实现Web图表功能(Lux)

文章目录 1、简介2、安装3、测试3.1 入门示例3.2 入门示例2 结语 1、简介 https://github.com/lux-org/lux 用于智能可视化发现的 Python API Lux 是一个 Python 库&#xff0c;通过自动化可视化和数据分析过程来促进快速简便的数据探索。通过简单地在 Jupyter 笔记本中打印出…

招聘自媒体编辑岗位的人才测评方案

人才测评工具在招聘入职的方案&#xff0c;在线工具网根据自媒体岗位的特性和需求来分析&#xff0c;并制定自媒体主编的测评方案。 自媒体作为互联网时代的产物&#xff0c;自然也为我们带来了很多的福利&#xff0c;例如&#xff1a;海量的信息、快捷的传媒方式&#xff0c;那…

百度网盘联盟申请盘主的方法

在百度网盘联盟目前有3种赚钱方式&#xff0c;第一种是自己售卡赚佣金&#xff1b;第二种是外链分享&#xff08;分销联盟&#xff09;&#xff1b;第三种是邀请好友加盟&#xff0c;好友售卡&#xff0c;自己得提成收入&#xff0c;需要申请盘主后即可开始 申请盘主&#xff1…

3个Tips,用“AI”开启新生活

相信最近&#xff0c;很多朋友们都回归到了忙碌的生活节奏中。生活模式的切换&#xff0c;或多或少会带来身体或情绪状况的起伏。新技术正在为人们生活的方方面面带来便利。3个小Tips或许能让你也从新技术中获益&#xff0c;从身到心&#xff0c;用“AI”开启新生活。 关”A…

RAPTOR:树结构的索引和检索系统的递归抽象处理

论文地址&#xff1a;https://arxiv.org/pdf/2401.18059.pdf 摘 要 增强型检索语言模型能够更好地适应世界状态的变化&#xff0c;并整合长尾知识&#xff0c;然而现有大多数方法仅能从检索语料库中检索到较短的连续文本片段&#xff0c;这限制了对整个文档上下文的整体理解。…