这里我想完成的是
制作消息(多个协程制造)——》推送到rabbitmq——》订阅消息队列——》消费消息(ws协程客户端【一次消费多条】/ws前端)
利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。
一,安装rabbitmq
- pull镜像
docker search rabbitmq
docker pull rabbitmq
- 构建rabbitmq容器并加入hyperf环境组网
这里我们构建容器的的就将他加入到了hyperf环境的网络lnmp_default(使用docker inspect hyperf 查看)
docker run -d --network=lnmp_default --hostname my-rabbit --name rabbit -p 15672:15672 -p 5673:5672 rabbitmq
-d 后台运行容器;--name 指定容器名;-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);-v 映射目录或文件;--network 网络名称--hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
- 进入容器并运行
docker exec -it rabbit /bin/bash
rabbitmq-plugins enable rabbitmq_management
这里须要修改配置文件,不然有的地方会报错 docker rabbitmq Management API returned status code 500
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
用户名和密码 guest
二,rabbitmq配置
生成配置文件 php bin/hyperf.php vendor:publish hyperf/amqp
并修改配置文件,这里的AMQP_HOST 就是lnmp_default 网络组中 rabbitmq的ip地址,可使用docker inspect rabbit
查看,端口即为容器内部映射ip这里默认是5672
三,测试代码
php bin/hyperf.php gen:amqp-producer DemoProducer
- 生产者
<?phpdeclare(strict_types=1);namespace App\Amqp\Producer;use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
use function Hyperf\Coroutine\co;//routingKey是指定路由到哪里,可以决定到哪个队列。队列通过路由键绑定到交换器
#[Producer(exchange: 'hyperf', routingKey: 'hyperf-queue.sj')]
//生产者方指定了exchange(交换机)和routing key,但是不指定queue(队列)也不将queue(队列)绑定到exchange,
//队列声明和绑定队列到exchange的工作由消费者方完成
//费者在消费消息时,需要声明队列(队列名字随便),并将声明的队列通过routing key绑定到exchange,这样才能接收到数据
class DemoProducer extends ProducerMessage
{public function __construct($id){$pdata['id']=$id;$pdata['time']=date("Y-m-d H:i:s",time());$pdata['name']=array_rand(['张三','李四','王虎','陆风','牛犇','冯晨','丁酉','郑和']);;$pdata['say']=bin2hex(random_bytes(10));$pdata['score']=array_rand(['61','80','75','77','99','88','97','81']);$this->payload = $pdata;}
}
- 消费者
<?phpdeclare(strict_types=1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;#[Consumer(exchange: 'hyperf', routingKey: 'hyperf-queue.sj', queue: 'hyperf', name: "DemoConsumer", nums: 1)]
class DemoConsumer extends ConsumerMessage
{public function consumeMessage($data, AMQPMessage $message): Result{print_r($data);return Result::ACK;}
}
- 控制器
public function productMessage(){try {//这里使用协程创建100条消息$wg = new \Hyperf\Coroutine\WaitGroup();$wg->add(100);for($i=0;$i<100;$i++){//创建协程co(function () use ($wg){$message=new DemoProducer(1);//获取一个生产者实例$producer = ApplicationContext::getContainer()->get(Producer::class);//传递消息$producer->produce($message);$wg->done();});}$wg->wait();} catch (\Exception $exception) {throw new \Swoole\Exception($exception->getMessage());}return ['info' => "100条消息创建成功",];}
保存后重启服务 php hyperf.php start
,定义的消费者会自动消费,详见文档。
ps:这里有个小问题时间不对。这里实际上在composer hyperf的时候第一步就是可以设置时区Asia/Shanghai
都是我没设置,现在可以偷个懒在入口文件添加 date_default_timezone_set(‘Asia/Shanghai’); 即可。(或修改php.ini)