Kafka学习-Java使用Kafka

文章目录

  • 前言
  • 一、Kafka
    • 1、什么是消息队列
      • offset
    • 2、高性能
      • topic
      • partition
    • 3、高扩展
      • broker
    • 4、高可用
      • replicas、leader、follower
    • 5、持久化和过期策略
    • 6、消费者组
    • 7、Zookeeper
    • 8、架构图
  • 二、安装Zookeeper
  • 三、安装Kafka
  • 四、Java中使用Kafka
    • 1、引入依赖
    • 2、生产者
    • 3、消费者
    • 4、运行效果


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition

在这里插入图片描述

3、高扩展

broker

随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker
我们可以增加broker来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leaderfollower
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2、生产者

public static void main(String[] args) throws InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.ACKS_CONFIG, "all");prop.put(ProducerConfig.RETRIES_CONFIG, 0);prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);String topic = "hello";KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i));System.out.println("生产消息:" + i);Thread.sleep(1000);}producer.close();
}

3、消费者

public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1");    // 消费者组prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    //自动提交偏移量prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);     //自动提交时间KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);ArrayList<String> topics = new ArrayList<>();//可以订阅多个消息topics.add("hello");consumer.subscribe(topics);try {while(true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));for (TopicPartition topicPartition : poll.partitions()) {//	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);//	获取TopicPartition对应的主题名称String topic = topicPartition.topic();//	获取TopicPartition对应的分区位置int partition = topicPartition.partition();//	获取当前TopicPartition下的消息条数int size = partitionRecords.size();System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",topic,partition,size);for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);//	实际的数据内容String key = consumerRecord.key();//	实际的数据内容String value = consumerRecord.value();//	当前获取的消息偏移量long offset = consumerRecord.offset();//	表示下一次从什么位置(offset)拉取消息long commitOffser = offset + 1;System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",key, value, offset, commitOffser);Thread.sleep(1500);}}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}
}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/837867.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript异步编程——09-Promise类的方法【万字长文,感谢支持】

Promise 类的方法简介 Promise 的 API 分为两种&#xff1a; Promise 实例的方法&#xff08;也称为&#xff1a;Promis的实例方法&#xff09; Promise 类的方法&#xff08;也称为&#xff1a;Promise的静态方法&#xff09; 前面几篇文章&#xff0c;讲的都是 Promise 实…

USB3.0接口——(3)协议层(包格式)

1.协议层 1.1.超高速传输事务 超高速事务&#xff08;SuperSpeed transactions&#xff09;由主机对设备端点请求或发送数据开始&#xff0c;并在端点发送数据或确认收到数据时完成。 超高速总线上的数据传输&#xff08;transfer&#xff09;是主机请求设备应用程序生成的数…

【LAMMPS学习】九、LAMMPS脚本 示例

9. 示例脚本 LAMMPS 发行版包含一个包含许多示例问题的示例子目录。许多是二维模型&#xff0c;运行速度快且易于可视化&#xff0c;在台式机上运行最多需要几分钟。每个问题都有一个输入脚本 (in.*)&#xff0c;并在运行时生成一个日志文件 (log.*)。有些使用初始坐标的数据文…

Linux-基础IO

&#x1f30e;Linux基础IO 文章目录&#xff1a; Linux基础IO C语言中IO交互       常用C接口         fopen         fputs         fwrite         fgets 当前路径       三个文件流 系统文件IO       open函数     …

特征模态分解(FMD):一种小众而又新颖的分解方法

​ 声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 今天为大家介绍一个小众而又新颖的信号分…

tensorflow实现二分类

# 导入所需库和模块 from tensorflow.keras.layers import Dense, Input, Activation # 导入神经网络层和激活函数模块 from tensorflow.keras.models import Sequential # 导入Keras的Sequential模型 import pandas as pd # 导入Pandas库用于数据处理 import numpy as np …

接口文档不显示新写的接口

新写的接口&#xff0c;但是不显示&#xff1a; 仔细对比源码才发现没有写tag&#xff1a; 然后就有了&#xff1a;

ES6之正则扩展

正则表达式扩展 u修饰符&#xff08;Unicode模式&#xff09;y修饰符&#xff08;Sticky或粘连模式&#xff09;s修饰符&#xff08;dotAll模式&#xff09;Unicode属性转义正则实例的flags属性字符串方法与正则表达式的整合 javascript的常用的正则表达式 验证数字邮箱验证手机…

C语言中的循环队列与栈、队列之间的转换实现

引言 在数据结构的学习中&#xff0c;栈&#xff08;Stack&#xff09;和队列&#xff08;Queue&#xff09;是两个非常重要的概念。它们分别遵循着后进先出&#xff08;LIFO&#xff09;和先进先出&#xff08;FIFO&#xff09;的原则。在某些情况下&#xff0c;我们可能需要…

C++——超简单登录项目

程序入口文件 #include <QtWidgets/QApplication> // 包含登录页面头文件 #include "DlgLogin.h"int main(int argc, char *argv[]) {QApplication a(argc, argv);// 程序入口// 调页面起来//DlgMain w;//w.show();// 换成登录页面DlgLogin w;w.show();return…

开源禅道zentao的使用

很不幸禅道因为漏洞被人进攻了&#xff0c;被迫研究。 1.安装 直接使用docker进行部署&#xff0c;这里有非常多门道。官网的镜像easysoft-zentao是属于docker安装&#xff0c;而idoop的镜像虽然也是docker安装&#xff0c;但是实际是使用官网linux一键安装的版本&#xff0c…

一周学习总结:数组与链表

学习内容&#xff1a;数组与链表、计算机网络知识 数组&#xff1a; 从数组的基础知识到相关应用 数组的基础知识&#xff1a;数组在内存中的存储、数组的相关操作&#xff08;获取与更新&#xff09;、数组的相关应用&#xff1a; 二分查找法⭐⭐⭐⭐⭐ ● 掌握左闭右闭的…

2024第16届四川教育后勤装备展6月1日举办 欢迎参观

2024第16届四川教育后勤装备展6月1日举办 欢迎参观 邀请函 主办单位&#xff1a; 中国西部教体融合博览会组委会 承办单位&#xff1a;重庆港华展览有限公司 博览会主题&#xff1a;责任教育 科教兴邦 组委会&#xff1a;交易会159交易会2351交易会9466 展会背景 成都…

Chatgpt教你使用Python开发iPhone风格计算器

上次使用Chatgpt写爬虫&#xff0c;虽然写出来的代码很多需要修改后才能运行&#xff0c;但Chatgpt提供的思路和框架都是没问题。 这次让Chatgpt写一写GUI程序&#xff0c;也就是你常看到的桌面图形程序。 由于第一次测试&#xff0c;就来个简单点的&#xff0c;用Python写用…

GPU Burn测试指导

工具下载链接&#xff1a; https://codeload.github.com/wilicc/gpu-burn/zip/master测试方法&#xff1a; 上传工具到操作系统下&#xff0c;解压缩工具&#xff0c;使用make命令完成编译&#xff08;确保cuda环境变量已经配置成功、 nvcc -v能显示结果&#xff09;。 如果安…

文献速递:多模态深度学习在医疗中的应用--多模式婴儿脑分割技术:模糊引导深度学习

Title 题目 Multimodal Infant Brain Segmentation by Fuzzy-informed Deep Learning 多模式婴儿脑分割技术&#xff1a;模糊引导深度学习 01 文献速递介绍 日益普及的非侵入式婴儿脑磁共振图像&#xff08;MRI&#xff09;为准确理解脑主要发展轨迹的动态性提供了机会&…

树莓派|串口通信协议

1、串口通信原理 串口通讯(Serial Communication)&#xff0c;是指外设和计算机间&#xff0c;通过数据信号线、地线等&#xff0c;按位进行传输数据的一种通讯方式。串口是一种接口标准&#xff0c;它规定了接口的电气标准&#xff0c;没有规定接口插件电缆以及使用的协议。串…

“ModuleNotFoundError: No module named ‘selenium‘”报错如何解决

接上节&#xff1a;测试平台开发之测试框架改造并发执行及结果隔离(1) 上节博客的末尾提到&#xff1a;在命令窗口执行python main.py 可是执行的时候遇到了如下报错&#xff1a; ERRORS _____________________________________________________________ ERROR collecting te…

如何安全高效地进行4S店文件分发,保护核心资产?

4S店与总部之间的文件分发是确保双方沟通顺畅、信息共享和决策支持的重要环节。4S店文件分发涉及到以下文件类型&#xff1a; 销售报告&#xff1a;4S店需要定期向总部提交销售报告&#xff0c;包括销售数量、销售额、市场份额等关键指标。 库存管理文件&#xff1a;包括车辆库…

使用docker创建hadoop集群:Couldn‘t upload the file

运行的环境; Windows10 Docker Desktopdocker-hadoop 出现的问题如下: 解决方法 https://github.com/big-data-europe/docker-hadoop/issues/98