laravel实现AMQP(rabbitmq)生产者以及消费者

基于php-amqplib/php-amqplib组件适配laravel框架的amqp封装库

支持便捷可配置的队列工作模式 官网详情

在此基础上可支持延迟消息、死信队列等机制。

环境要求:

PHP版本: ^7.3|^8.0

需要开启的扩展: socket

其他:

  1. 如果需要实现延迟任务需要安装对应版本的rabbitmq延迟插件,以rabbitmq3.9.0版本为例:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

用法:

第一步 安装组件:

composer require sai97/laravel-amqp

第二步 发布服务以及配置:

php artisan vendor:publish --provider="Sai97\LaravelAmqp\AmqpQueueProviders"

执行完后会分别在app/config目录下生成amqp.php(amqp连接配置等)、app/QueueJob/DefaultQueueJob.php(默认队列任务)

amqp.php

<?phpuse App\QueueJob\DefaultQueueJob;return ["connection" => ["default" => ["host" => env("AMQP_HOST", "127.0.0.1"),"port" => env("AMQP_PORT", 5672),"user" => env("AMQP_USER", "root"),"password" => env("AMQP_PASSWORD", "root")]],"event" => ["default" => DefaultQueueJob::class,]
];

connection为amqp连接配置,可根据自身业务去调整,完全对应php-amqplib/php-amqplib相关配置项, event是队列实例标识,最好和connection用相同的key以便管理。

目前可支持相关接口项:

 //获取连接名称public function getConnectName(): string;//获取交换机名称public function getExchangeName(): string;//获取交换机类型public function getExchangeType(): string;//获取队列名称public function getQueueName(): string;//获取路由KEYpublic function getRoutingKey(): string;//获取ContentTypepublic function getContentType(): string;//是否开启死信模式public function isDeadLetter(): bool;//获取死信交换机名称public function getDeadLetterExchangeName(): string;//获取死信路由KEYpublic function getDeadLetterRoutingKey(): string;//获取死信队列名称public function getDeadLetterQueueName(): string;//是否开启延迟任务public function isDelay(): bool;//获取延迟任务过期时长public function getDelayTTL(): int;//获取队列附加参数public function getQueueArgs(): array;//获取回调函数public function getCallback(): callable;//是否自动提交ACKpublic function isAutoAck(): bool;

当然你也可以自定义队列实例,只要继承Sai97\LaravelAmqp\Queue基类即可,具体功能配置参数参考Sai97\LaravelAmqp\QueueInterface。

代码示例:

生产者:

$message = "This is message...";
$amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance(DefaultQueueJob::class));
$amqpQueueServices->producer($message);

消费者:

利用laravel自带的Command去定义一个RabbitMQWorker自定义命令行,仅需要定义一次,后续只需要更改amqp.php配置文件添加不同的队列实例绑定关系即可,以下是RabbitMQWorker演示代码:

<?phpnamespace App\Console\Commands;use Illuminate\Console\Command;
use Sai97\LaravelAmqp\AmqpQueueServices;
use Sai97\LaravelAmqp\QueueFactory;class RabbitMQWorker extends Command
{/*** The name and signature of the console command.** @var string*/protected $signature = 'rabbitmq:worker {event}';/*** The console command description.** @var string*/protected $description = 'rabbitmq worker 消费进程';/*** Create a new command instance.** @return void*/public function __construct(){parent::__construct();}/*** Execute the console command.** @return mixed*/public function handle(){try {$event = $this->argument("event");$eventConfig = config("amqp.event");if (!isset($eventConfig[$event]) || empty($entity = $eventConfig[$event])) {return $this->error("未知的事件: {$event}");}$this->info("rabbitmq worker of event[{$event}] process start ...");$amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance($entity));$amqpQueueServices->consumer();} catch (\Throwable $throwable) {$event = $event ?? "";$this->error($throwable->getFile() . " [{$throwable->getLine()}]");return $this->error("rabbitmq worker of event[{$event}] process error:{$throwable->getMessage()}");}$this->info("rabbitmq worker of event[{$event}] process stop ...");}
}

完成RabbitMQWorker消费者命令后,我们只需执行php artisan rabbitmq:worker default 完成监听,其中default是可变的,请根据的amqp.php配置中的队列实例绑定标识去输入。

因为队列的消费者都需要是守护进程,所以我们可以依托supervisord进程管理器去定义RabbitMQWorker消费者命令,这样可以保证进程可后台允许以及重启启动等,以下是supervisord.conf配置文件示例:

[program:rabbitmq-worker-default]
#process_name=%(program_name)s_%(process_num)d
process_name=worker_%(process_num)d
numprocs=3
command=/usr/local/bin/php /app/www/laravel8/artisan rabbitmq:worker default
autostart=true
autorestart=true
startretries=3
priority=3
stdout_logfile=/var/log/rabbitmq-worker-default.log
redirect_stderr=true

搭配supervisord来进行管理消费者进程有许多便捷的方面:

  1. 如果需要新增一个队列实例,只需要按照上述格式复制一个program,可以在不影响其他进程的情况下进程更新supervisord配置:
supervisorctl update

    2. 通过配置numprocs参数来设定需要开启多少个相同配置项的消费者worker,这在任务分发、并行处理等场景十分适用,大大提高消费者执行效率。

这里不详细叙述supervisord相关操作,具体可查看supervisord官方文档。

参考链接:https://github.com/Z-Sai/laravel-amqp

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/51990.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js中?.、??、??=的用法及使用场景

上面这个错误&#xff0c;相信前端开发工程师应该经常遇到吧&#xff0c;要么是自己考虑不全造成的&#xff0c;要么是后端开发人员丢失数据或者传输错误数据类型造成的。因此对数据访问时的非空判断就变成了一件很繁琐且重要的事情&#xff0c;下面就介绍ES6一些新的语法来方便…

[Stable Diffusion教程] 第一课 原理解析+配置需求+应用安装+基本步骤

第一课 原理解析配置需求应用安装基本步骤 本次内容记录来源于B站的一个视频 以下是自己安装过程中整理的问题及解决方法&#xff1a; 问题&#xff1a;stable-diffusion-webui启动No Python at ‘C:\xxx\xxx\python.exe‘ 解答&#xff1a;打开webui.bat 把 if not de…

“深入探索JVM内部机制:理解Java虚拟机的工作原理“

标题&#xff1a;深入探索JVM内部机制&#xff1a;理解Java虚拟机的工作原理 摘要&#xff1a;本文将深入探索Java虚拟机&#xff08;JVM&#xff09;的内部机制&#xff0c;讲解JVM的工作原理&#xff0c;并通过示例代码帮助读者更好地理解JVM的工作过程。 正文&#xff1a;…

ARM-汇编指令

一&#xff0c;map.lds文件 链接脚本文件 作用&#xff1a;给编译器进行使用&#xff0c;告诉编译器各个段&#xff0c;如何进行分布 /*输出格式&#xff1a;32位可执行程序&#xff0c;小端对齐*/ OUTPUT_FORMAT("elf32-littlearm", "elf32-littlearm",…

StreamingWarehouse的一些思考和未来趋势

300万字&#xff01;全网最全大数据学习面试社区等你来&#xff01; 一篇笔记。 以Hudi、Iceberg、Paimon这几个框架为例&#xff0c;它们支持高效的数据流/批读写、数据回溯以及数据更新。具备一些传统的实时和离线数仓不具备的特性&#xff0c;主要有几个方面&#xff1a; 这…

Day 59

Day 59 503.下一个更大元素II 将两个nums数组拼接在一起&#xff0c;使用单调栈计算出每一个元素的下一个最大值&#xff0c;最后再把结果集即result数组resize到原数组大小就可以了。 class Solution:def nextGreaterElements(self, nums: List[int]) -> List[int]:resu…

自然语言处理从入门到应用——LangChain:索引(Indexes)-[检索器(Retrievers)]

分类目录&#xff1a;《自然语言处理从入门到应用》总目录 检索器&#xff08;Retrievers&#xff09;是一个通用的接口&#xff0c;方便地将文档与语言模型结合在一起。该接口公开了一个get_relevant_documents方法&#xff0c;接受一个查询&#xff08;字符串&#xff09;并返…

(3)、SpringCache源码分析

1、入口说明 @EnableCaching是开启SpringCache的一个总开关,开启时候我们的缓存相关注解才会生效,所以我们@EnableCaching开始作为入口进行分析, 2、分析@EnableCaching注解 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(

Python“牵手”唯品会商品列表数据,关键词搜索唯品会API接口数据,唯品会API接口申请指南

唯品会平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c;唯品会API接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问唯品会平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现唯…

CPU、MCU、MPU、SOC、SOCPC、概念解释之在嵌入式领域常听到的名词含义

CPU、MCU、MPU、SOC等几个在嵌入式领域学习过程中会涉及到的几个名词。我们来学习一下&#xff0c;资料从网上搜集的&#xff0c;有错的地方可以指出。。。 CPU、MCU、MPU、SOC、SOCPC、 1. CPU2. MPU3.MCUMPU和MCU的区别&#xff1a;4.SOC5. SoPC 1. CPU CPU&#xff0c;即中…

Spring之Spring生态系统的演进

未来展望&#xff1a;Spring生态系统的演进 未来展望&#xff1a;Spring生态系统的演进 摘要引言词汇解释详细介绍新技术趋势与影响开发方向与展望探讨Spring在未来的发展趋势微服务与云原生响应式编程强调开发效率和全栈式开发支持人工智能和大数据保持灵活性和创新性 针对新兴…

docker 内apt-get安装软件都不好使

报各种错误 apt-get install --no-install-recommends libboost-all-dev Reading package lists... Done Building dependency tree Reading state information... Done The following additional packages will be installed:autoconf automake autotools-dev cpp-8 gc…

索引设计规范

索引是帮助数据库高效获取数据的数据结构。索引是加速查询的常用技术手段。在设计索引时&#xff0c;要遵循索引设计规范&#xff0c;避免不必要的踩坑。 【推荐】索引存储结构推荐BTREE InnoDB和MyISAM存储引擎表&#xff0c;索引类型必须为BTRER&#xff0c;MEMORY表可以根…

vue2 vue中的常用指令

一、为什么要学习Vue 1.前端必备技能 2.岗位多&#xff0c;绝大互联网公司都在使用Vue 3.提高开发效率 4.高薪必备技能&#xff08;Vue2Vue3&#xff09; 二、什么是Vue 概念&#xff1a;Vue (读音 /vjuː/&#xff0c;类似于 view) 是一套 **构建用户界面 ** 的 渐进式 …

go:正确引入自己编写的包(如何在 Go 中正确引入自己编写的包)

前言 目录如下&#xff1a; 具体教程 1. 工作空间&#xff08;我的是根目录&#xff09;新建 go.work 文件 文件内容如下&#xff1a; go 1.21.0use (./tuchuang./tuchuang/testm ) 2. 添加go.mod文件 1. 包文件夹下 进入testm目录执行 go mod init testModule 2. 引用目…

设计模式——装饰器模式

装饰器模式 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其结构。这种类型的设计模式属于结构型模式&#xff0c;它是作为现有的类的一个包装。 装饰器模式通过将对象包装在装饰器类中&#xff0c;以便动态…

软考高级系统架构设计师系列论文九十九:论软件开发平台的选择和应用

软考高级系统架构设计师系列论文九十九:论软件开发平台的选择和应用 一、相关知识点二、摘要三、正文四、总结一、相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构二、摘要 本文从一个行业MIS系统的开发实践,讨论了软件开发平台的选择和应…

java maven项目打jar包发布(精简版)

目录 一、maven打包 二、安装jdk环境 三、安装mysql 四、jar包传输到服务器 一、maven打包 先clean再package target文件夹下面有生成一个jar包 二、安装jdk环境 1、下载jdk cd /usr/local wget https://repo.huaweicloud.com/java/jdk/8u201-b09/jdk-8u201-linux-x64.tar.…

数据库——事务,事务隔离级别

文章目录 什么是事务?事务的特性(ACID)并发事务带来的问题事务隔离级别实际情况演示脏读(读未提交)避免脏读(读已提交)不可重复读可重复读防止幻读(可串行化) 什么是事务? 事务是逻辑上的一组操作&#xff0c;要么都执行&#xff0c;要么都不执行。 事务最经典也经常被拿出…

sonarqube报错http status 500-internal server error,什么原因,怎么解决

sonarqube报错http status 500-internal server error,什么原因&#xff0c;怎么解决 答案&#xff1a; SonarQube报错HTTP状态500-内部服务器错误通常是由于服务器端出现了一些问题导致的。这可能是由于配置错误、资源不足、数据库连接问题或其他一些未知的问题引起的。 以下…