用最少的代码打造一个Mini版的gRPC框架

在《用最少的代码模拟gRPC四种消息交换模式》中,我使用很简单的代码模拟了gRPC四种消息交换模式(Unary、Client Streaming、Server Streaming和Duplex Streaming),现在我们更近一步,试着使用极简的方式打造一个gRPC框架(https://github.com/jiangjinnan/grpc-mini)。这个gRPC是对ASP.NET Core gRPC实现原理的模拟,并不是想重新造一个轮子。

一、“标准”的gRPC定义、承载和调用
二、将gRPC方法抽象成委托
三、将委托转换成RequestDelegate
   UnaryCallHandler
   ClientStreamingCallHandler
   ServerStreamingCallHandler
   DuplexStreamingCallHandler
四、路由注册
五、为gRPC服务定义一个接口
六、重新定义和承载服务

一、“标准”的gRPC定义、承载和调用

可能有些读者朋友们对ASP.NET Core gRPC还不是太熟悉,所以我们先来演示一下如何在一个ASP.NET Core应用中如何定义和承载一个简单的gRPC服务,并使用自动生成的客户端代码进行调用。我们新建一个空的解决方案,并在其中添加如下所示的三个项目。

44046a3ffb53a9ab8457e1d9ea9ead4f.png

我们在类库项目Proto中定义了如下所示Greeter服务,并利用其中定义的四个操作分别模拟四种消息交换模式。HelloRequest 和HelloReply 是它们涉及的两个ProtoBuf消息。

syntax = "proto3";
import "google/protobuf/empty.proto";service Greeter {rpc SayHelloUnary (HelloRequest) returns ( HelloReply);rpc SayHelloServerStreaming (google.protobuf.Empty) returns (stream HelloReply);rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply);rpc SayHelloDuplexStreaming (stream HelloRequest) returns (stream HelloReply);
}message HelloRequest {string name = 1;
}message HelloReply {string message = 1;
}

ASP.NET Core项目中定义了如下的GreeterServce服务实现了定义的四个操作,基类GreeterBase是针对上面这个.proto文件生成的类型。

public class GreeterService: GreeterBase
{public override Task<HelloReply> SayHelloUnary(HelloRequest request, ServerCallContext context)=> Task.FromResult(new HelloReply { Message = $"Hello, {request.Name}" });public override async Task<HelloReply> SayHelloClientStreaming(IAsyncStreamReader<HelloRequest> reader, ServerCallContext context){var list = new List<string>();while (await reader.MoveNext(CancellationToken.None)){list.Add(reader.Current.Name);}return new HelloReply { Message = $"Hello, {string.Join(",", list)}" };}public  override async Task SayHelloServerStreaming(Empty request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context){await responseStream.WriteAsync(new HelloReply { Message = "Hello, Foo!" });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message = "Hello, Bar!" });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message = "Hello, Baz!" });}public override async Task SayHelloDuplexStreaming(IAsyncStreamReader<HelloRequest> reader, IServerStreamWriter<HelloReply> writer, ServerCallContext context){while (await reader.MoveNext()){await writer.WriteAsync(new HelloReply { Message = $"Hello {reader.Current.Name}" });}}
}

具体的服务承载代码如下。我们采用Minimal API的形式,通过调用IServiceCollection接口的AddGrpc扩展方法注册相关服务,并调用MapGrpcService<TService>将定义在GreeterServce中的四个方法映射我对应的路由终结点。

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(kestrel => kestrel.ConfigureEndpointDefaults(options => options.Protocols = HttpProtocols.Http2));
var app = builder.Build();
app.MapGrpcService<GreeterService>();
app.Run();

在控制台项目Client中,我们利用生成出来的客户端类型GreeterClient分别一对应的服务交换模式调用了四个gRPC方法。

var channel = GrpcChannel.ForAddress("http://localhost:5000");
var client = new GreeterClient(channel);Console.WriteLine("Unary");
await UnaryCallAsync();Console.WriteLine("\nServer Streaming");
await ServerStreamingCallAsync();Console.WriteLine("\nClient Streaming");
await ClientStreamingCallAsync();Console.WriteLine("\nDuplex Streaming");
await DuplexStreamingCallAsync();
Console.ReadLine();async Task UnaryCallAsync()
{var request = new HelloRequest { Name = "foobar" };var reply = await client.SayHelloUnaryAsync(request);Console.WriteLine(reply.Message);
}
async Task ServerStreamingCallAsync()
{var streamingCall = client.SayHelloServerStreaming(new Empty());var reader = streamingCall.ResponseStream;while (await reader.MoveNext(CancellationToken.None)){Console.WriteLine(reader.Current.Message);}
}
async Task ClientStreamingCallAsync()
{var streamingCall = client.SayHelloClientStreaming();var writer = streamingCall.RequestStream;await writer.WriteAsync(new HelloRequest { Name = "Foo" });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name = "Bar" });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name = "Baz" });await writer.CompleteAsync();var reply = await streamingCall.ResponseAsync;Console.WriteLine(reply.Message);
}
async Task DuplexStreamingCallAsync()
{var streamingCall = client.SayHelloDuplexStreaming();var writer = streamingCall.RequestStream;var reader = streamingCall.ResponseStream;_ = Task.Run(async () =>{await writer.WriteAsync(new HelloRequest { Name = "Foo" });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name = "Bar" });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name = "Baz" });await writer.CompleteAsync();});await foreach (var reply in reader.ReadAllAsync()){Console.WriteLine(reply.Message);}
}

如下所示的是客户端控制台上的输出结果。

5a5b14dca1d7068ee5139c9389c9ea8f.png

二、将gRPC方法抽象成委托

通过上面的演示我们也知道,承载的gRPC类型最终会将其实现的方法注册成路由终结点,这一点其实和MVC是一样的。但是gRPC的方法和定义在Controller类型中的Action方法不同之处在于,前者的签名其实是固定的。如果我们将请求和响应消息类型使用Request和Reply来表示,四种消息交换模式的方法签名就可以写成如下的形式。

Task<Reply> Unary(Request request, ServerCallContext context);
Task<Reply> ClientStreaming(IAsyncStreamReader<Request> reader, ServerCallContext context);
Task ServerStreaming(Empty request, IServerStreamWriter<Reply> responseStream, ServerCallContext context);
Task DuplexStreaming(IAsyncStreamReader<Request> reader, IServerStreamWriter<Reply> writer, ServerCallContext context);

“流式”方法中用来读取请求和写入响应的IAsyncStreamReader<T>和IServerStreamWriter<T>定义如下。

public interface IAsyncStreamReader<out T>
{T Current { get; }Task<bool> MoveNext(CancellationToken cancellationToken = default);
}
public interface IAsyncStreamWriter<in T>
{Task WriteAsync(T message, CancellationToken cancellationToken = default);
}
public interface IServerStreamWriter<in T> : IAsyncStreamWriter<T>
{
}
public interface IClientStreamWriter<in T> : IAsyncStreamWriter<T>
{Task CompleteAsync();
}

表示服务端调用上下文的ServerCallContext 类型具有丰富的成员,但是它的本质就是对HttpContext上下文的封装,所以我们对它进行了简化。如下面的代码片段所示,我们给予这个上下文类型两个属性成员,一个是表示请求上下文的HttpContext,另一个则是用来设置响应状态StatusCode,后者对应的枚举定义了完整的gRPC状态码。

public class ServerCallContext
{public StatusCode StatusCode { get; set; } = StatusCode.OK;public HttpContext HttpContext { get; }public ServerCallContext(HttpContext httpContext)=> HttpContext = httpContext;
}public enum StatusCode
{OK = 0,Cancelled = 1,Unknown = 2,InvalidArgument = 3,DeadlineExceeded = 4,NotFound = 5,AlreadyExists = 6,PermissionDenied = 7,Unauthenticated = 0x10,ResourceExhausted = 8,FailedPrecondition = 9,Aborted = 10,OutOfRange = 11,Unimplemented = 12,Internal = 13,Unavailable = 14,DataLoss = 0xF
}

既然方法签名固定,意味着我们可以将四种gRPC方法定义成如下四个对应的委托,泛型参数TService、TRequest和TResponse分别表示服务、请求和响应类型。

public delegate Task<TResponse> UnaryMethod<TService, TRequest, TResponse>(TService service, TRequest request, ServerCallContext context)where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;public delegate Task<TResponse> ClientStreamingMethod<TService, TRequest, TResponse>(TService service, IAsyncStreamReader<TRequest> reader, ServerCallContext context)where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;public delegate Task ServerStreamingMethod<TService, TRequest, TResponse>(TService service, TRequest request, IServerStreamWriter<TResponse> writer, ServerCallContext context)where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;public delegate Task DuplexStreamingMethod<TService, TRequest, TResponse>(TService service, IAsyncStreamReader<TRequest> reader, IServerStreamWriter<TResponse> writer, ServerCallContext context)where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;

我们知道路由的本质就是创建一组路由模式(Pattern)和对应处理器之间的映射关系。路由模式很简单,对应的路由模板为“{ServiceName}/{MethodName}”,并且采用Post请求方法。对应的处理器最终体现为一个RequestDelegate。那么只要我们能够将上述四种委托类型都转换成RequestDelegate委托,一切都迎刃而解了。

三、将委托转换成RequestDelegate

为了将四种委托类型转化成RequestDelegate,我们将后者实现为一个ServiceCallHandler类型,并为其定义了如下两个基类。ServerCallHandlerBase的HandleCallAsync方法正好与RequestDelegate委托的签名一致,所以这个方法最终会用来处理gRPC请求。不同的消息交换模式采用不同的请求处理方式,只需实现抽象方法HandleCallAsyncCore就可以了。HandleCallAsync方法在调用此抽象方法之前将响应的ContentType设置成gRPC标准的响应类型“application/grpc”。在此之后将状态码设置为“grpc-status”首部,它将在HTTP2的DATA帧发送完毕后,以HEADERS帧发送到客户端。这两项操作都是gRPC协议的一部分。

public abstract class ServerCallHandlerBase
{public async Task HandleCallAsync(HttpContext httpContext){try{var serverCallContext = new ServerCallContext(httpContext);var response = httpContext.Response;response.ContentType = "application/grpc";await HandleCallAsyncCore(serverCallContext);SetStatus(serverCallContext.StatusCode);}catch{SetStatus(StatusCode.Unknown);}void SetStatus(StatusCode statusCode){httpContext.Response.AppendTrailer("grpc-status", ((int)statusCode).ToString());}}protected abstract Task HandleCallAsyncCore(ServerCallContext serverCallContext);
}public abstract class ServerCallHandler<TService, TRequest, TResponse> : ServerCallHandlerBasewhere TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>
{protected ServerCallHandler(MessageParser<TRequest> requestParser)=> RequestParser = requestParser;public MessageParser<TRequest> RequestParser { get; }
}

ServerCallHandler<TService, TRequest, TResponse>派生自ServerCallHandlerBase,并利用三个泛型参数TService、TRequest、TResponse来表示服务、请求和响应类型,RequestParser用来提供发序列化请求消息的MessageParser<TRequest>对象。针对四种消息交换模式的ServiceCallHandler类型均继承这个泛型基类。

UnaryCallHandler

基于Unary消息交换模式的ServerCallHandler的具体类型为UnaryCallHandler<TService, TRequest, TResponse>,它由上述的UnaryMethod<TService, TRequest, TResponse>委托构建而成。在重写的HandleCallAsyncCore方法中,我们利用HttpContext提供的IServiceProvider对象将服务实例创建出来后,从请求主体中将请求消息读取出来,然后交给指定的委托对象进行处理并得到响应消息,该响应消息最终用来对当前请求予以回复。

internal class UnaryCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>
{private readonly UnaryMethod<TService, TRequest, TResponse> _handler;public UnaryCallHandler(UnaryMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser):base(requestParser)=> _handler = handler;protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);var httpContext = serverCallContext.HttpContext;var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync<TRequest>(RequestParser);var reply = await _handler(service, request!, serverCallContext);await httpContext.Response.BodyWriter.WriteMessageAsync(reply);}
}

请求消息是通过如下这个ReadSingleMessageAsync<TMessage>方法读取出来的。按照gRPC协议,通过网络传输的请求和响应消息都会在前面追加5个字节,第一个字节表示消息是否经过加密,后面四个字节是一个以大端序表示的整数,表示消息的长度。对于其他消息交换模式,也是调用Buffers的TryReadMessage<TRequest>方法从缓冲区中读取请求消息。

public static async Task<TMessage> ReadSingleMessageAsync<TMessage>(this PipeReader reader, MessageParser<TMessage> parser) where TMessage:IMessage<TMessage>
{while (true){var result = await reader.ReadAsync();var buffer = result.Buffer;if (Buffers.TryReadMessage(parser, ref buffer, out var message)){return message!;}reader.AdvanceTo(buffer.Start, buffer.End);if (result.IsCompleted){break;}}throw new IOException("Fails to read message.");
}internal static class Buffers
{public static readonly int HeaderLength = 5;public static bool TryReadMessage<TRequest>(MessageParser<TRequest> parser, ref ReadOnlySequence<byte> buffer, out TRequest? message) where TRequest: IMessage<TRequest>{if (buffer.Length < HeaderLength){message = default;return false;}Span<byte> lengthBytes = stackalloc byte[4];buffer.Slice(1, 4).CopyTo(lengthBytes);var length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);if (buffer.Length < length + HeaderLength){message = default;return false;}message = parser.ParseFrom(buffer.Slice(HeaderLength, length));buffer = buffer.Slice(length + HeaderLength);return true;}
}

如下这个WriteMessageAsync扩展方法负责输出响应消息。

public static ValueTask<FlushResult> WriteMessageAsync(this PipeWriter writer, IMessage message)
{var length = message.CalculateSize();var span = writer.GetSpan(5 + length);span[0] = 0;BinaryPrimitives.WriteInt32BigEndian(span.Slice(1, 4), length);message.WriteTo(span.Slice(5, length));writer.Advance(5 + length);return writer.FlushAsync();
}

ClientStreamingCallHandler

ClientStreamingCallHandler<TService, TRequest, TResponse>代表Client Streaming模式下的ServerCallHandler,它由对应的ClientStreamingMethod<TService, TRequest, TResponse>委托创建而成。在重写的HandleCallAsyncCore方法中,除了服务实例,它还需要一个用来以“流”的方式读取请求的IAsyncStreamReader<TRequest>对象,它们都将作为参数传递给指定的委托,后者执行后会返回最终的响应消息。此消息同样通过上面这个WriteMessageAsync扩展方法予以回复。

internal class ClientStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>
{private readonly ClientStreamingMethod<TService, TRequest, TResponse> _handler;public ClientStreamingCallHandler(ClientStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser):base(requestParser){_handler = handler;}protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);var reader = serverCallContext.HttpContext.Request.BodyReader;var writer = serverCallContext.HttpContext.Response.BodyWriter;var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext.HttpContext, RequestParser);var response = await _handler(service, streamReader, serverCallContext);await writer.WriteMessageAsync(response);}
}

IAsyncStreamReader<T>接口的实现类型为如下这个HttpContextStreamReader<T>。在了解了请求消息在网络中的结构之后,对于实现在该类型中针对请求的读取操作,应该不难理解。

public class HttpContextStreamReader<T> : IAsyncStreamReader<T> where T : IMessage<T>
{private readonly PipeReader _reader;private readonly MessageParser<T> _parser;private ReadOnlySequence<byte> _buffer;public HttpContextStreamReader(HttpContext httpContext, MessageParser<T> parser){_reader = httpContext.Request.BodyReader;_parser = parser;}public T Current { get; private set; } = default!;public async Task<bool> MoveNext(CancellationToken cancellationToken){var completed = false;if (_buffer.IsEmpty){var result = await _reader.ReadAsync(cancellationToken);_buffer = result.Buffer;completed = result.IsCompleted;}if (Buffers.TryReadMessage(_parser, ref _buffer, out var mssage)){Current = mssage!;_reader.AdvanceTo(_buffer.Start, _buffer.End);return true;}_reader.AdvanceTo(_buffer.Start, _buffer.End);_buffer = default;return !completed && await MoveNext(cancellationToken);}
}

ServerStreamingCallHandler

ServerStreamingCallHandler<TService, TRequest, TResponse>代表Server Streaming模式下的ServerCallHandler,它由对应的ServerStreamingMethod<TService, TRequest, TResponse>委托创建而成。在重写的HandleCallAsyncCore方法中,除了服务实例,它还需要一个用来以“流”的方式写入响应的IAsyncStreamWriter<TResponse>对象,它们都将作为参数传递给指定的委托。

internal class ServerStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>
{private readonly ServerStreamingMethod<TService, TRequest, TResponse> _handler;public ServerStreamingCallHandler(ServerStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser):base(requestParser)=> _handler = handler;protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);var httpContext = serverCallContext.HttpContext;var streamWriter = new HttpContextStreamWriter<TResponse>(httpContext);var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync(RequestParser);await _handler(service, request, streamWriter, serverCallContext);}
}

IAsyncStreamWriter<T>接口的实现类型为如下这个HttpContextStreamWriter<T>,它直接调用上面定义的WriteMessageAsync扩展方法将指定的消息写入响应主体的输出流。

public class HttpContextStreamWriter<T> : IServerStreamWriter<T> where T : IMessage<T>
{private readonly PipeWriter _writer;public HttpContextStreamWriter(HttpContext httpContext) => _writer = httpContext.Response.BodyWriter;public Task WriteAsync(T message, CancellationToken cancellationToken = default){cancellationToken.ThrowIfCancellationRequested();return _writer.WriteMessageAsync(message).AsTask();}
}

DuplexStreamingCallHandler

DuplexStreamingCallHandler<TService, TRequest, TResponse>代表Duplex Streaming模式下的ServerCallHandler,它由对应的DuplexStreamingMethod<TService, TRequest, TResponse>委托创建而成。在重写的HandleCallAsyncCore方法中,除了服务实例,它还需要分别创建以“流”的方式读/写请求/响应的IAsyncStreamReader<TRequest>和IAsyncStreamWriter<TResponse>对象,对应的类型分别为上面定义的HttpContextStreamReader<TRequest>和HttpContextStreamWriter<TResponse>。

internal class DuplexStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>where TService : classwhere TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>
{private readonly DuplexStreamingMethod<TService, TRequest, TResponse> _handler;public DuplexStreamingCallHandler(DuplexStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser) :base(requestParser)=> _handler = handler;protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);       var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext.HttpContext, RequestParser);var streamWriter = new HttpContextStreamWriter<TResponse>(serverCallContext.HttpContext);await _handler(service, streamReader, streamWriter, serverCallContext);}
}

四、路由注册

目前我们将针对四种消息交换模式的gRPC方法抽象成对应的泛型委托,并且可以利用它们创建ServerCallHandler,后者可以提供作为路由终结点处理器的RequestDelegate委托。枚举和对应ServerCallHandler之间的映射关系如下所示:

  • UnaryMethod<TService, TRequest, TResponse>:UnaryCallHandler<TService, TRequest, TResponse>

  • ClientStreamingMethod<TService, TRequest, TResponse>:ClientStreamingCallHandler<TService, TRequest, TResponse>

  • ServerStreamingMethod<TService, TRequest, TResponse>:ServerStreamingCallHandler<TService, TRequest, TResponse>

  • DuplexStreamingMethod<TService, TRequest, TResponse>:DuplexStreamingCallHandler<TService, TRequest, TResponse>

现在我们将整个路由注册的流程串起来,为此我们定义了如下这个IServiceBinder<TService>接口,它提供了两种方式将定义在服务类型TService中的gRPC方法注册成对应的路由终结点。

public interface IServiceBinder<TService> where TService : class
{IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>;
}

路由终结点由路由模式和处理器两个元素组成,路由模式主要体现在由gRPC服务和操作名称组成的路由模板,我们默认使用服务类型的名称和方法名称(提出Async后缀)。为了能够对这两个名称进行定制,我们定义了如下两个特性GrpcServiceAttribute和GrpcMethodAttribute,它们可以分别标注在服务类型和操作方法上来指定一个任意的名称。

[AttributeUsage(AttributeTargets.Class)]
public class GrpcServiceAttribute: Attribute
{public string? ServiceName { get; set; }
}[AttributeUsage(AttributeTargets.Method)]
public class GrpcMethodAttribute : Attribute
{public string? MethodName { get; set; }
}

如下所示的ServiceBinder<TService> 是对IServiceBinder<TService> 接口的实现,它是对一个IEndpointRouteBuilder 对象的封装。对于实现的第一组方法,我们利用提供的方法名称与解析TService类型得到的服务名称合并,进而得到路由终结点的URL模板。这些方法还提供了一个针对gRPC方法签名的Func<TService,Func<…>>委托,我们利用它来将提供用于构建对应ServiceCallHandler的委托。我们最终利用IEndpointRouteBuilder 对象完成针对路由终结点的注册。

public class ServiceBinder<TService> : IServiceBinder<TService> where TService : class
{private readonly IEndpointRouteBuilder _routeBuilder;public ServiceBinder(IEndpointRouteBuilder routeBuilder) => _routeBuilder = routeBuilder;public IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{Task<TResponse> GetMethod(TService service, TRequest request, ServerCallContext context) => methodAccessor(service)(request, context);var callHandler = new UnaryCallHandler<TService, TRequest, TResponse>(GetMethod, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{Task<TResponse> GetMethod(TService service, IAsyncStreamReader<TRequest> reader, ServerCallContext context) => methodAccessor(service)(reader, context);var callHandler = new ClientStreamingCallHandler<TService, TRequest, TResponse>(GetMethod, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{ServerStreamingMethod<TService, TRequest, TResponse> handler = (service, request, writer, context) => methodAccessor(service)(request, writer, context);var callHandler = new ServerStreamingCallHandler<TService, TRequest, TResponse>(handler, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{DuplexStreamingMethod<TService, TRequest, TResponse> handler = (service, reader, writer, context) => methodAccessor(service)(reader, writer, context);var callHandler = new DuplexStreamingCallHandler<TService, TRequest, TResponse>(handler, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}private static string GetPath(string methodName){var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;if (methodName.EndsWith("Async")){methodName = methodName.Substring(0, methodName.Length - 5);}return $"{serviceName}/{methodName}";}public IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{var method = CreateDelegate<UnaryMethod<TService, TRequest,TResponse>>(methodAccessor, out var methodName);var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;var callHandler = new UnaryCallHandler<TService, TRequest, TResponse>(method, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{var method = CreateDelegate<ClientStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;var callHandler = new ClientStreamingCallHandler<TService, TRequest, TResponse>(method, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{var method = CreateDelegate<ServerStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;var callHandler = new ServerStreamingCallHandler<TService, TRequest, TResponse>(method, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)where TRequest : IMessage<TRequest>where TResponse : IMessage<TResponse>{var method = CreateDelegate<DuplexStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;var callHandler = new DuplexStreamingCallHandler<TService, TRequest, TResponse>(method, parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);return this;}private TDelegate CreateDelegate<TDelegate>(LambdaExpression expression, out string methodName) where TDelegate : Delegate{var method = ((MethodCallExpression)expression.Body).Method;methodName = method.GetCustomAttribute<GrpcMethodAttribute>()?.MethodName ?? method.Name;return (TDelegate)Delegate.CreateDelegate(typeof(TDelegate), method);}
}

由于第二组方法提供的针对gRPC方法调用的表达式,所以我们可以得到描述方法的MethodInfo对象,该对象不但解决了委托对象的创建问题,还可以提供方法的名称,所以这组方法无需提供gRPC方法的名称。但是提供的表达式并不能严格匹配方法的签名,所以无法提供编译时的错误检验,所以各有优缺点。

五、为gRPC服务定义一个接口

由于路由终结点的注册是针对服务类型进行的,所以我们决定让服务类型自身来完成所有的路由注册工作。在这里我们使用C# 11中一个叫做“静态接口方法”的特性,为服务类型定义如下这个IGrpcService<TService>接口,服务类型TService定义的所有gRPC方法的路由注册全部在静态方法Bind中完成,该方法将上述的IServiceBinder<TService>作为参数。

public interface  IGrpcService<TService> where TService:class
{static abstract void Bind(IServiceBinder<TService> binder);
}

我们定义了如下这个针对IEndpointRouteBuilder 接口的扩展方法完成针对指定服务类型的路由注册。为了与现有的方法区别开来,我特意将其命名为MapGrpcService2。该方法根据指定的IEndpointRouteBuilder 对象将ServiceBinder<TService>对象创建出来,并作为参数调用服务类型的静态Bind方法。到此为止,整个Mini版的gRPC服务端框架就构建完成了,接下来我们看看它能否工作。

public static class EndpointRouteBuilderExtensions
{public static IEndpointRouteBuilder MapGrpcService2<TService>(this IEndpointRouteBuilder routeBuilder) where TService : class, IGrpcService<TService>{var binder = new ServiceBinder<TService>(routeBuilder);TService.Bind(binder);return routeBuilder;}
}

六、重新定义和承载服务

我们开篇演示了ASP.NET Core gRPC的服务定义、承载和调用。如果我们上面构建的Mini版gRPC框架能够正常工作,意味着客户端代码可以保持不变,我们现在就来试试看。我们在Server项目中将GreeterService服务类型改成如下的形式,它不再继承任何基类,只实现IGrpcService<GreeterService>接口。针对四种消息交换模式的四个方法的实现方法保持不变,在实现的静态Bind方法中,我们采用两种形式完成了针对这四个方法的路由注册。

[GrpcService(ServiceName = "Greeter")]
public class GreeterService: IGrpcService<GreeterService>
{public Task<HelloReply> SayHelloUnaryAsync(HelloRequest request, ServerCallContext context)=> Task.FromResult(new HelloReply { Message = $"Hello, {request.Name}" });public async Task<HelloReply> SayHelloClientStreamingAsync(IAsyncStreamReader<HelloRequest> reader, ServerCallContext context){var list = new List<string>();while (await reader.MoveNext(CancellationToken.None)){list.Add(reader.Current.Name);}return new HelloReply { Message = $"Hello, {string.Join(",", list)}" };}public  async Task SayHelloServerStreamingAsync(Empty request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context){await responseStream.WriteAsync(new HelloReply { Message = "Hello, Foo!" });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message = "Hello, Bar!" });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message = "Hello, Baz!" });}public async Task SayHelloDuplexStreamingAsync(IAsyncStreamReader<HelloRequest> reader, IServerStreamWriter<HelloReply> writer, ServerCallContext context){while (await reader.MoveNext()){await writer.WriteAsync(new HelloReply { Message = $"Hello {reader.Current.Name}" });}}public static void Bind(IServiceBinder<GreeterService> binder){binder.AddUnaryMethod<HelloRequest, HelloReply>(it =>it.SayHelloUnaryAsync(default!,default!), HelloRequest.Parser).AddClientStreamingMethod<HelloRequest, HelloReply>(it => it.SayHelloClientStreamingAsync(default!, default!), HelloRequest.Parser).AddServerStreamingMethod<Empty, HelloReply>(nameof(SayHelloServerStreamingAsync), it => it.SayHelloServerStreamingAsync, Empty.Parser).AddDuplexStreamingMethod<HelloRequest, HelloReply>(nameof(SayHelloDuplexStreamingAsync), it => it.SayHelloDuplexStreamingAsync, HelloRequest.Parser);}}
}

服务承载程序直接将针对MapGrpcService<GreeterService>方法的调用换成MapGrpcService2<GreeterService>。由于整个框架根本不需要预先注册任何的服务,所以针对AddGrpc扩展方法的调用也可以删除。

using GrpcMini;
using Microsoft.AspNetCore.Server.Kestrel.Core;var builder = WebApplication.CreateBuilder(args);
builder.WebHost.ConfigureKestrel(kestrel => kestrel.ConfigureEndpointDefaults(options => options.Protocols = HttpProtocols.Http2));
var app = builder.Build();
app.MapGrpcService2<Server.Greeter>();
app.Run();

再次运行我们的程序,客户端依然可以得到相同的输出。

159d076307f303d0d9bfbbe5a1191091.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/280531.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows 10的下一个更新将在您观看视频时隐藏通知

Windows 10’s Focus Assist feature temporarily hides incoming notifications. In Windows 10’s next update, Focus Assist can activate when you’re using any full-screen app, whether that’s YouTube in a browser, Netflix, or a desktop video player like VLC. …

Ubuntu安装Samba文件共享服务器(NAS)

终于有点时间来解决下家中NAS需求了。一般自制NAS&#xff0c;只有选Samba。速度比FTP快&#xff0c;便利性比Windows文件夹共享好&#xff0c;设置多等等。 ▶参考&#xff1a;samba简介 安装Samba $ sudo apt-get update $ sudo apt-get install samba samba-common-bin 核心…

django21:admin后台管理\media配置\图片防盗链\暴露后端资源\路由分发\时间分类

admin后台管理 创建超级用户 createsuperuser 1.到应用下的admin.py注册模型表 from django.contrib import admin from blog import models # Register your models here.admin.site.register(models.UserInfo) admin.site.register(models.Article) admin.site.register(m…

Flask博客开发——Tinymce编辑器

之前Flask博客的文本编辑器比较简陋&#xff0c;这里为博客添加个优雅易用的Tinymce文本编辑器。 github见&#xff1a;https://github.com/ikheu/my_flasky 1 项目中添加Tinymce 下载好Tinymce包以及语言包&#xff0c;并添加到项目中。添加到项目的方法&#xff0c;参考了这篇…

Hello, Raspberry Pi.

1.概要最近在研究自动升级开源项目的时候偶然想到IoT领域的自动升级&#xff0c;突然想起2016年买的树莓派&#xff08;Raspberry Pi&#xff09;。那就分享一下如何入门树莓派的教程&#xff0c;我当时一共买了两块一款是Raspberry Pi 3b&#xff08;2016年价格259元去年以抽奖…

supersu_SuperSU已从Play商店中删除,这是替代使用的方法

supersuSuperSU has long been a staple in the rooted Android community. For years, the process for getting a rooted handset was: unlock the bootloader, flash a custom recovery, install SuperSU. That’s just how it was. 长期以来&#xff0c;SuperSU一直是扎根于…

django项目开发1:搭建虚拟环境

需求 不同项目依赖不同模块版本&#xff0c;不能共用一套环境&#xff0c;虚拟环境。在系统的python环境安装 安装 pip3 install virtualenv pip3 install virtualenvwrapper-win环境变量 # 配置环境变量&#xff1a; # 控制面板 > 系统和安全 > 系统 > 高级系统设…

div 包裹_如何查看到达之前收到的包裹和邮件

div 包裹The United States Postal Service, UPS, and FedEx all offer online dashboards where you can see exactly what packages (and letters, in the case of the US Postal Service) are scheduled to arrive at your address. They’ll even email and send you text …

py文件的运行

安装过程及配置 安装过程准备&#xff1a; 下载好Python的安装程序后&#xff0c;开始安装&#xff0c;在进入安装界面后一定确保勾选将Python加入到系统环境变量的路径里。如图所示&#xff1a; 2如果没有选取&#xff0c;那么按照下面的步骤进行操作。在桌面上用鼠标右键点击…

网络编程基础(一)

一.楔子 你现在已经学会了写python代码&#xff0c;假如你写了两个python文件a.py和b.py&#xff0c;分别去运行&#xff0c;你就会发现&#xff0c;这两个python的文件分别运行的很好。但是如果这两个程序之间想要传递一个数据&#xff0c;你要怎么做呢&#xff1f; 这个问题以…

dotnet-exec 让 C# 程序更简单

dotnet-exec 让 C# 程序更简单Introdotnet-exec是一个可以执行 C# 程序而不需要项目文件的命令行工具&#xff0c;并且你可以指定自定义的入口方法不仅仅是Main方法在 python/NodeJs 里&#xff0c;可以使用python xx.py/node xx.js来运行代码&#xff0c;在 C# 中一般是需要项…

office数据集dslr_如何将照片从DSLR无线传输到智能手机

office数据集dslrYou’ve got a great digital camera. You’ve got all your social media apps on your phone. Wouldn’t it be nice if you could snap a beautiful shot with your DSLR and shuttle it right over to your phone to throw up on Facebook or Instagram? …

文件管理、命令别名和glob

一、复制命令:cp src dest1.如果scr是文件a.如果dest不存在&#xff0c;则新建dest并将src的内容填充到dest里b.如果dest存在&#xff0c;则会用src里的内容覆盖dest里的内容&#xff0c;即覆盖dest2.如果src是目录a.如果dest不存在&#xff0c;则新建dest,然后把src下的内容复…

django版本区别/与版本匹配

一、区别 路由层 1.django 1.x路由层使用url方法 django 2.x和3.x版本使用path方法 url() 第一个参数支持正则 path()第一个参数是不支持正则的 可以使用 re_path替代url() from django.urls import re_path # django2.0中的re_path #不建议导入url,不能区分版本 #from djang…

中兴面试一个星期没有回音_如何在没有回声的情况下从亚马逊获取即时时尚建议...

中兴面试一个星期没有回音The Echo Look is a new device from Amazon that’s able to take a look at your outfits and tell you which one looks the best on you. However, you actually don’t need the Echo Look to get this kind of instant fashion advice from Amaz…

table分页的简单实现逻辑

为什么80%的码农都做不了架构师&#xff1f;>>> //table分页函数showPageNum: function(pageNum, allPageNum) { //pageNum点击第几页&#xff0c;allPageNum总页数$(".c_page .c_page_list").children().remove();for(var i1;i<allPageNum;i){var p…

Cocos Creator Ui系统

为什么80%的码农都做不了架构师&#xff1f;>>> 游戏场景&#xff1a;开发时组织游戏内容的中心&#xff1b;其中渲染根节点Canvas&#xff0c;包括属性 Design Resolution&#xff08;设计分辨率&#xff09; fit width,fit height 设计分辨率是内容生产者在制作场…

推荐一个使用 .NET 6 开发的开源媒体系统

你好&#xff0c;这里是 Dotnet 工具箱&#xff0c;定期分享 Dotnet 有趣&#xff0c;实用的工具和组件&#xff0c;希望对您有用&#xff01;什么是 Jellyfin ?Jellyfin 是一个免费的媒体系统&#xff0c;它可以让您更好的管理媒体&#xff0c;包括电影&#xff0c;音乐&…

亚马逊echo中国使用_如何将Amazon Echo与蓝牙扬声器配对以获得更大的声音

亚马逊echo中国使用Although both the full size Echo and the Echo Dot have respectable sound for their given sizes, compared to much bigger tabletop Bluetooth speakers (or a full home theater system with Bluetooth support), they’re pretty anemic. Let’s loo…

如何用Markdown轻松排版知乎专栏文章?

免费、便捷、高效的知乎专栏Markdown排版技巧。希望读过本文&#xff0c;可以让你的写作过程也变得更愉悦。 痛点 从前&#xff0c;写作时的排版是件辛苦事。不论你把排版环节放在写作中还是写作后&#xff0c;总会在心里清楚意识到&#xff0c;还有这么一个繁重而无趣的工作在…