消息中间件Kafka(PHP版本)

        小编最近需要用到消息中间件,有需要要复习一下以前的东西,有需要的自取,强调一点,如果真的想了解透彻,一定要动手脑袋会了不代表就会写了

        Kafka是由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息

Kafka 的特性

高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
高并发: 支持数千个客户端同时读写

Kafka 的使用场景

活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个能够提供多种应用程序的领域。
限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

消息:
Kafka中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率,消息会分批次写入Kafka,批次就代指的是一组消息。

主题:
消息的种类称为主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,
由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

生产者:
向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:
订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:
生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,
消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

偏移量:
偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker:
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:
broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

安装

下载地址

librdkafka获取地址:https://github.com/edenhill/librdkafka
kafka获取地址:https://github.com/arnaud-lb/php-rdkafka

安转java环境

下载地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
下载解压完成之后,设置系统变量:path(路径为:安装目录/bin)
设置环境变量:JAVA_HOME(路径为:安装目录\bin)

查看是否安装成功:java -version

Zookeeper安装

下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
设置环境变量:path(路径为:安装目录\bin)
新建data文件夹,新建logs文件夹
config文件夹:zoo_sample.cfg  新复制一个:zoo.cfg
编辑zoo.cfg文件:
新增(配置路径【一定要配置\\,要不然不识别】:安装路径\\zookeeper\\apache-zookeeper-3.6.4-bin\\):
dataDir= 安装路径\zookeeper\apache-zookeeper-3.6.4-bin\data
dataLogDir=安装路径\zookeeper\apache-zookeeper-3.6.4-bin\log
audit.enable=truezookeeper/conf/zoo.cfg 参数详解

tickTime=2000:
        这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,单位是毫秒
        
initLimit=10:
        这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒,10秒内要启动集群并出现leader和floower。
syncLimit=5:
        这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒,超出时间认为是死机。
dataDir:
        快照日志的存储路径
dataLogDir:
        事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort=12181:
        这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

启动zkServer

启动目录:\bin\zkServer.sh

启动命令:zkServer.sh start

查找看到:binding to port .0.0.0.0/0.0.0.2181能表示成功

安装Scala

下载地址:https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi
安装:一键安装(一直next,直到完成)
配置环境变量(这个需要配置):安装目录/bin
判断是否安装完成:scala -version

Kafka安装

下载地址:https://kafka.apache.org/downloads
Kafka安装目录下新建目录logs
编辑config\server.properties文件
log.dirs=安装目录\\logs(注意双斜线,如果是cmd命令出现命令行太长,那就把Kafka安装安装在磁盘的最外面,D盘的最外层)
新增参数:listeners=PLAINTEXT://localhost:9092

启动

一定要先启动zookeeper(命令:zkServer)
然后启动kafka(命令(cmd进入Kafka安装目录):.\bin\windows\kafka-server-start.bat .\config\server.properties)
查找看到:from now on will use node localhost:9092
能表示成功(如果启动不了,删除logs文件夹下的文件)

操作:

创建topics(主题):

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test


查看主题:

kafka-topics.bat --bootstrap-server localhost:9092 --list

生产者:

cmd进入:安装目录\bin\windows
打开生产者:kafka-console-producer.bat --broker-list localhost:9092 --topic test

消费者:

cmd进入:安装目录\bin\windows
打开消费者:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

测试:

生产者发送消息,消费者订阅收到消息

注意:
1.一共打开四个窗口,1.zookeeper 2.kafka  3.生产者  4.消费者  (注意,这四个窗口不能关闭,要一直开着)
2.一定是生产者生产消息,消费者才会收到消息(注意,生产者和消费者的topics一定要是一样的,要不然收不到消息)

方法

getOutQlen方法

使用方法:$producer->getOutQLen();
作用:
1.用于获取生产者(Producer)内部队列中等待发送到Kafka broker的消息数量。
2.getOutQLen() 方法允许你查询这个内部队列中当前待发送的消息数量。通常用于监控和调试目的,帮助了解生产者的发送速率和队列积压情况
3.getOutQLen() 返回的是近似值,它可能在调用之间发生变化
输出数据:
int(0)


poll方法

使用方法:
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
作用:
用于从Kafka集群中拉取消息,当消费者调用poll()方法时,如果在规定的时间内没有收到任何消息,它会立即返回,并且没有任何消息被拉取到(轮询一次就相当于拉取一定时间段broker中可消费的数据)


 
flush方法

使用方法:$producer->flush(10000);
作用:将生产者内部缓冲区中的消息强制发送到Kafka broker的过程。


consumerstart方法

使用方法:$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
作用:在Kafka中,consumerstart方法是用于启动消费者线程并开始从Kafka集群中拉取消息的方法。


Consume方法

使用方法:$topic->consume(0, 120*10000);
作用:是指Kafka消费者从Kafka集群中读取消息的过程。首先需要从集群中先拉取数据


Purge方法

使用方法:$producer->purge(RD_KAFKA_PURGE_F_QUEUE);
作用:是指清除已完成或已过期的请求,以释放缓存资源。


initTransactions方法

作用:用于初始化一个事务。Kafka从0.11.0.0版本开始支持事务性生产者API,允许生产者将多个消息组合成一个事务,确保这些消息被原子性地写入Kafka。这意味着要么所有消息都成功写入,要么都不写入,保证了消息的一致性


beginTransaction方法

作用:用于开始一个新的生产者事务。Kafka从0.11.0.0版本开始支持事务性生产者API,它允许你将多个消息组合成一个事务,确保这些消息被原子性地写入Kafka。这意味着要么所有消息都成功写入,要么都不写入,这保证了消息的一致性


commitTransaction方法

作用:它用于提交一个事务。当你使用 Kafka 的事务性生产者 API 时,你可以将一系列的消息发送操作组合成一个原子性的事务。这意味着这些操作要么全部成功,要么全部失败,从而确保数据的一致性和顺序性


abortTransaction方法

作用:中止事务,当发送消息或提交事务过程中发生错误时使用

getMetadata方法

使用方法:$producer->getMetadata(false, $topic, 10*1000);

作用:
用于获取Kafka集群的元数据
获取数据包括:1.主题(topics)2.分区(partitions)3.副本(replicas)4.ISR(In-Sync Replicas)等信息。
通常,客户端库(如PHP的php-kafka)会在初始化时或需要时自动执行此操作,以便了解集群的状态和可用主题。


代码:

class Kafka extends CI_Controller {//定义变量(分区)private $borker_list = "";//定义变量(配置)private $conf = "";//定义变量(主题)private $topics = "";//定义变量(分组)private $topics_group = "";//构造public function __construct(){parent::__construct();//初始化数据$this->borker_list = "localhost:9092";$this->topics = "test";$this->topics_group = "test-group";}//消息生产者public function producter(){//初始化$conf=  new RdKafka\Conf();//设置分区$conf->set('metadata.broker.list', $this->borker_list);//初始化生产者$producer = new RdKafka\Producer($conf);//设置主题$topic = $producer->newTopic($this->topics);//产生信息for ($i = 0; $i < 10; $i++) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");$producer->poll(0);}//消息刷新for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {$result = $producer->flush(10000);if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}//刷新结果if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Was unable to flush, messages might be lost!');}}//消息订阅者public function consumer(){set_time_limit(0);//初始化$conf = new RdKafka\Conf();//设置分区$conf->set('metadata.broker.list', $this->borker_list);$conf->set('group.id',$this->topics_group);//初始化消费者$rk = new RdKafka\Consumer($conf);//主题配置$topicConf = new RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'file');$topicConf->set('offset.store.path', sys_get_temp_dir());$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newTopic($this->topics, $topicConf);$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {$message = $topic->consume(0, 120*10000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR://没有错误打印信息var_dump($message);break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "等待接收信息\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "超时\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}//获取元数据(包括主题(topics)、分区(partitions)、副本(replicas)和ISR(In-Sync Replicas)等信息)public function gettest(){//初始化$conf=  new RdKafka\Conf();//设置分区$conf->set('metadata.broker.list', $this->borker_list);//初始化生产者$producer = new RdKafka\Producer($conf);$topic = $producer->newTopic($this->topics);
//        $result = $producer->getMetadata(false, $topic, 10*1000);$result = $producer->getOutQLen();var_dump($result);die;}//获取元数据public function metadata(){$conf = new RdKafka\Conf();$conf->setDrMsgCb(function ($kafka, $message) {file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);});$conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka\Consumer($conf);$rk->addBrokers("127.0.0.1");$allInfo = $rk->getMetadata(true, NULL, 60e3);$topics = $allInfo->getTopics();//循环输出foreach ($topics as $topic) {$topicName = $topic->getTopic();if ($topicName == "__consumer_offsets") {continue ;}$partitions = $topic->getPartitions();foreach ($partitions as $partition) {$topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/11176.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

等保一体机能过三级等保吗?过等保无需再买安全设备如何做到?

等保一体机能过三级等保吗&#xff1f;过等保无需再买安全设备如何做到&#xff1f; 全云在线 2024-03-28 12:08 广东 尽管等保建设的标准是统一的&#xff0c;但由于不同行业和用户规模的差异&#xff0c;建设方案呈现出多样化的特点。 虽然重点行业过等保现象确实已经十分…

免费思维13招之八:跨行业思维

免费思维13招之八:跨行业思维 免费思维的另一大战略思维——跨行业型思维。 跨行业型思维有两种:一种是通过跨行业,把自己的产品免费掉,从而赚取其他行业的利润。另一种是通过跨行业,把别人的主流产品免费掉,从而增大自己产品的销量。 第一种,把自己的产品免费,从而赚…

Hadoop3.4.0 完全分布式集群 运行环境搭建 VMware Workstation 虚拟机 大数据系列 一

一 生产环境集群模式部署&#xff0c;需要多台主机&#xff0c;主机之间通过密钥相互访问. 1 配置如图 节点名字节点IP系统版本master11192.168.50.11centos 8.5slave12192.168.50.12centos 8.5slave13192.168.50.13centos 8.5 2 安装服务器 #先安装一台master11&#xff…

Hadoop-未授权访问-内置配合命令执行RCE

一、Hadoop常见端口及配置文件 Hadoop中有多种端口&#xff0c;这些端口用于不同的服务和通信。以下是Hadoop中常见的端口以及它们的用途&#xff1a; NameNode Web界面端口 (默认: 9870)NameNode 对客户端服务端口 (默认: 8020)Secondary NameNode Web界面端口 (默认: 9868)…

Ubuntu20.4部署Cuda12.4

准备Ubuntu20.4 VM 安装Cuda12.4 1.进入如下界面安装安装Cuda12.4版本&#xff1a; CUDA Toolkit 12.4 Update 1 Downloads | NVIDIA Developerhttps://developer.nvidia.com/cuda-downloads?target_osLinux&target_archx86_64&DistributionUbuntu&target_vers…

MySQL软件安装基于压缩包

打开mysql官网网址 MySQL :: Download MySQL Community Server 本次针对版本8的安装包方式进行安装&#xff0c;下载成功后接下来对MySQL进行安装 下载后有一个以zip后缀结尾的压缩包文件 对于安装包方式安装&#xff0c;比起可视化安装省去了许多安装步骤&#xff0c;这里直接…

Python | Leetcode Python题解之第86题分隔链表

题目&#xff1a; 题解&#xff1a; class Solution:def partition(self, head: Optional[ListNode], x: int) -> Optional[ListNode]:sml_dummy, big_dummy ListNode(0), ListNode(0)sml, big sml_dummy, big_dummywhile head:if head.val < x:sml.next headsml sm…

C++ | Leetcode C++题解之第86题分隔链表

题目&#xff1a; 题解&#xff1a; class Solution { public:ListNode* partition(ListNode* head, int x) {ListNode* small new ListNode(0);ListNode* smallHead small;ListNode* large new ListNode(0);ListNode* largeHead large;while (head ! nullptr) {if (head-…

安卓模拟器怎么修改ip地址

最近很多老铁玩游戏的&#xff0c;想多开模拟器一个窗口一个IP&#xff0c;若模拟器窗口开多了&#xff0c;IP一样会受到限制&#xff0c;那么怎么更换自己电脑手机模拟器IP地址呢&#xff0c;今天就教大家一个修改模拟器IP地址的方法&#xff01;废话不多说&#xff0c;直接上…

摩苏尔大坝形变监测

摩苏尔大坝&#xff0c;是伊拉克最大的大坝。它位于底格里斯河35公里&#xff0c;北距摩苏尔市&#xff0c;这是一座粘土质地的水坝&#xff0c;高113米&#xff0c;长3.2公里&#xff0c;于1986落成。 大坝建成后不久&#xff0c;大坝就遇到了由软石膏地基造成的一些结构性问题…

代码随想录算法训练营第二十七天| LeetCode39. 组合总和、LeetCode40.组合总和II、LeetCode131.分割回文串

#LeetCode 39. Combination Sum #LeetCode 39. 视频讲解&#xff1a;带你学透回溯算法-组合总和&#xff08;对应「leetcode」力扣题目&#xff1a;39.组合总和&#xff09;| 回溯法精讲&#xff01;_哔哩哔哩_bilibili 当建立树的结构的时候&#xff0c;target 可以限制树的深…

第四届上海理工大学程序设计全国挑战赛 J.上学 题解 DFS 容斥

上学 题目描述 usst 小学里有 n 名学生&#xff0c;他们分别居住在 n 个地点&#xff0c;第 i 名学生居住在第 i 个地点&#xff0c;这些地点由 n−1 条双向道路连接&#xff0c;保证任意两个地点之间可以通过若干条双向道路抵达。学校则位于另外的第 0 个地点&#xff0c;第…

python多标签图像分类的图片相册共享交流系统vue+django

建立图片共享系统&#xff0c;进一步提高用户对图片共享信息的查询。帮助用户和管理员提高工作效率&#xff0c;实现信息查询的自动化。使用本系统可以轻松快捷的为用户提供他们想要得到的图片共享信息。 根据本系统的基本设计思路&#xff0c;本系统在设计方面前台采用了pytho…

Python 数据处理 合并二维数组和 DataFrame 中特定列的值

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 示例代码如下&#xff1a; import numpy as np import pandas as pddata {label: [1, 2, 3, 4]} df pd.DataFrame(data)values_array df[["label"]].values random_array np.random.ran…

Star15.3k,开源数据可视化分析工具项目

好东西来了&#xff0c;这是一个人人可用的开源数据可视化分析工具项目&#xff0c;V 哥迫不及待的要给大家推荐这个项目&#xff0c;帆软、Tableau 等商业 BI 工具的开源替代&#xff0c;已在 Github 上被 Star了15.3k了&#xff0c;大家一起来了解一下。自己搭建起来可用&…

【Git】Git在Gitee上的基本操作指南

文章目录 1. 查看 git 版本2. 从Gitee克隆仓库&#xff1a;3. 复制文件到工作目录&#xff1a;4. 将未跟踪的文件添加到暂存区&#xff1a;5. 在本地提交更改&#xff1a;6. 将更改推送到远程仓库&#xff08;Gitee&#xff09;&#xff1a;7. Windows特定提示&#xff1a; 1. …

数智化浪潮下,透视中建信息的“生态底色”

数智化浪潮正滚滚来袭&#xff0c;成为强劲的经济引擎。 根据国家统计局最新披露的数据&#xff0c;2023年中国数字经济核心产业增加值估计会超过12万亿元&#xff0c;占GDP比重10%左右。 对于众多企业而言&#xff0c;眼下思考的已经不是要不要布局数智化&#xff0c;而是如…

Golang | Leetcode Golang题解之第85题最大矩形

题目&#xff1a; 题解&#xff1a; func maximalRectangle(matrix [][]byte) (ans int) {if len(matrix) 0 {return}m, n : len(matrix), len(matrix[0])left : make([][]int, m)for i, row : range matrix {left[i] make([]int, n)for j, v : range row {if v 0 {continu…

Disk Map for Mac,让您的Mac更“轻”松

还在为Mac磁盘空间不足而烦恼吗&#xff1f;Disk Map for Mac来帮您轻松解决&#xff01;通过独特的TreeMap视觉显示技术&#xff0c;让您一眼就能看出哪些文件和文件夹占用了大量空间。只需简单几步操作&#xff0c;即可快速释放磁盘空间&#xff0c;让您的Mac更“轻”松。快来…

整体安全设计

人员和资产的安全是当今许多组织的最高优先事项之一。随着暴力事件在美国各地盛行——枪击事件、袭击、内乱等——建筑物业主必须为其建筑物及其居住者的安全做好计划。 为了创造一个安全的环境&#xff0c;新设施或园区的安全设计必须超越基本的摄像头和访问控制设备&#xf…