Beanstalked的初步了解和使用(包括利用beanstalkd 秒杀消息队列的实现)

一  Beanstalkd 是什么

Beanstalkd,一个高性能、轻量级的分布式内存队列系统

 

二  Beanstalkd 特性

1. 优先级(priority)

注:优先级就意味 支持任务插队(数字越小,优先级越高,0的优先级最高)

2. 延迟(delay)

注:延迟意味着可以定义任务什么时间才开始被消费,也就实现了定时任务(比如为了增加网站活跃性,增加定时评论,定时点赞功能)

3. 持久化(persistent data)

注:Beanstalkd 支持定时将文件刷到日志文件里,即使beanstalkd宕机,重启之后仍然可以找回文件

4. 预留(buried)

注:Beanstalkd支持把一个任务设置为预留,这样,消费者就无法取出这个任务了,等合适的时机再把这个任务拿出来消费

5. 任务超时重发(time-to-run)

注:消费者必须在指定的时间内处理完这个任务,否则就认为消费者处理失败,任务会被重新放到队列,等待消费


三 管道(tube)与任务(job)

注:生产者生产任务,并根据业务需求将任务放到不同管道中,比如和注册有关的任务放到注册管道中,和订单有关的放到订单管道中

注:任务从进入管道到离开管道一共有5个状态(ready,delayed,reserved,buried,delete)

1. 生产者将任务放到管道中,任务的状态可以是ready(表示任务已经准备好,随时可以被消费者读取),也可以是delayed(任务在被生产者放入管道时,设置了延迟,比如设置了5s延迟,意味着5s之后,这个任务才会变成ready状态,才可以被消费者读取)

2. 消费者消费任务(消费者将处于ready状态的任务读出来后,被读取处理的任务状态变为reserved)

3. 消费者处理完任务后,任务的状态可能是delete(删除,处理成功),可能是buried(预留,意味着先把任务放一边,等待条件成熟还要用),可能是ready,也可能是delayed,需要根据具体业务场景自己进行判断定义


具体示意图:







四 Beanstalkd 的安装(git、方式)

注:beanstalkd是不支持windows的 ,必须要在Linux环境下(本地环境介绍 Linux, php7,mysql5.6,nginx,centos7.4) 

1. yum install -y git 

2.  git clone https://github.com/kr/beanstalkd 

3.  cd beanstalkd 

4. make

5. make install

6. 查看安装是否成功




五 开始使用Beanstalkd

1. 绑定地址和端口

beanstalkd -l 127.0.0.1 -p 11301 &


2. composer安装pheanstalkd类(这个类在php使用中会简化很多操作,非常方便)

--------------

2.1 安装composer     curl -sS https://getcomposer.org/installer | php

2.2 将composer全局调用     mv composer.phar /usr/local/bin/composer

2.3  cd /usr/local/bin

2.4 chmod +x composer

-----------------------

2.5 composer require pda/pheanstalk



注:pheanstalkd 用法示例:https://github.com/pda/pheanstalk

3. 简单使用

环境介绍(阿里云Linux服务器, 本地win10,Filezilla用于双方文件传输 ,xshell用于连接远程阿里云服务器)

3.1  编写demo.php

<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

print_r($ph->stats());//查看目前pheanStalkd状态信息

3.2  将在本地编写的demo.php代码通过filezilla上传到阿里云服务器 /usr/local/beanstalkd/beanstalkd下,并在linux下运行


https://github.com/kr/beanstalkd.git
https://github.com/kr/beanstalkd.git


4. pheanstalkd类常用的方法

<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

//----------------------------------------维护类----------------------------------

//1.查看目前pheanStalkd状态信息
//print_r($ph->stats()); 

//2.显示目前存在的管道
//print_r($ph->listTubes()); 

//3.查看NewUsers管道的信息
//$ph->useTube('NewUsers')->put('test'); 
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道添加一个up任务
//print_r($ph->statsTube('NewUsers'));//3.查看NewUsers管道的信息

//6.查看指定管道中某一个任务的情况
//$job = $ph->watch('NewUsers')->reserve(); //5.从管道中取出任务(消费)
//print_r($ph->statsJob($job)); //6.查看指定管道中某一个任务的情况

//7.查看任务id为1的任务详情
//$job = $ph->peek(1);7.直接取出任务id为1的任务 [注:beanstalkd中所有任务的id都具有唯一性] 
//print_r($ph->statsJob($job));//查看任务id为1的任务详情


//----------------------------------------生产类--------------------------------------

第一种 put()

//$tube = $ph->useTube('NewUsers');//连接NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道添加任务four,并返回结果
//注: put()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)

第二种 putInTube() [注: putInTube()就是对useTube()和put()的封装]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任务three
注: putInTube()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)
//print_r($res);//返回任务id

//print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的详细情况


//---------------------------------------消费类--------------------------------------

// 1.watch 监听NewUsers管道 [ 注: watch()同样可以监听多个管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//打印已经监听的管道


// 2.watch 监听多个管道
//$tube = $ph->watch('NewUsers')
//           ->watch('default');
//print_r($ph->listTubesWatched());//打印已经监听的管道


// 3.ignore 监听NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
//            ->ignore('default');
//print_r($ph->listTubesWatched());//打印已经监听的管道


// 4.reserve 监听NewUsers管道,并且取出任务
//$job = $ph->watch('NewUsers')
//          ->reserve();
//
注reserve()有1个参数,阻塞的时间,过了阻塞时间,不管有没有东西,直接返回
//
//var_dump($job);//打印已经取出的任务
//$ph->delete($job);//删除已经取出的任务


// 5.putInTube/put 向NewUsers管道写入任务 [ 注:此为生产者方法,放到此处是为了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.查看NewUsers管道详细信息


// 6.release 将取出的任务放回ready状态,还有2个参数(优先级和延迟)
//$job = $ph->watch('NewUsers')->reserve();//6.监听NewUsers管道,并取出任务

//if (true) {
//    sleep(30);
//    $ph->release($job);//6.将任务取出之后,停留30秒,然后将任务状态重新变为ready
//} else {
//    $ph->delete($job);
//}


// 7.bury (预留) 将任务取出之后,发现后面执行的逻辑不成熟(比如发邮件,突然发现邮件服务器挂掉了),
//或者说还不能执行后面的逻辑,需要把任务先封存起来,等待时机成熟了,再拿出这个任务进行消费

//$job = $ph->watch('NewUsers')->reserve();//取出任务
//$ph->bury($job);//取出任务后,将任务放到一边(预留)

// 8.peekBuried() 将处在bury状态的任务读取出来
//$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
//var_dump($ph->statsJob($job));//打印任务状态(此时任务状态应该是bury)

// 9.kickJob() 将处在bury任务状态的任务转化为ready状态
//$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
//$ph->kickJob($job);

// 10.kick()  将处在bury任务状态的任务转化为ready状态,有第二个参数int, 批量将任务id小于此数值的任务转化为ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任务id小于65,并且任务状态处于bury的任务全部转化为ready

// 11.peekReady() 将管道中处于ready状态的任务读出来
//$job = $ph->peekReady('NewUser');//将NewUser管道中处于ready状态的任务读取出来
//var_dump($job);
//$ph->delete($job);

// 12.peekDelay() 将管道中所有处于delay状态的任务读取出来
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);


// 13.pauseTube() 对整个管道进行延迟设置,让管道处于延迟状态
//$ph->pauseTube('NewUser',10);//设置管道NewUser延迟时间为10s
//$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
//var_dump($job);

// 14.resumeTube() 恢复管道,让管道处于不延迟状态,立即被消费
//$ph->resumeTube('NewUser');//取消管道NewUser的延迟状态,变为立即读取
//$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
//var_dump($job);

// 15.touch() 让任务重新计算任务超时重发ttr时间,相当于给任务延长寿命


5. 项目中的应用总结

生产者中常用的方法

useTube() : 如果没有管道,则创建对应管道,有,则直接使用

put() : 向管道中放任务

消费者中常用的方法步骤:

1. watch():监听管道

2. reserve():将管道中处于ready状态的任务读取出来

3.1 可以使用delete 方法删除任务

3.2 可以使用release 方法将任务放回ready状态

3.3 可以使用bury 方法将任务先放一边(例如发邮件,邮箱服务器挂掉),等待条件成熟再取出来


6. 实际应用演示

producer.php

<?php
require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

$ph->useTube('List')->put('goods');
$ph->useTube('List')->put('goods2');
$ph->useTube('List')->put('goods3');
$ph->useTube('List')->put('goods4');
$ph->useTube('List')->put('goods5');

//print_r($ph->statsTube('List'));//查看List管道的信息

consumer.php

<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

$res = $ph->watch('List')->reserve();//监听List管道,并将任务取出来

if ($res) {$ph->delete($res);
    var_dump($res);
}

注:

生产者根据业务不同,将任务放到不同管道(管道用于存储消费者生产的任务),比如将和注册有关的任务通通放到注册管道,和订单有关的通通放入订单管道

消费者将处于ready状态的任务根据优先级逐个读取出来


五  使用Beanstalkd 实现类似redis秒杀活动

producer.php代码:
<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

//连接beanstalkd
$ph = new Pheanstalk('127.0.0.1', 11301);

$tube_name = 'SecKill2';

//使用SecKill2管道
$SEC = $ph->useTube($tube_name);

//模拟100人请求秒杀
for ($i = 0; $i < 100; $i++) {$uid = rand(10000000, 99999999);
    //获取当前队列已经拥有的数量,如果人数少于十,则加入这个队列
    $total_jobs = $ph->statsTube($tube_name)['total-jobs'];
    $num = 10;
    if ($total_jobs < $num) {$SEC->put($uid);//向管道放任务
        echo $uid . "秒杀成功";
    } else {//如果当前队列人数已经达到10人,则返回秒杀已完成
        echo "秒杀已结束<br>";
    }
}
print_r($ph->statsTube($tube_name));//查看SecKill2管道的信息

注: 执行完producer.php代码后,应当会在SecKill2管道中看到10个任务,每一个任务内容是一个uid




consumer.php代码:

<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

//连接BeanStalkd队列系统
$ph = new Pheanstalk('127.0.0.1',11301);
$tube_name = 'SecKill2';
//取出SecKill2管道的任务总数
$total_jobs = $ph->statsTube($tube_name)['total-jobs'];

//PDO连接mysql数据库
$dsn = "mysql:dbname=test;host=127.0.0.1";
$pdo = new PDO($dsn, 'root', '123456');

//循环取出管道中任务,并执行插入数据库操作
for($i = 0; $i < $total_jobs; $i++){//监听SecKill4管道,并将任务取出来
    $job = $ph->watch($tube_name)->reserve();

    //取出任务存储的值uid
    $uid = $job->getData();//打印出的样子 string(8) "24541944"

    if (!$uid) {sleep(2);
        continue;
    }//生成订单号
    $orderNum = build_order_no($uid);
    //生成订单时间
    $timeStamp = time();
    //构造插入数组
    $user_data = array('uid'=>$uid,'time_stamp'=>$timeStamp,'order_num'=>$orderNum);
    //将数据保存到数据库
    $sql = "insert into seckill (uid,time_stamp,order_num) values (:uid,:time_stamp,:order_num)";
    $stmt = $pdo->prepare($sql);
    $res = $stmt->execute($user_data);
    //如果数据库操作成功,则删除任务
    if ($res) {$ph->delete($job);
    }}//生成唯一订单号
function build_order_no($uid){return  substr(implode(NULL, array_map('ord', str_split(substr(uniqid(), 7, 13), 1))), 0, 8).$uid;
}

注: 执行完consumer.php文件之后, 数据表应该已经写入了10个订单


注: 

用到的数据表

create table `seckill`(
`uid` int unsigned not null default '0',
`time_stamp` int unsigned not null  default '0',
`order_num` bigint unsigned not null default '0',
primary key (`uid`),
key (order_num)
)engine = myisam default charset= utf8;

参考视频地址: https://www.imooc.com/video/16016

这边博客可能会用到 https://blog.csdn.net/ahjxhy2010/article/details/53196450



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/282393.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF效果第二百篇之再玩Gamma曲线

前面效果中使用比较low的方式实现了2.4的Gamma曲线;虽说后面加了点动画呈现效果,但也就是个过渡版;今天才基本符合需求的效果:1、还是基于WPF效果第一百七十八篇之贝塞尔曲线他来实现的:3个ListBox 3个LandmarkControl2、在LandmarkControl增加插点位事件View:LandmarkControl …

skynet源码阅读5--协程调度模型

注&#xff1a;为方便理解&#xff0c;本文贴出的代码部分经过了缩减或展开&#xff0c;与实际skynet代码可能会有所出入。 作为一个skynet actor&#xff0c;在启动脚本被加载的过程中&#xff0c;总是要调用skynet.start和skynet.dispatch的&#xff0c;前者在skynet-os中…

ASP.NET Core GRPC 和 Dubbo 互通

一.前言Dubbo 是比较流行的服务治理框架&#xff0c;国内不少大厂都在使用。以前的 Dubbo 使用的是私有协议&#xff0c;采集用的 hessian 序列化&#xff0c;对于多语言生态来说是极度的不友好。现在 Dubbo 发布了新版本 v3&#xff0c;推出了基于 gRPC 的新协议 Triple&#…

详解C# 迭代器

[引用&#xff1a;https://www.cnblogs.com/yangecnu/archive/2012/03/17/2402432.html] 迭代器模式是设计模式中行为模式(behavioral pattern)的一个例子&#xff0c;他是一种简化对象间通讯的模式&#xff0c;也是一种非常容易理解和使用的模式。简单来说&#xff0c;迭代器模…

利用redis List队列简单实现秒杀 PHP代码实现

一 生产者producer部分 --------------------------------producer 部分注释------------------------------------------------------------ 用户在页面请求之后, 获取到用户uid , 跳转到这个加入队列的方法 (这里直接在producer中模拟了多个uid) 在方法内部判断redis队列长…

使用Filezilla 与 linux远程服务器传输文件时,设置默认打开编辑器

1. 点击编辑 2. 选择设置&#xff0c;点击文本编辑 3. 设置编辑器目录 4. 确定作用&#xff1a; 这样设置之后&#xff0c;可以实现在远程站点栏直接下载并使用phpstorm编辑的作用 正常需要下载之后&#xff0c;再去本地相应下载目录打开&#xff0c;然后再进行上传文件&#x…

在 .NET 中使用 FluentValidation 进行参数验证

不用说&#xff0c;参数验证很重要&#xff0c;无效的参数&#xff0c;可能会导致程序的异常。如果使用Web API或MVC页面&#xff0c;那么可能习惯了自带的规则验证&#xff0c;我们的控制器很干净&#xff1a;public class User {[Required]public string FirstName { get; se…

在win10系统下怎样快速切换任务视图

2019独角兽企业重金招聘Python工程师标准>>> 切换窗口&#xff1a;Alt Tab 任务视图&#xff1a;Win Tab (松开键盘界面不会消失) 切换任务视图&#xff1a;Win Ctrl 左/右 创建新的虚拟桌面&#xff1a;Win Ctrl D 关闭当前虚拟桌面&#xff1a;Win Ctrl F4…

Linux上搭建Samba,实现windows与Linux文件数据同步

一 环境介绍 1. 本地win10 2. Linux (centos7.4) 注&#xff1a;因为运营商方面禁止smb协议&#xff0c;导致无法在云服务器上使用smb&#xff0c;如果不是在虚拟机上操作&#xff0c;而是在云服务器上操作&#xff0c;建议还是使用 filezillaxshell组合 或者 使用finalshell等…

A5-1和DES两个加密算法的学习

A5-1加密算法 1、基本原理 A5-1加密算法是一种流password&#xff0c;通过密钥流对明文进行加密。然后用密钥流进行对密文的解密操作。 这样的算法主要用于GSM加密。也就是我们平时打电话的时候。通信数据发送到基站&#xff0c;基站发送到还有一个基站&#xff0c;基站发送到接…

从0到1简易区块链开发手册V0.3-数据持久化与创世区块

Author: brucefeng Email: brucefengbrucefeng.com 编程语言:Golang 1.BoltDB简介 Bolt是一个纯粹Key/Value模型的程序。该项目的目标是为不需要完整数据库服务器&#xff08;如Postgres或MySQL&#xff09;的项目提供一个简单&#xff0c;快速&#xff0c;可靠的数据库。 Bolt…

ELK之elasticsearch5.6的安装和head插件的安装

这里选择的elasticsearch为5.6的新版本&#xff0c;根据官方文档有几种暗装方式&#xff1a; https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html 这里选择rpm包安装https://www.elastic.co/guide/en/elasticsearch/reference/curre…

Nginx 基础(一)

一 、Nginx简述 Nginx是一个开源、高性能、可靠的HTTP中间件、代理服务。二 、常见的HTTP服务 1. HTTPD-Apache基金会 2. IIS-微软 3. GWS-Google 4. Nginx三、为什么选择Nginx 原因一&#xff1a;IO多路复用epoll &#xff08;主要解决了并发性的问题&#xff09; 注1&#xf…

ASP.NET Core高性能服务器HTTP.SYS

如果我们只需要将ASP.NET CORE应用部署到Windows环境下&#xff0c;并且希望获得更好的性能&#xff0c;那么我们选择的服务器类型应该是HTTP.SYS。Windows环境下任何针对HTTP的网络监听器/服务器在性能上都无法与HTTP.SYS比肩。[本文节选《ASP.NET Core 6框架揭秘》第18章]一、…

Nginx 基础 ( 二)

一、HTTP请求 http请求包括客户端请求服务端 以及 服务端响应数据回客户端&#xff0c;如下 请求&#xff1a;包括请求行、请求头部、请求数据 响应&#xff1a;包括状态行、消息报头、响应正文 比如在Linux中curl请求网站获取请求信息和响应信息 curl -v http://www.kugou.com…

《金融行业应用解决方案白皮书》发布,金融自主创新未来可期!

日前&#xff0c;以“聚势赋能 行业共创”为主题的金融行业解决方案发布会在线上举行。麒麟软件发布《金融行业应用解决方案白皮书》&#xff0c;并发起成立“金融机具生态圈俱乐部”&#xff0c;助力金融行业用户高质量发展。金融信息系统曾经被国外厂商垄断金融信息系统作为国…

leetcode53 Maximum Subarray 最大连续子数组

题目要求 Find the contiguous subarray within an array (containing at least one number) which has the largest sum.For example, given the array [-2,1,-3,4,-1,2,1,-5,4], the contiguous subarray [4,-1,2,1] has the largest sum 6.即&#xff1a;寻找数列中的一个子…

详解go语言的array和slice 【二】

上一篇 详解go语言的array和slice 【一】已经讲解过,array和slice的一些基本用法&#xff0c;使用array和slice时需要注意的地方&#xff0c;特别是slice需要注意的地方比较多。上一篇的最后讲解到创建新的slice时使用第三个索引来限制slice的容量&#xff0c;在操作新slice时…

详解Objective-C的meta-class

2019独角兽企业重金招聘Python工程师标准>>> 比较简单的一篇英文&#xff0c;重点是讲解meta-class。翻译下&#xff0c;加深理解。 原文标题&#xff1a;What is a meta-class in Objective-C? 原文地址&#xff1a;http://www.cocoawithlove.com/2010/01/what-is…

十倍程序员 | 使用 Source Generator 将 JSON 转换成 C# 类

前言有时候&#xff0c;我们需要将通过 WebAPI 接收 JSON 字符串转换成 C# 代码。Visual Studio 提供了一个功能菜单可以轻松实现&#xff1a;执行完成后&#xff0c;它会将生成的代码放在打开的的代码窗口中。但是&#xff0c;如果有多个 JSON 字符串需要转换&#xff0c;这个…