一:拓展安装
composer require enqueue/amqp-lib
文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介绍
1:连接rabbitmq
$factory = new AmqpConnectionFactory(['host' => '192.168.6.88',//host'port' => '5672',//端口'vhost' => '/',//虚拟主机'user' => 'admin',//账号'pass' => 'admin',//密码
]);
$context = $factory->createContext();
2:声明主题
//声明并创建主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);//删除主题
$context->deleteTopic($fooTopic);
3:声明队列
//声明并创建队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);//删除队列
$context->deleteQueue($fooQueue);
4:将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));
5:发送消息
//向队列发送消息
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);//向队列发送优先消息
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue(queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
//设置队列的最大优先级
$fooQueue->setArguments(['x-max-priority' => 10]);
$context->declareQueue($fooQueue);$message = $context->createMessage('Hello world!');$context->createProducer()->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者->send($fooQueue, $message);//向队列发送延时消息
$message = $context->createMessage('Hello world!');$context->createProducer()->setDelayStrategy(new RabbitMqDlxDelayStrategy())->setDeliveryDelay(5000) //消息延时5秒->send($fooQueue, $message);
6:消费消息【接收消息】
//消费消息
$consumer = $context->createConsumer($fooQueue);$message = $consumer->receive();// process a message
//业务代码$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务//订阅消费者
$fooConsumer = $context->createConsumer($fooQueue);$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {// process message//业务代码$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务return true;
});
$subscriptionConsumer->consume();//清除队列消息
$queueName = 'rabbitmq';
$queue = $context->createQueue($queueName);
$context->purgeQueue($queue);
三:简单实现
1:发送消息
//连接rabbitmq
$factory = new AmqpConnectionFactory(['host' => '192.168.6.88','port' => '5672','vhost' => '/','user' => 'admin','pass' => 'admin','persisted' => false,
]);$context = $factory->createContext();
//声明主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);//声明队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);//将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));//发送消息到队列
$message = $context->createMessage('Hello world!');$context->createProducer()->send($fooQueue, $message);
2:消费消息
$factory = new AmqpConnectionFactory(['host' => '192.168.6.88','port' => '5672','vhost' => '/','user' => 'admin','pass' => 'admin','persisted' => false,
]);
$context = $factory->createContext();$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);$fooConsumer = $context->createConsumer($fooQueue);$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {// process message//业务代码$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务return true;
});
$subscriptionConsumer->consume();