RabbitMQ 的交换机和队列需要先手动创建,配置如下:
1. 创建普通交换机 exchange 和普通队列 order_queue_expire,队列设置以下属性:
   消息过期时间(x-message-ttl):60000 毫秒;过期后转发到死信交换机(x-dead-letter-exchange):dead_exchange;死信路由键(x-dead-letter-routing-key):dead_queue
2. 创建死信交换机 dead_exchange 和队列 dead_queue,并通过路由键 dead_queue 将二者绑定
以下代码以 ThinkPHP 6.0 为例
生产者
/**
 * Publishes ten demo order messages (order_id 1000..1009) to the TTL queue
 * `order_queue_expire`, one per second.
 *
 * The queue is declared with a 60-second message TTL and a dead-letter
 * exchange, so a message that is not consumed within the TTL is re-routed by
 * the broker to `dead_exchange` with routing key `dead_queue`. The dead-letter
 * exchange and queue are declared and bound here as well.
 *
 * The exchange named `exchange` is only bound to, never declared, so it must
 * already exist on the broker before this method runs.
 *
 * The channel and connection are always closed before returning, including
 * when declaring or publishing throws.
 */
public function producer()
{
    // TTL and dead-letter settings for the order queue.
    $deadExchangeName = 'dead_exchange';
    $deadQueueName = 'dead_queue';
    $messageTtl = 1 * 60 * 1000; // 1 minute, in milliseconds
    $queueName = 'order_queue_expire';

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

    try {
        $channel = $connection->channel();

        try {
            // Queue arguments: where expired messages go, and after how long.
            $arguments = new AMQPTable();
            $arguments->set('x-dead-letter-exchange', $deadExchangeName);
            $arguments->set('x-dead-letter-routing-key', $deadQueueName);
            $arguments->set('x-message-ttl', $messageTtl);

            // Declare the durable order queue and bind it to the regular exchange.
            $channel->queue_declare($queueName, false, true, false, false, false, $arguments);
            $channel->queue_bind($queueName, 'exchange', $queueName);

            // Declare the dead-letter exchange/queue pair and bind them together.
            $channel->exchange_declare($deadExchangeName, 'direct', false, true, false);
            $channel->queue_declare($deadQueueName, false, true, false, false);
            $channel->queue_bind($deadQueueName, $deadExchangeName, $deadQueueName);

            for ($orderId = 1000; $orderId < 1010; $orderId++) {
                $payload = json_encode(['order_id' => $orderId]);
                $msg = new AMQPMessage($payload, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

                // Space the messages one second apart so they expire at different times.
                sleep(1);

                // Published through the default exchange (''), which routes by queue name.
                $channel->basic_publish($msg, '', $queueName);
            }
        } finally {
            $channel->close();
        }
    } finally {
        // Close explicitly rather than via a shutdown hook, so the connection is
        // released on errors and in long-running processes too.
        $connection->close();
    }

    dd('ok');
}
消费者
<?php
declare (strict_types = 1);namespace app\command;use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;

/**
 * Console command `cancelOrder`: consumes messages from the `dead_queue` queue
 * and prints each message body.
 */
class Order extends Command
{
    protected function configure()
    {
        // Command configuration.
        $this->setName('cancelOrder')->setDescription('the cancelOrder command');
    }

    /**
     * Connects to the local broker and consumes `dead_queue` until the consumer
     * is cancelled or an error occurs. The channel and connection are closed
     * in either case.
     */
    protected function execute(Input $input, Output $output)
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $consumer = new DeadLetterConsumer($connection, $channel, 'dead_queue');

        try {
            // The consumer acknowledges each message after this handler returns,
            // so the handler itself must not ack.
            $consumer->consume(function (AMQPMessage $message) {
                $body = $message->getBody();
                echo "Received message: ", $body, "\n";
                sleep(1);
            });
        } finally {
            // Release the channel and connection even when consuming fails.
            $consumer->close();
        }
    }
}

/**
 * Thin wrapper around an AMQP channel that consumes one queue bound to the
 * `dead_exchange` direct exchange.
 */
class DeadLetterConsumer
{
    /**
     * Declared explicitly: assigning undeclared (dynamic) properties is
     * deprecated since PHP 8.2. Kept public and untyped to match the
     * visibility the dynamic properties had.
     *
     * @var AMQPStreamConnection
     */
    public $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel */
    public $channel;

    /** @var string */
    public $queueName;

    /**
     * Declares the durable queue and the `dead_exchange` direct exchange, and
     * binds the queue to that exchange with routing key `dead_queue`.
     *
     * @param AMQPStreamConnection            $connection Open broker connection.
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel    Channel opened on that connection.
     * @param string                          $queueName  Queue to declare and consume.
     */
    public function __construct($connection, $channel, $queueName)
    {
        $this->connection = $connection;
        $this->channel = $channel;
        $this->queueName = $queueName;

        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->exchange_declare('dead_exchange', 'direct', false, true, false);
        $this->channel->queue_bind($queueName, 'dead_exchange', 'dead_queue');
    }

    /**
     * Consumes the queue, invoking $callback for every delivered message, and
     * blocks for as long as the channel has registered consumers.
     *
     * Messages are consumed with manual acknowledgement: a message is acked
     * only after $callback returns. If $callback throws, the exception
     * propagates and the message stays unacknowledged, so the broker can
     * redeliver it instead of silently dropping it.
     *
     * @param callable $callback Handler receiving the AMQPMessage.
     */
    public function consume($callback)
    {
        $channel = $this->channel;

        // Hand out at most one unacknowledged message at a time.
        $channel->basic_qos(0, 1, false);

        $ackAfterHandling = function ($message) use ($callback) {
            $callback($message);
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        };

        // Fourth argument is no_ack: false enables manual acknowledgement.
        $channel->basic_consume($this->queueName, '', false, false, false, false, $ackAfterHandling);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Closes the channel and then the connection; the connection is closed
     * even if closing the channel throws.
     */
    public function close()
    {
        try {
            $this->channel->close();
        } finally {
            $this->connection->close();
        }
    }
}