tp6 RabbitMQ

1、composer 安装 AMQP 扩展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置

 在 config 目录下创建 rabbitmq.php 文件

<?php
return ['host'=>'','port'=>'5672','user'=>'','password'=>'','vhost'=>'','exchange_name' => '','queue_name' => '','route_key' => '','consumer_tag' => '',
];
3、生产者代码

app目录下创建Producer.php

<?phpnamespace app;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;class Producer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],'itcast');//创建通道$this->channel = $this->connection->channel();}public function send($data){/*** 创建队列(Queue)* name: hello         // 队列名称* passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: true       // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue* exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除* auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除**/$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);/*** 创建交换机(Exchange)* name: vckai_exchange// 交换机名称* type: direct        // 交换机类型,分别为direct/fanout/topic,参考另外文章的Exchange Type说明。* passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建* durable: false      // 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失* auto_delete: false  // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除*/$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);// 绑定消息交换机和队列$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);$messageBody = json_encode($data);//将要发送数据变为json字符串/*** 创建AMQP消息类型* delivery_mode 消息是否持久化* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化* AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化*/$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));/*** 发送消息* msg: $message            // AMQP消息内容* exchange: vckai_exchange // 交换机名称* routing_key: hello       // 路由key*/$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);//关闭连接$this->stop();}//关闭进程public function stop(){$this->channel->close();$this->connection->close();}}
4、消费者代码

app目录下创建Consumer.php

<?phpnamespace app;use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;class Consumer
{private $connection;private $channel;private $mq_config;public function __construct(){$this->mq_config = config('rabbit_mq');$this->connection = new AMQPStreamConnection($this->mq_config['host'],$this->mq_config['port'],$this->mq_config['user'],$this->mq_config['password'],$this->mq_config['vhost']);//创建通道$this->channel = $this->connection->channel();}/*** @param $channel* @param $connection* 关闭进程*/function shutdown($channel, $connection){$channel->close();$connection->close();}/*** @param $message* 消息处理*/function process_message($message){//消息处理逻辑echo $message->body . "\n";if ($message->body !== 'quit') {$obj = json_decode($message->body);if (!isset($obj->id)) {Log::write("error data:" . $message->body, 2);} else {try {Log::write("data:" . json_encode($message));//消息处理} catch (\Think\Exception  $e) {Log::write($e->getMessage(), 2);Log::write(json_encode($message), 2);} catch (PDOException $pe) {Log::write($pe->getMessage(), 2);Log::write(json_encode($message), 2);}}}// 手动确认ack,确保消息已经处理$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);// Send a message with the string "quit" to cancel the consumer.if ($message->body === 'quit') {$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/*** @throws \ErrorException* 启动** nohup php index.php index/Message_Consume/start &*/public function start(){// 设置消费者(Consumer)客户端同时只处理一条队列// 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。//消费者端要把自动确认autoAck设置为false,basic_qos才有效。//$this->channel->basic_qos(0, 1, false);// 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致// 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);/**** queue: queue_name    // 被消费的队列名称* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端* no_local: false      // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现* no_ack: true         // 收到消息后,是否不需要回复确认即被认为被消费* exclusive: false     // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下* nowait: false        // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错* callback: $callback  // 回调逻辑处理函数**/$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);while (count($this->channel->callbacks)) {$this->channel->wait();}}
}
5、创建自定义命令
php think make:command Consumer

在项目跟目录执行以下命令,会自动生成 在 command 目录生成 Consumer 控制器 

<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class Consumer extends Command
{protected function configure(){// 指令配置$this->setName('consumer')->setDescription('the consumer command');}protected function execute(Input $input, Output $output){// 指令输出$output->writeln('consumer');$consumer = new \app\Consumer();
//        $consumer->process_message(11)$consumer->start();}
}

config/console.php 代码增加如下:

// 指令定义
'commands' => ['consumer' => 'app\command\Consumer',
],
6、命令

消费者命令

php think consumer

 生产者执行命令

$producer = new Producer();
$data = ['message' => "发送的消息内容"
];
$producer->send($data);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/38017.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中国生产了5.07亿台,库存高达近4亿台?国产手机彻底卖不动了?

统计数据显示今年上半年中国的手机产量达到5.07亿台&#xff0c;国内市场手机出货量仅有1.24亿台&#xff0c;都出现了下滑&#xff0c;那么中国手机的产量比销量多出了3.83亿台&#xff0c;这些手机都成为了库存&#xff1f; 中国手机市场确实不如早年那么辉煌&#xff0c;201…

【FAQ】安防监控视频EasyCVR平台分发的FLV视频流在VLC中无法播放

众所周知&#xff0c;TSINGSEE青犀视频汇聚平台EasyCVR可支持多协议方式接入&#xff0c;包括主流标准协议国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。在视频流的处理与分发上&#xff0c;视频监控…

P12-Retentive NetWork-RetNet挑战Transformer

论文地址:https://arxiv.org/abs/2307.08621 目录 Abstract 一.Introduction 二.Retentive Networks 2.1Retention 2.2Gated Multi-Scale Retention 2.3Overall Architecture of Retention Networks 2.4Relation to and Differences from Previous Methods 三.Experime…

Codeforces Round 892 (Div. 2)(VP)

A //b里放最小值&#xff0c;其他值放c。如果最大值最小值&#xff0c;则无解。 void solve() {int n; cin >> n;vi a(n); liter(x, a) cin >> x; sort(all(a));if (a[0] a[n - 1]){print(-1); return;}vi b, c;for (int i 0; i < sz(a); i){if (a[i] a[0])…

小米基于 Flink 的实时计算资源治理实践

摘要&#xff1a;本文整理自小米高级软件工程师张蛟&#xff0c;在 Flink Forward Asia 2022 生产实践专场的分享。本篇内容主要分为四个部分&#xff1a; 发展现状与规模框架层治理实践平台层治理实践未来规划与展望 点击查看原文视频 & 演讲PPT 一、发展现状与规模 如上图…

【03】基础知识:typescript中的函数

一、typescript 中定义函数的方法 函数声明法 function test1(): string {return 返回类型为string }function test2(): void {console.log(没有返回值的方法) }函数表达式/匿名函数 const test3 function(): number {return 1 }二、typescript 中 函数参数写法 1、typesc…

helm安装harbor + nerdctl 制作push 镜像

参考 文章&#xff1a;Helm部署Harbor_helm harbor_风向决定发型丶的博客-CSDN博客 安装好后使用 nerd containerd对接harbor_containerd 容器 insecure-registries 配置_柠是柠檬的檬的博客-CSDN博客 推送镜像 Containerd 对接私有镜像仓库 Harbor - 知乎 接下来我们来…

麒麟系统相关

创建虚拟机 镜像下载地址 选择合适的镜像&#xff0c;进入引导后注意不要选择默认的第一条&#xff0c;选择第二条进入安装程序。 root密码修改 使用命令 sudo passwd root 开启ssh 配置好网络后发现能ping通&#xff0c;但无法ssh连接&#xff0c;ps -ef | grep ssh 得…

01 qt快速入门

一 qt介绍 1.基本概念 1991年由Qt Company(奇趣)开发的跨平台C++图形用户界面应用程序开发框架,GUI程序和非GUI程序。优点:一套源码在不同的平台通过不同的编译器进行编译,就可以运行到该平台上目标机。面向对象的封装机制来对其接口封装。 GUI —图形用户界面(Graphic…

软件测试面试题【2023整理版(含答案)】

01、您所熟悉的测试用例设计方法都有哪些&#xff1f;请分别以具体的例子来说明这些方法在测试用例设计工作中的应用。 答&#xff1a;有黑盒和白盒两种测试种类&#xff0c;黑盒有等价类划分方法 边界值分析方法 错误推测方法 因果图方法 判定表驱动分析方法 正交实验设…

Vue组件之间的传值汇总

组件之间的传值 1、父传子 props 2、父传子 slot 3、父传子 不建议用 attrs 4、 子传父 ref 5、子传父 emit 6、povide/inject只能在setup的时候用。 7、利用vuex和pinia去实现数据的交互 1、实现代码App.vue <script setup>import TestProps from ./components/T…

stable-diffusion 模型效果+prompt

摘自个人印象笔记&#xff0c;图不完整可查看原笔记&#xff1a;https://app.yinxiang.com/fx/55cda0c6-2af5-4d66-bd86-85da79c5574ePrompt运用规则及技巧 &#xff1a; 1. https://publicprompts.art/&#xff08;最适用于OpenArt 线上模型 https://openart.ai/&#xff09;…

【Vue-Router】别名

后台返回来的路径名不合理&#xff0c;但多个项目在使用中了&#xff0c;不方便改时可以使用别名。可以有多个或一个。 First.vue <template><h1>First Seciton</h1> </template>Second.vue&#xff0c;Third.vue代码同理 UserSettings.vue <tem…

R语言生存分析(机器学习)(2)——Enet(弹性网络)

弹性网络&#xff08;Elastic Net&#xff09;:是一种用于回归分析的统计方法&#xff0c;它是岭回归&#xff08;Ridge Regression&#xff09;和lasso回归&#xff08;Lasso Regression&#xff09;的结合&#xff0c;旨在克服它们各自的一些限制。弹性网络能够同时考虑L1正则…

mysql 索引 区分字符大小写

mysql 建立索引&#xff0c;特别是unique索引&#xff0c;是跟字符集、字符排序规则有关的。 对于utf8mb4_0900_ai_ci来说&#xff0c;0900代表Unicode 9.0的规范&#xff0c;ai表示accent insensitivity&#xff0c;也就是“不区分音调”&#xff0c;而ci表示case insensitiv…

wsl2安装docker引擎(Install Docker Engine on Debian)

安装 1.卸载旧版本 在安装 Docker 引擎之前&#xff0c;您必须首先确保卸载任何冲突的软件包。 发行版维护者在他们的存储库。必须先卸载这些软件包&#xff0c;然后才能安装 Docker 引擎的正式版本。 要卸载的非官方软件包是&#xff1a; docker.iodocker-composedocker-…

问道管理:旅游酒店板块逆市拉升,桂林旅游、华天酒店涨停

游览酒店板块14日盘中逆市拉升&#xff0c;到发稿&#xff0c;桂林游览、华天酒店涨停&#xff0c;张家界涨超8%&#xff0c;君亭酒店涨超5%&#xff0c;众信游览、云南游览涨逾4%。 音讯面上&#xff0c;8月10日&#xff0c;文旅部办公厅发布康复出境团队游览第三批名单&#…

Profibus-DP转modbus RTU网关modbus rtu和tcp的区别

捷米JM-DPM-RTU网关在Profibus总线侧实现主站功能&#xff0c;在Modbus串口侧实现从站功能。可将ProfibusDP协议的设备&#xff08;如&#xff1a;EH流量计、倍福编码器等&#xff09;接入到Modbus网络中&#xff1b;通过增加DP/PA耦合器&#xff0c;也可将Profibus PA从站接入…

【计算机网络】Udp详解

前言 上几文章我们讲解了应用层协议Http和Https&#xff0c;要知道应用层协议有很多&#xff0c;这些都是程序员自己定制的&#xff0c;而真正要传输的时候&#xff0c;是要在操作系统的传输层进行的&#xff0c;今天我们就来学习一下传输层协议Udp的 标识一个通信 要进行跨…

MySQL 深度分页优化

MySQL 深度分页优化 理解总结&#xff1a; 分页使用limit &#xff0c;前提是要排序好的数据&#xff0c;这时候&#xff0c;就推荐使用带索引的字段排序&#xff0c;因为索引是天然有序的&#xff0c;不需要像是无序的字段一样&#xff0c;全表扫描&#xff0c;如果太大的话…