<?php
/*** 生产者*/$connection = new AMQPConnection(['host' => '192.168.23.130','port' => 5672,'login' => 'rabuser','password' => '123456'
]);$connection->connect() or die('连接失败');try{$exchange_name = 'trades';$route_key = '/trade';//投递消息到中间件$channel = new AMQPChannel($connection);//创建消息通道$exchange = new AMQPExchange($channel);//通过通道连接交换几//设置通道名称$exchange->setName($exchange_name);$data = json_encode(['time'=>time()]);//发布消息到交换机中$exchange->publish($data,$route_key);}catch (AMQPChannelException $e){var_dump($e);
}
<?php
/*** 消费者*/$connection = new AMQPConnection(['host' => '192.168.23.130','port' => 5672,'login' => 'rabuser','password' => '123456'
]);$connection->connect() or die('连接失败');try{$exchange_name = 'trades';$route_key = '/trade';$queue_name = 'queue';//投递消息到中间件$channel = new AMQPChannel($connection);//创建消息通道$exchange = new AMQPExchange($channel);//通过通道连接交换几//设置通道名称$exchange->setName($exchange_name);//三种获取消息的模式,直连模式,主题模式,广播模式$exchange->setType(AMQP_EX_TYPE_DIRECT);//声明$exchange->declareExchange();//声明队列绑定交换机路由$queue = new AMQPQueue($channel);$queue->setName($queue_name);$queue->declareQueue();//绑定监听获取数据$queue->bind($exchange_name,$route_key);//消费数据,默认阻塞监听获取数据$queue->consume(function ($event,$queue){//获取数据$msg = $event->getBody();var_dump($msg);var_dump($queue);//回应ACK$queue->ack($event->getDeliveryTag());});}catch (AMQPChannelException $e){var_dump($e);
}