RabbitMQ+PHP 教程六(RPC)

(using php-amqplib)

前提必读

本教程假设RabbitMQ是安装在标准端口上运行(5672)。如果您使用不同的主机、端口或凭据,则连接设置需要调整。

如果您在本教程中遇到困难,可以通过邮件列表与我们联系。

开始

在第二个教程中,我们学习了如何使用工作队列在多个工人之间分配耗时的任务。

但是如果我们需要在远程计算机上运行一个函数并等待结果呢?嗯,那是另一回事了。这种模式通常称为远程过程调用或RPC。

在本教程中我们将使用RabbitMQ搭建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分配的耗时的任务,所以我们将创建一个返回Fibonacci数的模拟一个RPC服务。

Client interface

为了说明如何使用RPC服务,我们将创建一个简单的客户类。它将公开一个名为调用的方法,该方法发送一个RPC请求并阻塞直到接收到结果为止:

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

关于RPC的一些建议

虽然RPC是计算中非常常见的模式,但它经常遭到批评。当程序员不知道函数调用是本地的,或者它是一个缓慢的RPC时,问题就出现了。这样的混乱导致了不可预知的系统,并给调试增加了不必要的复杂性。而简化软件,滥用会导致难以维护的RPC代码。

考虑到这一点,请考虑以下建议:

确保很明显哪个函数调用是本地调用,并且它是远程的。
记录系统。使组件之间的依赖关系清晰。
处理错误案例。RPC服务器长时间处于下行状态时,客户端应如何响应?
有疑问时避免RPC。如果可以,则应该使用异步管道,而不是像阻塞这样的RPC,结果被异步推送到下一个计算阶段。

回调队列(Callback queue)

一般在RabbitMQ做RPC是容易的。客户端发送一条请求消息和一个响应消息的服务器回复。为了接收响应,我们需要向请求发送一个“回调”队列地址。我们可以使用默认队列。让我们试试看:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage($payload,array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
消息属性
AMQP协议(0-9-1 protocol)预定义了一套14个属性,去一个消息。大多数属性很少使用,除了以下内容:

delivery_mode: 将消息标记为持久性。 (with a value of 2) or transient (1). 您可能会从第二个教程中记住这个属性。
content_type:用来描述编码的MIME类型。例如,对于常用的JSON编码,将此属性设置为应用程序/ JSON是一个很好的做法。
reply_to:常用的名字一个回调队列。
correlation_id:有助于将RPC响应与请求关联起来。

Correlation Id

在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法——让我们为每个客户机创建一个回调队列。

这引发了一个新问题,在队列中收到了响应,不清楚响应的请求属于哪个。那时候correlation_id属性用于。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中接收消息时,我们将查看这个属性,并在此基础上,我们将能够将响应与请求匹配。如果我们看到一个未知的correlation_id值,我们可以安全地忽略信息-它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败出错呢?这是由于服务器端可能出现竞争情况。虽然不太可能,RPC服务器可能在发送完答案后死亡,但在发出请求的确认消息之前。如果发生这种情况,重新启动的RPC服务器将再次处理请求。这就是为什么在客户机上我们必须优雅地处理重复响应,而RPC应该理想地是幂等的。

总结

clipboard.png

我们的RPC会像这样工作:

当客户端启动时,它创建一个匿名的独占回调队列。

一个RPC请求,客户端发送消息,两个属性:reply_to,设置回调队列和correlation_id,它被设置为每个请求的唯一值。

请求被发送到一个rpc_queue队列。

RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求时,它的工作和发送消息的结果返回给客户端,使用从reply_to队列。

客户机等待回调队列上的数据。当消息出现时,它检查correlation_id属性。如果它与请求的值匹配,则返回对应用程序的响应。

汇总

Fibonacci 递归源码:

function fib($n) {if ($n == 0)return 0;if ($n == 1)return 1;return fib($n-1) + fib($n-2);
}
``
我们声明fibonacci(斐波那契)函数。它只假设有效的正整数输入。(不要指望这一个能为大数字工作,而且这可能是最慢的递归实现)。我们的RPC服务器rpc_server.php代码看起来像这样:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {

if ($n == 0)return 0;
if ($n == 1)return 1;
return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requestsn";
$callback = function($req) {

$n = intval($req->body);
echo " [.] fib(", $n, ")\n";$msg = new AMQPMessage((string) fib($n),array('correlation_id' => $req->get('correlation_id')));$req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);

};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();
$connection->close();

?>


服务器代码相当简单:像往常一样,我们从建立连接、通道和声明队列开始。我们可能需要运行多个服务器进程。为了分散负载同样多的服务器需要设置`prefetch_count`, 设置`$channel.basic_qos`美元。我们用`basic_consume`访问队列。然后,我们进入while循环,在其中等待请求消息,完成工作并发送响应。我们rpc_client.php RPC客户端代码:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class FibonacciRpcClient {

private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;public function __construct() {$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$this->channel = $this->connection->channel();list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);$this->channel->basic_consume($this->callback_queue, '', false, false, false, false,array($this, 'on_response'));
}
public function on_response($rep) {if($rep->get('correlation_id') == $this->corr_id) {$this->response = $rep->body;}
}public function call($n) {$this->response = null;$this->corr_id = uniqid();$msg = new AMQPMessage((string) $n,array('correlation_id' => $this->corr_id,'reply_to' => $this->callback_queue));$this->channel->basic_publish($msg, '', 'rpc_queue');while(!$this->response) {$this->channel->wait();}return intval($this->response);
}

};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";

?>

现在是一个很好的时间来让我们完整的示例源代码rpc_client.php和rpc_server.php。我们的RPC服务现在准备好了。我们可以启动服务器:

php rpc_server.php
# => [x] Awaiting RPC requests

请求斐波那契数运行客户机:

php rpc_client.php
# => [x] Requesting fib(30)
``
这里介绍的设计并不是RPC服务的唯一实现,但它有一些重要的要点:

如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。试着在一个新的控制台再运行第一个:rpc_server.php。

在客户端,RPC只需要发送和接收一条消息。不喜欢queue_declare需要同步调用。因此,RPC客户机只需要一次RPC请求的一次网络往返。

我们的代码仍然非常简单,并没有试图解决更复杂(但重要)的问题,例如:

如果没有服务器运行,客户端应该如何反应?

客户端应该对RPC有某种超时吗?

如果服务器发生故障并引发异常,是否应该转发给客户端?

在处理前防止无效传入消息(如检查边界、类型)。

如果您想进行实验,您可能会发现management UI对于查看队列非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/284009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TKMybatis 介绍和使用

目录 一、什么是 TKMybatis 二、TKMybatis 使用 2.1 Springboot 项目中加入依赖 2.2 使用讲解 2.2.1 实体类中使用 2.2.2 dao中使用 2.2.3 Service 层中使用 2.3 实际案例 2.3.1 dao 层使用 2.3.2 service 层使用 一、什么是 TKMybatis TKMybatis 是基于 Mybatis 框…

WinForm(三)揭开可视化控件的面纱

WinForm所见即所得的UI设计框架&#xff0c;开发效率确实有所提升&#xff0c;同时降低了编程门槛&#xff0c;让WinForm更普及。拖拖拽拽就能设计出一个界面&#xff0c;那么我们拖拽的这些东西是什么&#xff1f;它们是什么原理&#xff1f;。WinForm我觉得很好的一点是&…

RestTemplate 详解

在项目中&#xff0c;当我们需要远程调用一个 HTTP 接口时&#xff0c;我们经常会用到 RestTemplate 这个类。这个类是 Spring 框架提供的一个工具类。Spring 官网对它的介绍如下&#xff1a; RestTemplate: The original Spring REST client with a synchronous, template met…

初识Spark2.0之Spark SQL

内存计算平台Spark在今年6月份的时候正式发布了spark2.0&#xff0c;相比上一版本的spark1.6版本&#xff0c;在内存优化&#xff0c;数据组织&#xff0c;流计算等方面都做出了较大的改变&#xff0c;同时更加注重基于DataFrame数据组织的MLlib&#xff0c;更加注重机器学习整…

ABP详细教程——模块类

概述模块化是ABP vNext的最大亮点&#xff0c;也是ABP vNext框架的核心&#xff0c;而模块类是ABP vNext框架模块化的核心要素。这一章节&#xff0c;我就从模块类的用法、运行机制、源代码等层面&#xff0c;带大家详细了解ABP vNext的模块类。用法在ABP的约定中&#xff0c;每…

[转]Eureka工作原理

目录 Eureka 工作原理 Eureka 核心概念 自我保护机制 Eureka 集群原理 Eurka 工作流程 总结 Eureka 工作原理 上节内容为大家介绍了&#xff0c;注册中心 Eureka 产品的使用&#xff0c;以及如何利用 Eureka 搭建单台和集群的注册中心。这节课我们来继续学习 Eureka&…

重谈联想5G编码投票事件

此前&#xff0c;司马南谈了联想好几个问题&#xff0c;其中最尖锐的要属国有资产流失&#xff0c;这是联想管理层无法回避的死穴。不过&#xff0c;司马南批判联想5G投票背刺H公司&#xff0c;这基本就是造谣了。当年&#xff0c;媒体把编码投票炒作的很厉害&#xff0c;抨击联…

JStorm2.1.1集群的安装和使用

为什么80%的码农都做不了架构师&#xff1f;>>> JStorm2.1.1集群的安装和使用 Storm是一个免费开源、分布式、高容错的实时计算系统&#xff0c;而JStorm是阿里巴巴开源的基于Storm采用Java重写的一套分布式实时流计算框架&#xff0c;在性能和支持的集群规模上做了…

Hystrix 原理

Hystrix是什么&#xff1f; Hystrix是Netflix开源库&#xff0c;这是一个针对分布式系统的延迟和容错库。 Hystrix 供分布式系统使用&#xff0c;提供延迟和容错功能&#xff0c;隔离远程系统、访问和第三方程序库的访问点&#xff0c;防止级联失败&#xff0c;保证复杂的分布…

「深度」无人机实名制政策特稿|市场看好、资本关注,“反黑飞”正在崛起

从政策和需求来看&#xff0c;“反黑飞”越来越重要&#xff0c;市场也正在不断崛起。 对于大多数人来说&#xff0c;今天是最适合明目张胆“装嫩”的六一儿童节。不过&#xff0c;在无人机厂商和无人机玩家的眼里&#xff0c;今天是无人机实名制政策正式实施的日子。 近年来&…

在navicat中新建数据库

前言&#xff1a; 在本地新建一个名为editor的数据库&#xff1b; 过程&#xff1a; 1.&#xff1b; 2.选择&#xff1a;utf8mb4 -- UTF-8 Unicode字符集&#xff0c;原因在于&#xff1a;utf8mb4兼容utf8&#xff0c;且比utf8能表示更多的字符。&#xff0c;而且它支持表情符号…

MASA Stack 第三期社区例会

MASA Blazor 0.5.0发版内容功能Autocomplete&#xff1a;支持通过设置AutoSelectFirst参数开启自动选择第一项的功能&#xff0c;支持CacheItems参数&#xff0c;增强使用上下键的用户体验。BottomNavigation&#xff1a;&#xff1a;一个替代侧边栏的新组件。它主要用于移动应…

[转]高并发架构设计之--「服务降级」、「服务限流」与「服务熔断」

目录 服务降级 1 、简介 2 、使用场景 3 、核心设计 3.1 分布式开关 3.2 自动降级分类 3.3 配置中心 3.4 处理策略 3.5 降级分类 3.6 服务降级要考虑的问题 4 、高级特性 4.1 分级降级 4.2 降级权值 5 、总结与展望 服务限流 一、为什么要做服务限流设计&…

SpringBoot获取ApplicationContext

2019独角兽企业重金招聘Python工程师标准>>> 有两种方法&#xff1a; 创建Component实现ApplicationContextAware接口&#xff0c;SpringBoot会自动调用这个类的setApplicationConext()方法。鼓励使用这种方式。SpringApplication.run(MyApplication.class, args)这…

SkiaSharp 之 WPF 自绘 投篮小游戏(案例版)

此案例主要是针对光线投影法碰撞检测功能的示例&#xff0c;顺便做成了一个小游戏&#xff0c;很简单&#xff0c;但是&#xff0c;效果却很不错。投篮小游戏规则&#xff0c;点击投篮目标点&#xff0c;就会有一个球沿着相关抛物线&#xff0c;然后&#xff0c;判断是否进入篮…

zuul集成ribbon完成服务通信和负载均衡

目录 Zuul2服务通信 超时相关 默认超时配置 自定义超时配置 负载均衡 Zuul2服务通信 描述&#xff1a;zuul2通过Ribbon完成客户端负载均衡以及与服务器群集进行通信。 zuul2的通信是集成Ribbon实现的&#xff0c;在Origin中集成Ribbon基本配置&#xff08;例如IClientCo…

时任上海来伊份互联网事业群总裁王戈钧 :传统企业(线上+线下)移动互联网改造...

2017年12月22日-23日&#xff0c;第13届信息化领袖峰会暨2017中国数字化贡献人物颁奖盛典在上海盛大开幕。本次峰会由上海市经济和信息化委员会指导&#xff0c;上海市国有资产信息中心、上海市计算机用户协会、上海市信息服务业行业协会、上海大数据联盟、上海市高等教育学会支…

【.NET6+Modbus】Modbus TCP协议解析、仿真环境以及基于.NET实现基础通信

接下来的内容&#xff0c;我会以从头开发一个简单的基于modbus tcp通信的案例&#xff0c;来实现一个基础的通信功能。有关环境&#xff1a;开发环境&#xff1a;VS 2022企业版运行环境&#xff1a;Win 10 专业版.NET 环境版本&#xff1a;.NET 6【备注】 源码在文末 1、新建一…

源码深度剖析Eureka与Ribbon服务发现原理

本文基于 spring cloud dalston&#xff0c;同时文章较长&#xff0c;请选择舒服姿势进行阅读。 Eureka 与 Ribbon 是什么&#xff1f;和服务发现什么关系&#xff1f; Eureka 与 Ribbon 都是 Netflix 提供的微服务组件&#xff0c;分别用于服务注册与发现、负载均衡。同时&a…

std的find和reverse_iterator联合使用

上代码&#xff1a; // test2013.cpp : 定义控制台应用程序的入口点。 //#include "stdafx.h" #include <stdlib.h> #include <stdio.h> #include<iostream> #include<vector> #include<map> #include<string> using namespace …