Kafka的简介及架构

目录

消息队列

产生背景

消息队列介绍

常见的消息队列产品

应用场景

 消息队列的消息模型

Kafka的基本介绍

简介

Kafka的架构

Kafka的使用

Kafka的shell命令

Kafka的Python API的操作

完成生产者代码

完成消费者代码


消息队列

产生背景

消息队列:指数据在一个容器中,从容器中一端传递到另一端过程

消息:指的数据,只不过这个这个数据存在一定流动状态

队列:指的容器,可以存储数据,这个容器具备FIFO(先进先出)特性

公共容器的特点:

1.公共性:各个程序都可以与之对接

2.FIFO特性:先进先出

3.具备高效的并发能力:能够承载海量数据

4.具备一定的容错能力:比如支持重新读取消息方案

消息队列介绍

常见的消息队列产品

MQ:message queue消息队列

activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群基本很少
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源消息,是一款消息队列的中间件产品,项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列产品

应用场景

消息队列的应用场景:

1.应用解耦合

2.异步处理

3.限流削峰

4.消息驱动系统

 消息队列的消息模型

在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server)  java消息服务

消息队列中两个角色:生产者(producer)和消费者(consumer)

生产者:生产/发送消息到消息队列中

消费者:从消息队列中获取消息

在JMS规范中,专门规定了两种消息消费类型:

1.点对点消费类型:一条消息最终只能被一个消费所消费,微信聊天的私聊

2.发布订阅消费模型:指一条消息最终被多个消费者所消费,微信聊天的群聊

Kafka的基本介绍

简介

Kafka是一款消息队列中间件产品,来源于领英公司,后期贡献给了Apache,目前是Apache旗下的顶级开源项目,采用语言是Scala

Kafka的特点:

1.可靠性:Kafka集群是分布式的,有多副本机制,数据可以自动复制

2.可扩展性:Kafka集群可以灵活的调整,在线扩容

3.耐用性:Kafka数据保存在磁盘上,数据有多副本机制,数据持久化,一定程度上防止数据丢失

4.高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写,零拷贝等)提高数据的读写速度(吞吐量)

Kafka的架构

1. Kafka中集群节点叫broker,节点与节点之间没有主从之分,地位是完全一样

2.Topic:主题/话题,是业务层面对消息进行分类的

3.一个Topic可以设置多个分区

4.同一个partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数

5.broker节点间没有主从之分,但是同一个partition分区的不同副本间有主从之分,分为Leader主副本和Follwer从副本

6.生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动往Follower从副本上同步消息

7.Zookeeper用来管理集群,以及管理元数据信息

8.ISR同步列表,该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表,该列表的作用是当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务

相关名词

Kafka Cluster : kafka集群

Topic : 主题/话题

Broker : Kafka中的节点

Producer : 生产者,负责生产/发送消息到Kafka中

Consumer : 消费者,负责从Kafka中获取消息

Partition : 分区,一个Topic可以设置多个分区,没有数量限制

Kafka的使用

Kafka的shell命令

      Kafka本质上是一个消息队列中间件产品,主要负责消息数据的传递,也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据

创建Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2

参数说明:

        --bootstrap-server:kafka集群中broker连接信息

        --create:指定操作类型,这里是新建Topic

        --topic:指定要新建的Topic名称

        --partitions:设置Topic的分区数

        --replicattion-factor:设置Topic分区的的副本数

注意:如果副本数超过了集群broker节点个数,会报错

查看Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表

查看具体Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --describe: 指定操作类型。这里是查看具体Topic信息

模拟生产者Producer

./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
    --broker-list: Kafka集群中broker连接信息
    --topic: 指定要将消息发送到哪个具体的Topic

 模拟消费者Consumer

./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --topic: 指定要从哪个Topic中消费消息
    --from-beginning: 指定该参数以后,会从最旧的地方开始消费
    latest: 消费者(默认)从最新的地方开始消费
    --max-messages: 最多消费的条数。满足条数后,就会自动结束
    --group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费
    
在工作中的参数一般如何使用?
答: 推荐latest、--max-messages、--group一同使用。因为实际企业中Topic的数据量是特别大的,消费、打印都需要消耗服务器的资源,如果不限定消费的最大条数,可能造成服务器宕机。

修改Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10

分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小

查看消费组中有多少个消费者

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe

Kafka的Python API的操作

准备工作:在服务器的节点上安装一个python用于操作Kafka的库

安装命令:
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

API使用的参考文档:
https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer

完成生产者代码

import timefrom kafka import KafkaProducer# 同步发送
def sync_send():global topic, partition, offset# 2.1- 同步发送数据/消息metadata = producer.send("test01", value=f"hello_java_{i}".encode("UTF-8")).get()# metadata = producer.send("test03",value=f"hello_spark_{i}".encode("UTF-8")).get()# 2.2- 获取元信息中的内容topic = metadata.topicpartition = metadata.partition"""offset消息偏移量,从0开始编号。也就是一条消息在分区中的序号/索引在不同分区间,消息偏移量是无序在同一个分区里面,消息偏移量是有序"""offset = metadata.offsetprint(f"{topic},{partition},{offset},{metadata}")if __name__ == '__main__':# 1- 创建生产者producer = KafkaProducer(bootstrap_servers=["node1.itcast.cn:9092","node2.itcast.cn:9092"])# 2- 发送消息for i in range(10):# 同步发送# sync_send()# 2.3- 异步发送"""异步发送,需要等待一下,或者明确关闭Producer生产者"""producer.send("test01", value=f"hello_hive_{i}".encode("UTF-8"))time.sleep(1)# 3- 释放资源/关闭生产者# producer.close()

完成消费者代码

from kafka import KafkaConsumerif __name__ == '__main__':# 1- 创建消费者consumer = KafkaConsumer("test01",bootstrap_servers=["node1.itcast.cn:9092", "node2.itcast.cn:9092"])# 2- 消费消息for msg in consumer:topic = msg.topicpartition = msg.partitionoffset = msg.offset# key和value消费出来都是bytes数据类型,需要进行解码key = msg.keyvalue = msg.valueprint(f"{topic},{partition},{offset},{key},{value.decode('UTF-8')},{msg}")

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/614478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux学习记录——삼십팔 网络层IP协议

文章目录 1、了解IP协议2、IP协议报文1、8位服务类型2、16位总长度(字节数)3、8位生存时间(TTL) 3、网段划分1、网段划分和CIDR方案2、子网划分简单方法3、IP地址问题的解决方案 4、公网内网1、内网分配2、运营商管理方法 5、路由…

【Python基础】一文搞懂:Python 中 Excel 文件的写入与读取

文章目录 1 引言2 使用 openpyxl2.1 安装 openpyxl2.2 写入 Excel 文件2.3 读取 Excel 文件 3 使用 pandas3.1 安装 pandas 和 openpyxl3.2 写入 Excel 文件3.3 读取 Excel 文件 4 实例演示4.1 安装所需库4.2 封装为excel_example.py脚本文件 5 注意事项6 总结 1 引言 在现代办…

spring-boot项目启动类错误: 找不到或无法加载主类 com.**Application

问题:Springboot项目启动报错:错误: 找不到或无法加载主类 com.**Application 解决步骤: 1.File–>Project Structure 2.Modules–>选中你的项目–点击“-”移除 3.重新导入:点击“”号,选择Import Module&…

【漏洞复现】优卡特脸爱云一脸通智慧管理平台文件上传漏洞

Nx01 产品简介 脸爱云一脸通智慧管理平台是一套功能强大,运行稳定,操作简单方便,用户界面美观,轻松统计数据的一脸通系统。无需安装,只需在后台配置即可在浏览器登录。 功能包括:系统管理中心、人员信息管…

【react-quill】富文本编辑器空格回显无效

<ReactQuilltheme"snow"id{id}ref{(el:any) > {if (el) {reactQuillRef.current el;}}}modules{modules}formats{formats}value{value}onChange{onChange}onBlur{onBlur}className"ql-editor"/> 关键&#xff1a; className"ql-editor&quo…

GPT 商店强势来袭,人人都要有自己的 GPTs

作者&#xff1a;苍何&#xff0c;前大厂高级 Java 工程师&#xff0c;阿里云专家博主&#xff0c;CSDN 2023 年 实力新星&#xff0c;土木转码&#xff0c;现任部门技术 leader&#xff0c;专注于互联网技术分享&#xff0c;职场经验分享。 &#x1f525;热门文章推荐&#xf…

网络基础学习(3):交换机

1.交换机结构 &#xff08;1&#xff09;网线接口和后面的电路部分加在一起称为一个端口&#xff0c;也就是说交换机的一个端口就相当于计算机上的一块网卡。 如果在计算机上安装多个网卡&#xff0c;并让网卡接收所有网络包&#xff0c;再安装具备交换机功能的软件&#xff0…

ORB SLAM2 编译

文章目录 软件版本编译编译自动编译手动编译 软件版本 Pangolin0.6opencv3.4.0 ORB SLAM2 编译 # 更改Opencv依赖版本与添加Pangolin依赖 # CMakelist.txt更改 LIST(APPEND CMAKE_PREFIX_PATH /usr/local/opencv-3.4) # 添加 LIST(APPEND CMAKE_PREFIX_PATH /usr/local/Pang…

数据加工:从原始数据到有价值的信息

在当今数字化的时代&#xff0c;数据已经成为了企业和组织最宝贵的资产之一。然而&#xff0c;原始数据往往需要经过加工和处理&#xff0c;才能转化为有价值的信息和知识。数据加工是指将原始数据进行处理和分析&#xff0c;以提取有用的信息和知识的过程。数据加工的重要性不…

springboot 集成kafka

1. SpringBoot快速集成Kafak_springboot集成kafaka-CSDN博客 2. kafka 启动&#xff1a;Windows系统下快速启动Kafka_windows启动kafka-CSDN博客 3.

idea 设置文件头

idea 设置创建文件时自动添加文档注释信息 /** * Description * Author jimaomao * DATE ${DATE} ${TIME} */

c# ==操作符和equals方法的区别

在C#中&#xff0c;""操作符和Equals()方法有着不同的用途和行为。 ""操作符&#xff1a; "“操作符用于比较两个对象的值是否相等。当使用”"操作符比较两个引用类型的对象时&#xff0c;它会比较它们的引用是否指向相同的内存地址。对于值类…

【Kafka-3.x-教程】-【五】Kafka-监控-Eagle

【Kafka-3.x-教程】专栏&#xff1a; 【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门 【Kafka-3.x-教程】-【二】Kafka-生产者-Producer 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 【Kafka-3.x-教程】-【五…

React之自定义路由组件

开篇 react router功能很强大&#xff0c;可以根据路径配置对应容器组件。做到组件的局部刷新&#xff0c;接下来我会基于react实现一个简单的路由组件。 代码 自定义路由组件 import {useEffect, useState} from "react"; import React from react // 路由配置 e…

机器人领域顶刊TRO,TASE及RAL的区别与关系

一、背景 机器人领域越来越火&#xff0c;特别是具身智能的加持&#xff0c;让机器人在不久的未来可以完成更多复杂的任务&#xff0c;进入到我们的生活&#xff08;不过应该还需要很长时间&#xff09;。作为机器人方向的研究僧&#xff0c;我们会看到许多机器人期刊&#xf…

Semaphore信号量详解

在Java并发编程中&#xff0c;Semaphore是一个非常重要的工具类。它位于java.util.concurrent包中&#xff0c;为我们提供了一种限制对临界资源的访问的机制。你可以将其视为一个同步控制的瑞士军刀&#xff0c;因为它既能够控制对资源的并发访问数量&#xff0c;也能够保证资源…

2000-2021年全国各省环境相关指标数据(890+指标)

2000-2021年全国各省环境相关指标数据&#xff08;890指标&#xff09; 1、指标时间&#xff1a;2000-2021年 2、范围&#xff1a;31省市 3、来源&#xff1a;2001-2022年环境统计年鉴 4、指标&#xff1a;工业废水排放总量、工业废水排放达标量、工业废水处理量、化学需氧…

C语言代码 转换ASCII码为对应字符

转换以下ASCII码为对应字符并输出它们&#xff0c; 73,32,99,97,110,32,100,111,32,105,116,33 代码示例&#xff1a; #include <stdio.h>int main() {int arr[] { 73,32,99,97,110,32,100,111,32,105,116,33 };int i 0;int sz sizeof(arr) / sizeof(arr[0]);whil…

【野火i.MX6ULL开发板】开发板连接网络(WiFi)与 SSH 登录、上电自动登录、设置静态IP、板子默认参数

0、前言 参考之前自己写的&#xff1a; http://t.csdnimg.cn/g60P8 参考资料&#xff1a; [野火]《Linux基础与应用开发实战指南——基于i.MX6ULL开发板》_20230323 从野火官网下载 参考博客&#xff1a; http://t.csdnimg.cn/8uh4O 参考官方文档&#xff1a; https://doc.…

java 将json数据转为model

将json数据转为自己的实体model 要转化&#xff0c;首先要明白自己拿到的json是什么格式&#xff0c;想要转换为什么格式 json中 map&#xff08;以{}包着&#xff09;就是一个对象&#xff0c;list&#xff08;以[]包着&#xff09;就是一个数组 看清楚自己的json数据结构是否…