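Building a Mini gRPC Framework with as Little Code as Possible
1. Defining, hosting, and calling a "standard" gRPC service
2. Abstracting gRPC methods as delegates
3. Converting the delegates to RequestDelegate
UnaryCallHandler
ClientStreamingCallHandler
ServerStreamingCallHandler
DuplexStreamingCallHandler
4. Route registration
5. Defining an interface for gRPC services
6. Redefining and hosting the service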
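1. Defining, hosting, and calling a "standard" gRPC service
Some readers may not be very familiar with ASP.NET Core gRPC yet, so we first demonstrate how to define and host a simple gRPC service in an ASP.NET Core application and call it with the generated client code. We create an empty solution and add three projects to it: the class library Proto, the ASP.NET Core application Server, and the console application Client.
In the class library project Proto we define the Greeter service shown below; its four operations simulate the four message exchange patterns. HelloRequest and HelloReply are the two ProtoBuf messages involved.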
    syntax = "proto3";
    import "google/protobuf/empty.proto";

    service Greeter {
      rpc SayHelloUnary (HelloRequest) returns (HelloReply);
      rpc SayHelloServerStreaming (google.protobuf.Empty) returns (stream HelloReply);
      rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply);
      rpc SayHelloDuplexStreaming (stream HelloRequest) returns (stream HelloReply);
    }

    message HelloRequest {
      string name = 1;
    }

    message HelloReply {
      string message = 1;
    }
The ASP.NET Core project defines the following GreeterService, which implements the four operations. Its base class GreeterBase is generated from the .proto file above.
    public class GreeterService : GreeterBase
    {
        // Unary: one request, one reply.
        public override Task<HelloReply> SayHelloUnary(HelloRequest request, ServerCallContext context)
            => Task.FromResult(new HelloReply { Message = $"Hello, {request.Name}" });

        // Client streaming: collect every name, then reply once.
        public override async Task<HelloReply> SayHelloClientStreaming(IAsyncStreamReader<HelloRequest> reader, ServerCallContext context)
        {
            var list = new List<string>();
            while (await reader.MoveNext(CancellationToken.None))
            {
                list.Add(reader.Current.Name);
            }
            return new HelloReply { Message = $"Hello, {string.Join(",", list)}" };
        }

        // Server streaming: three replies, one second apart.
        public override async Task SayHelloServerStreaming(Empty request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
        {
            await responseStream.WriteAsync(new HelloReply { Message = "Hello, Foo!" });
            await Task.Delay(1000);
            await responseStream.WriteAsync(new HelloReply { Message = "Hello, Bar!" });
            await Task.Delay(1000);
            await responseStream.WriteAsync(new HelloReply { Message = "Hello, Baz!" });
        }

        // Duplex streaming: reply to each request as it arrives.
        public override async Task SayHelloDuplexStreaming(IAsyncStreamReader<HelloRequest> reader, IServerStreamWriter<HelloReply> writer, ServerCallContext context)
        {
            while (await reader.MoveNext())
            {
                await writer.WriteAsync(new HelloReply { Message = $"Hello {reader.Current.Name}" });
            }
        }
    }
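The hosting code is shown below. We use the Minimal API style: we register the required services by calling the AddGrpc extension method on IServiceCollection, and call MapGrpcService<GreeterService> to map the methods of GreeterService to route endpoints.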
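    using Microsoft.AspNetCore.Server.Kestrel.Core;

    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddGrpc();
    // gRPC requires HTTP/2, so every endpoint is restricted to that protocol.
    builder.WebHost.ConfigureKestrel(kestrel => kestrel.ConfigureEndpointDefaults(options => options.Protocols = HttpProtocols.Http2));
    var app = builder.Build();
    app.MapGrpcService<GreeterService>();
    app.Run();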
In the console project Client, we use the generated client type GreeterClient to call the four gRPC methods, each with its corresponding message exchange pattern.
    using Google.Protobuf.WellKnownTypes;
    using Grpc.Core;
    using Grpc.Net.Client;

    var channel = GrpcChannel.ForAddress("http://localhost:5000");
    var client = new GreeterClient(channel);

    Console.WriteLine("Unary");
    await UnaryCallAsync();
    Console.WriteLine("\nServer Streaming");
    await ServerStreamingCallAsync();
    Console.WriteLine("\nClient Streaming");
    await ClientStreamingCallAsync();
    Console.WriteLine("\nDuplex Streaming");
    await DuplexStreamingCallAsync();
    Console.ReadLine();

    async Task UnaryCallAsync()
    {
        var request = new HelloRequest { Name = "foobar" };
        var reply = await client.SayHelloUnaryAsync(request);
        Console.WriteLine(reply.Message);
    }

    async Task ServerStreamingCallAsync()
    {
        var streamingCall = client.SayHelloServerStreaming(new Empty());
        var reader = streamingCall.ResponseStream;
        while (await reader.MoveNext(CancellationToken.None))
        {
            Console.WriteLine(reader.Current.Message);
        }
    }

    async Task ClientStreamingCallAsync()
    {
        var streamingCall = client.SayHelloClientStreaming();
        var writer = streamingCall.RequestStream;
        await writer.WriteAsync(new HelloRequest { Name = "Foo" });
        await Task.Delay(1000);
        await writer.WriteAsync(new HelloRequest { Name = "Bar" });
        await Task.Delay(1000);
        await writer.WriteAsync(new HelloRequest { Name = "Baz" });
        await writer.CompleteAsync();
        var reply = await streamingCall.ResponseAsync;
        Console.WriteLine(reply.Message);
    }

    async Task DuplexStreamingCallAsync()
    {
        var streamingCall = client.SayHelloDuplexStreaming();
        var writer = streamingCall.RequestStream;
        var reader = streamingCall.ResponseStream;
        // Send requests in the background while replies are read below.
        _ = Task.Run(async () =>
        {
            await writer.WriteAsync(new HelloRequest { Name = "Foo" });
            await Task.Delay(1000);
            await writer.WriteAsync(new HelloRequest { Name = "Bar" });
            await Task.Delay(1000);
            await writer.WriteAsync(new HelloRequest { Name = "Baz" });
            await writer.CompleteAsync();
        });
        await foreach (var reply in reader.ReadAllAsync())
        {
            Console.WriteLine(reply.Message);
        }
    }
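When the client runs, it prints the replies to all four calls on its console.
2. Abstracting gRPC methods as delegates
As the demonstration above shows, a hosted gRPC type ultimately has the methods it implements registered as route endpoints, which is really the same as in MVC. The difference between gRPC methods and the action methods defined in a Controller type is that the signatures of the former are fixed. If we write Request and Reply for the request and response message types, the method signatures of the four message exchange patterns take the following form.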
    Task<Reply> Unary(Request request, ServerCallContext context);
    Task<Reply> ClientStreaming(IAsyncStreamReader<Request> reader, ServerCallContext context);
    Task ServerStreaming(Request request, IServerStreamWriter<Reply> responseStream, ServerCallContext context);
    Task DuplexStreaming(IAsyncStreamReader<Request> reader, IServerStreamWriter<Reply> writer, ServerCallContext context);
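The IAsyncStreamReader<T> and IServerStreamWriter<T> interfaces that the streaming methods use to read requests and write responses are defined as follows.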
    public interface IAsyncStreamReader<out T>
    {
        T Current { get; }
        Task<bool> MoveNext(CancellationToken cancellationToken = default);
    }

    public interface IAsyncStreamWriter<in T>
    {
        Task WriteAsync(T message, CancellationToken cancellationToken = default);
    }

    public interface IServerStreamWriter<in T> : IAsyncStreamWriter<T>
    {
    }

    public interface IClientStreamWriter<in T> : IAsyncStreamWriter<T>
    {
        Task CompleteAsync();
    }
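The ServerCallContext type, which represents the server-side call context, has a rich set of members, but in essence it is a wrapper around HttpContext, so we simplify it. As the snippet below shows, we give this context type two properties: HttpContext, which exposes the request context, and StatusCode, which is used to set the response status. The StatusCode enum defines the complete set of gRPC status codes.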
    public class ServerCallContext
    {
        public StatusCode StatusCode { get; set; } = StatusCode.OK;
        public HttpContext HttpContext { get; }
        public ServerCallContext(HttpContext httpContext) => HttpContext = httpContext;
    }

    public enum StatusCode
    {
        OK = 0,
        Cancelled = 1,
        Unknown = 2,
        InvalidArgument = 3,
        DeadlineExceeded = 4,
        NotFound = 5,
        AlreadyExists = 6,
        PermissionDenied = 7,
        ResourceExhausted = 8,
        FailedPrecondition = 9,
        Aborted = 10,
        OutOfRange = 11,
        Unimplemented = 12,
        Internal = 13,
        Unavailable = 14,
        DataLoss = 15,
        Unauthenticated = 16
    }
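Because the method signatures are fixed, we can define the four kinds of gRPC methods as the four delegates below. The generic parameters TService, TRequest, and TResponse stand for the service, request, and response types.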
    public delegate Task<TResponse> UnaryMethod<TService, TRequest, TResponse>(TService service, TRequest request, ServerCallContext context)
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    public delegate Task<TResponse> ClientStreamingMethod<TService, TRequest, TResponse>(TService service, IAsyncStreamReader<TRequest> reader, ServerCallContext context)
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    public delegate Task ServerStreamingMethod<TService, TRequest, TResponse>(TService service, TRequest request, IServerStreamWriter<TResponse> writer, ServerCallContext context)
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    public delegate Task DuplexStreamingMethod<TService, TRequest, TResponse>(TService service, IAsyncStreamReader<TRequest> reader, IServerStreamWriter<TResponse> writer, ServerCallContext context)
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;
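Routing, in essence, is a set of mappings between route patterns and their handlers. The route pattern is simple here: the route template is "{ServiceName}/{MethodName}" and the HTTP method is POST. The handler ultimately takes the form of a RequestDelegate. So once we can convert each of the four delegate types above into a RequestDelegate, everything else falls into place.
3. Converting the delegates to RequestDelegate
To convert the four delegate types into a RequestDelegate, we implement the latter as a ServerCallHandler type, for which we define the two base classes below. The HandleCallAsync method of ServerCallHandlerBase has exactly the signature of the RequestDelegate delegate, so it is this method that ultimately handles the gRPC request. Each message exchange pattern handles requests differently and only has to implement the abstract HandleCallAsyncCore method. Before calling that abstract method, HandleCallAsync sets the response Content-Type to "application/grpc", the standard gRPC response type. Afterwards it writes the status code to the "grpc-status" header, which is sent to the client as a trailer in an HTTP/2 HEADERS frame once the DATA frames have been sent. Both steps are part of the gRPC protocol.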
    public abstract class ServerCallHandlerBase
    {
        // Matches the RequestDelegate signature, so it can be used directly as an endpoint handler.
        public async Task HandleCallAsync(HttpContext httpContext)
        {
            try
            {
                var serverCallContext = new ServerCallContext(httpContext);
                var response = httpContext.Response;
                response.ContentType = "application/grpc";
                await HandleCallAsyncCore(serverCallContext);
                SetStatus(serverCallContext.StatusCode);
            }
            catch
            {
                // Any unhandled exception is reported as the gRPC status Unknown.
                SetStatus(StatusCode.Unknown);
            }

            void SetStatus(StatusCode statusCode)
            {
                httpContext.Response.AppendTrailer("grpc-status", ((int)statusCode).ToString());
            }
        }

        protected abstract Task HandleCallAsyncCore(ServerCallContext serverCallContext);
    }

    public abstract class ServerCallHandler<TService, TRequest, TResponse> : ServerCallHandlerBase
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        protected ServerCallHandler(MessageParser<TRequest> requestParser) => RequestParser = requestParser;
        public MessageParser<TRequest> RequestParser { get; }
    }
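The control flow of HandleCallAsync can be tried out without ASP.NET Core. The following is only a minimal sketch under stated assumptions: CallHandlerSketch, OkHandler, and FailingHandler are hypothetical names, a plain dictionary stands in for the HTTP/2 trailers, and the status is a bare int (0 for OK, 2 for Unknown, as in the StatusCode enum). It shows that the "grpc-status" entry is written whether the core handler succeeds or throws.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var okTrailers = new Dictionary<string, string>();
await new OkHandler().HandleCallAsync(okTrailers);
Console.WriteLine(okTrailers["grpc-status"]);     // 0

var failedTrailers = new Dictionary<string, string>();
await new FailingHandler().HandleCallAsync(failedTrailers);
Console.WriteLine(failedTrailers["grpc-status"]); // 2

// Hypothetical stand-in for ServerCallHandlerBase: the dictionary plays the role of the trailers.
public abstract class CallHandlerSketch
{
    public async Task HandleCallAsync(IDictionary<string, string> trailers)
    {
        int status;
        try
        {
            status = await HandleCallAsyncCore();
        }
        catch
        {
            status = 2; // Unknown
        }
        trailers["grpc-status"] = status.ToString();
    }

    // Returns the status the concrete handler wants to report.
    protected abstract Task<int> HandleCallAsyncCore();
}

public sealed class OkHandler : CallHandlerSketch
{
    protected override Task<int> HandleCallAsyncCore() => Task.FromResult(0);
}

public sealed class FailingHandler : CallHandlerSketch
{
    protected override Task<int> HandleCallAsyncCore() => throw new InvalidOperationException("boom");
}
```

Keeping the try/catch in the base class means the concrete handlers never have to think about the trailer.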
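ServerCallHandler<TService, TRequest, TResponse> has four concrete subclasses, one for each message exchange pattern.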
UnaryCallHandler
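The concrete ServerCallHandler type for the Unary message exchange pattern is UnaryCallHandler<TService, TRequest, TResponse>. It creates the service instance in a new DI scope, reads a single request message from the request body, invokes the UnaryMethod delegate, and writes the reply to the response body.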
    internal class UnaryCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        private readonly UnaryMethod<TService, TRequest, TResponse> _handler;

        public UnaryCallHandler(UnaryMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser)
            : base(requestParser) => _handler = handler;

        protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
        {
            // One service instance per call, created in its own DI scope.
            using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
            var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
            var httpContext = serverCallContext.HttpContext;
            var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync<TRequest>(RequestParser);
            var reply = await _handler(service, request!, serverCallContext);
            await httpContext.Response.BodyWriter.WriteMessageAsync(reply);
        }
    }
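The request message is read by the ReadSingleMessageAsync<TMessage> extension method below. On the wire, every gRPC message is preceded by a 5-byte header: one byte that flags whether the message is compressed, followed by four bytes holding the message length in big-endian order. Buffers.TryReadMessage parses that header and then the message body.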
    // Extension method; it has to be declared inside a static class.
    public static async Task<TMessage> ReadSingleMessageAsync<TMessage>(this PipeReader reader, MessageParser<TMessage> parser)
        where TMessage : IMessage<TMessage>
    {
        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;
            if (Buffers.TryReadMessage(parser, ref buffer, out var message))
            {
                // The message is fully parsed, so mark its bytes as consumed.
                reader.AdvanceTo(buffer.Start);
                return message!;
            }
            // Nothing consumed yet, everything examined: wait for more data.
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }
        throw new IOException("Fails to read message.");
    }

    internal static class Buffers
    {
        // 1 byte compression flag + 4 bytes big-endian message length.
        public static readonly int HeaderLength = 5;

        public static bool TryReadMessage<TRequest>(MessageParser<TRequest> parser, ref ReadOnlySequence<byte> buffer, out TRequest? message)
            where TRequest : IMessage<TRequest>
        {
            if (buffer.Length < HeaderLength)
            {
                message = default;
                return false;
            }

            Span<byte> lengthBytes = stackalloc byte[4];
            buffer.Slice(1, 4).CopyTo(lengthBytes);
            var length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
            if (buffer.Length < length + HeaderLength)
            {
                message = default;
                return false;
            }

            message = parser.ParseFrom(buffer.Slice(HeaderLength, length));
            // Leave only the bytes after this message in the buffer.
            buffer = buffer.Slice(length + HeaderLength);
            return true;
        }
    }
The WriteMessageAsync extension method below writes the response message.
    // Extension method; it has to be declared inside a static class.
    public static ValueTask<FlushResult> WriteMessageAsync(this PipeWriter writer, IMessage message)
    {
        var length = message.CalculateSize();
        var span = writer.GetSpan(5 + length);
        span[0] = 0; // compression flag: not compressed
        BinaryPrimitives.WriteInt32BigEndian(span.Slice(1, 4), length);
        message.WriteTo(span.Slice(5, length));
        writer.Advance(5 + length);
        return writer.FlushAsync();
    }
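To make the 5-byte header concrete, here is a small sketch that frames and unframes raw payloads using only the base class library. Frame and TryUnframe are hypothetical helper names, and plain UTF-8 bytes stand in for serialized ProtoBuf messages; the header layout itself (one flag byte, then a four-byte big-endian length) is the one used by the code above.

```csharp
using System;
using System.Buffers.Binary;
using System.Text;

// Two frames back to back, as they might arrive in a single read.
var first = Frame(Encoding.UTF8.GetBytes("foo"));
var second = Frame(Encoding.UTF8.GetBytes("foobar"));
var wire = new byte[first.Length + second.Length];
first.CopyTo(wire, 0);
second.CopyTo(wire, first.Length);

Console.WriteLine(BitConverter.ToString(second, 0, 5)); // 00-00-00-00-06

ReadOnlySpan<byte> remaining = wire;
while (TryUnframe(ref remaining, out var payload))
{
    Console.WriteLine(Encoding.UTF8.GetString(payload)); // foo, then foobar
}

// Prefix the payload with the compression flag and its big-endian length.
static byte[] Frame(ReadOnlySpan<byte> payload)
{
    var frame = new byte[5 + payload.Length];
    frame[0] = 0; // not compressed
    BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(1, 4), payload.Length);
    payload.CopyTo(frame.AsSpan(5));
    return frame;
}

// Take one complete frame off the front of the buffer, if there is one.
static bool TryUnframe(ref ReadOnlySpan<byte> buffer, out byte[] payload)
{
    payload = Array.Empty<byte>();
    if (buffer.Length < 5) return false;
    var length = BinaryPrimitives.ReadInt32BigEndian(buffer.Slice(1, 4));
    if (buffer.Length < 5 + length) return false;
    payload = buffer.Slice(5, length).ToArray();
    buffer = buffer.Slice(5 + length);
    return true;
}
```

Returning false on an incomplete frame, rather than throwing, is what lets a caller simply wait for more bytes and try again.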
ClientStreamingCallHandler
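ClientStreamingCallHandler<TService, TRequest, TResponse> handles the client-streaming pattern. It wraps the request body in an IAsyncStreamReader<TRequest>, passes it to the ClientStreamingMethod delegate, and writes the single response that the delegate returns.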
    internal class ClientStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        private readonly ClientStreamingMethod<TService, TRequest, TResponse> _handler;

        public ClientStreamingCallHandler(ClientStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser)
            : base(requestParser) => _handler = handler;

        protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
        {
            using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
            var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
            var writer = serverCallContext.HttpContext.Response.BodyWriter;
            var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext.HttpContext, RequestParser);
            var response = await _handler(service, streamReader, serverCallContext);
            await writer.WriteMessageAsync(response);
        }
    }
The IAsyncStreamReader<T> implementation used here is HttpContextStreamReader<T>, defined as follows.
    public class HttpContextStreamReader<T> : IAsyncStreamReader<T> where T : IMessage<T>
    {
        private readonly PipeReader _reader;
        private readonly MessageParser<T> _parser;
        private ReadOnlySequence<byte> _buffer;

        public HttpContextStreamReader(HttpContext httpContext, MessageParser<T> parser)
        {
            _reader = httpContext.Request.BodyReader;
            _parser = parser;
        }

        public T Current { get; private set; } = default!;

        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            var completed = false;
            if (_buffer.IsEmpty)
            {
                var result = await _reader.ReadAsync(cancellationToken);
                _buffer = result.Buffer;
                completed = result.IsCompleted;
            }

            if (Buffers.TryReadMessage(_parser, ref _buffer, out var message))
            {
                Current = message!;
                _reader.AdvanceTo(_buffer.Start, _buffer.End);
                return true;
            }

            // No complete message buffered yet: read more unless the request body has ended.
            _reader.AdvanceTo(_buffer.Start, _buffer.End);
            _buffer = default;
            return !completed && await MoveNext(cancellationToken);
        }
    }
ServerStreamingCallHandler
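ServerStreamingCallHandler<TService, TRequest, TResponse> handles the server-streaming pattern. It reads a single request message and hands the ServerStreamingMethod delegate an IServerStreamWriter<TResponse> through which the delegate writes its responses.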
    internal class ServerStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        private readonly ServerStreamingMethod<TService, TRequest, TResponse> _handler;

        public ServerStreamingCallHandler(ServerStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser)
            : base(requestParser) => _handler = handler;

        protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
        {
            using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
            var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
            var httpContext = serverCallContext.HttpContext;
            var streamWriter = new HttpContextStreamWriter<TResponse>(httpContext);
            var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync(RequestParser);
            await _handler(service, request, streamWriter, serverCallContext);
        }
    }
The IServerStreamWriter<T> implementation used here is HttpContextStreamWriter<T>, defined as follows.
    public class HttpContextStreamWriter<T> : IServerStreamWriter<T> where T : IMessage<T>
    {
        private readonly PipeWriter _writer;

        public HttpContextStreamWriter(HttpContext httpContext) => _writer = httpContext.Response.BodyWriter;

        public Task WriteAsync(T message, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return _writer.WriteMessageAsync(message).AsTask();
        }
    }
DuplexStreamingCallHandler
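DuplexStreamingCallHandler<TService, TRequest, TResponse> handles the duplex-streaming pattern by combining the two: the DuplexStreamingMethod delegate receives both an HttpContextStreamReader<TRequest> and an HttpContextStreamWriter<TResponse>.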
    internal class DuplexStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
        where TService : class
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        private readonly DuplexStreamingMethod<TService, TRequest, TResponse> _handler;

        public DuplexStreamingCallHandler(DuplexStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser)
            : base(requestParser) => _handler = handler;

        protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
        {
            using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
            var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
            var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext.HttpContext, RequestParser);
            var streamWriter = new HttpContextStreamWriter<TResponse>(serverCallContext.HttpContext);
            await _handler(service, streamReader, streamWriter, serverCallContext);
        }
    }
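4. Route registration
So far we have abstracted the gRPC methods of the four message exchange patterns as generic delegates, and we can use those delegates to create a ServerCallHandler, which in turn supplies the RequestDelegate that serves as the route endpoint handler. The delegate types map to ServerCallHandler types as follows: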
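- UnaryMethod<TService, TRequest, TResponse>: UnaryCallHandler<TService, TRequest, TResponse>
- ClientStreamingMethod<TService, TRequest, TResponse>: ClientStreamingCallHandler<TService, TRequest, TResponse>
- ServerStreamingMethod<TService, TRequest, TResponse>: ServerStreamingCallHandler<TService, TRequest, TResponse>
- DuplexStreamingMethod<TService, TRequest, TResponse>: DuplexStreamingCallHandler<TService, TRequest, TResponse>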
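Now let's tie the whole route registration process together. To that end we define the IServiceBinder<TService> interface below. It offers two groups of methods for registering the four kinds of gRPC methods: the first group takes the method name together with a Func that returns the method as a delegate, and the second group takes an expression that calls the method.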
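    public interface IServiceBinder<TService> where TService : class
    {
        // Group 1: explicit method name plus a Func that returns the method as a delegate.
        IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<TRequest, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<IAsyncStreamReader<TRequest>, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<TRequest, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<IAsyncStreamReader<TRequest>, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        // Group 2: an expression that calls the method; the name comes from the MethodInfo.
        IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(
            Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(
            Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(
            Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;

        IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(
            Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>;
    }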
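A route endpoint consists of two elements, a route pattern and a handler. The route pattern is essentially a route template made up of the gRPC service name and operation name. By default we use the name of the service type and the name of the method (with the Async suffix stripped). To allow both names to be customized, we define the two attributes GrpcServiceAttribute and GrpcMethodAttribute below; they can be applied to the service type and to an operation method respectively to specify an arbitrary name.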
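The naming rule can be sketched in isolation with plain reflection. This is an illustration under stated assumptions rather than the framework's own code: the two attributes are redeclared so the snippet is self-contained, and Naming, DemoService, and PlainService are hypothetical names. Here the method attribute is consulted first and the Async suffix is stripped afterwards.

```csharp
#nullable enable
using System;
using System.Reflection;

Console.WriteLine(Naming.GetPath(typeof(DemoService), nameof(DemoService.SayHelloUnaryAsync))); // Greeter/SayHelloUnary
Console.WriteLine(Naming.GetPath(typeof(DemoService), nameof(DemoService.RepeatAsync)));        // Greeter/Echo
Console.WriteLine(Naming.GetPath(typeof(PlainService), nameof(PlainService.Ping)));             // PlainService/Ping

[AttributeUsage(AttributeTargets.Class)]
public class GrpcServiceAttribute : Attribute
{
    public string? ServiceName { get; set; }
}

[AttributeUsage(AttributeTargets.Method)]
public class GrpcMethodAttribute : Attribute
{
    public string? MethodName { get; set; }
}

[GrpcService(ServiceName = "Greeter")]
public class DemoService
{
    public void SayHelloUnaryAsync() { }

    [GrpcMethod(MethodName = "Echo")]
    public void RepeatAsync() { }
}

public class PlainService
{
    public void Ping() { }
}

public static class Naming
{
    // Attribute values win; otherwise fall back to the type and method names.
    public static string GetPath(Type serviceType, string methodName)
    {
        var serviceName = serviceType.GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? serviceType.Name;
        var method = serviceType.GetMethod(methodName)!;
        var name = method.GetCustomAttribute<GrpcMethodAttribute>()?.MethodName ?? method.Name;
        if (name.EndsWith("Async", StringComparison.Ordinal))
        {
            name = name[..^5]; // drop the "Async" suffix
        }
        return $"{serviceName}/{name}";
    }
}
```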
    [AttributeUsage(AttributeTargets.Class)]
    public class GrpcServiceAttribute : Attribute
    {
        public string? ServiceName { get; set; }
    }

    [AttributeUsage(AttributeTargets.Method)]
    public class GrpcMethodAttribute : Attribute
    {
        public string? MethodName { get; set; }
    }
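ServiceBinder<TService>, shown below, implements the IServiceBinder<TService> interface on top of an IEndpointRouteBuilder. Each method creates the matching ServerCallHandler and maps its HandleCallAsync method as a POST endpoint.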
    public class ServiceBinder<TService> : IServiceBinder<TService> where TService : class
    {
        private readonly IEndpointRouteBuilder _routeBuilder;

        public ServiceBinder(IEndpointRouteBuilder routeBuilder) => _routeBuilder = routeBuilder;

        public IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<TRequest, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            Task<TResponse> GetMethod(TService service, TRequest request, ServerCallContext context)
                => methodAccessor(service)(request, context);
            var callHandler = new UnaryCallHandler<TService, TRequest, TResponse>(GetMethod, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        public IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<IAsyncStreamReader<TRequest>, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            Task<TResponse> GetMethod(TService service, IAsyncStreamReader<TRequest> reader, ServerCallContext context)
                => methodAccessor(service)(reader, context);
            var callHandler = new ClientStreamingCallHandler<TService, TRequest, TResponse>(GetMethod, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        public IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<TRequest, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            ServerStreamingMethod<TService, TRequest, TResponse> handler =
                (service, request, writer, context) => methodAccessor(service)(request, writer, context);
            var callHandler = new ServerStreamingCallHandler<TService, TRequest, TResponse>(handler, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        public IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(string methodName,
            Func<TService, Func<IAsyncStreamReader<TRequest>, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            DuplexStreamingMethod<TService, TRequest, TResponse> handler =
                (service, reader, writer, context) => methodAccessor(service)(reader, writer, context);
            var callHandler = new DuplexStreamingCallHandler<TService, TRequest, TResponse>(handler, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        // Builds "{ServiceName}/{MethodName}", stripping a trailing "Async" from the method name.
        private static string GetPath(string methodName)
        {
            var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;
            if (methodName.EndsWith("Async"))
            {
                methodName = methodName.Substring(0, methodName.Length - 5);
            }
            return $"{serviceName}/{methodName}";
        }

        public IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(
            Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            var method = CreateDelegate<UnaryMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
            var callHandler = new UnaryCallHandler<TService, TRequest, TResponse>(method, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        public IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(
            Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            var method = CreateDelegate<ClientStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
            var callHandler = new ClientStreamingCallHandler<TService, TRequest, TResponse>(method, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        public IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(
            Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            var method = CreateDelegate<ServerStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
            var callHandler = new ServerStreamingCallHandler<TService, TRequest, TResponse>(method, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        public IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(
            Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
            where TRequest : IMessage<TRequest> where TResponse : IMessage<TResponse>
        {
            var method = CreateDelegate<DuplexStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
            var callHandler = new DuplexStreamingCallHandler<TService, TRequest, TResponse>(method, parser);
            _routeBuilder.MapPost(GetPath(methodName), callHandler.HandleCallAsync);
            return this;
        }

        // Extracts the called method from the expression and binds it as an open instance delegate.
        private TDelegate CreateDelegate<TDelegate>(LambdaExpression expression, out string methodName) where TDelegate : Delegate
        {
            var method = ((MethodCallExpression)expression.Body).Method;
            methodName = method.GetCustomAttribute<GrpcMethodAttribute>()?.MethodName ?? method.Name;
            return (TDelegate)Delegate.CreateDelegate(typeof(TDelegate), method);
        }
    }
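Because the second group of methods receives an expression that calls the gRPC method, we can obtain the MethodInfo object describing that method. This object not only solves the problem of creating the delegate, it also supplies the method name, so this group of methods does not need the gRPC method name to be passed in. On the other hand, the expression is not strictly matched against the method signature, so there is no compile-time check that the method fits the delegate. Each group therefore has its own advantages and drawbacks.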
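The expression-based approach can be tried on its own. The sketch below uses a hypothetical Calculator type and UnaryLike delegate (neither is part of the framework) to show the two steps involved: reading the MethodInfo out of a method-call expression, and turning it into an open instance delegate whose first parameter is the target object.

```csharp
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;

// The arguments are placeholders; only the method being called matters.
Expression<Func<Calculator, Task<int>>> accessor = it => it.DoubleAsync(default);

// Step 1: the body is a method call, so the MethodInfo can be read from it.
var method = ((MethodCallExpression)accessor.Body).Method;
Console.WriteLine(method.Name); // DoubleAsync

// Step 2: bind the instance method to a delegate that takes the instance as its first parameter.
var handler = (UnaryLike)Delegate.CreateDelegate(typeof(UnaryLike), method);
Console.WriteLine(await handler(new Calculator(), 21)); // 42

public delegate Task<int> UnaryLike(Calculator service, int request);

public class Calculator
{
    public Task<int> DoubleAsync(int value) => Task.FromResult(value * 2);
}
```

If the method's parameters did not line up with the delegate, Delegate.CreateDelegate would throw at run time, which is the missing compile-time check mentioned above.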
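5. Defining an interface for gRPC services
Because route endpoints are registered per service type, we decided to let the service type itself do all of the route registration. Here we use the C# 11 feature known as static abstract interface members and define the following IGrpcService<TService> interface for service types. Its static abstract Bind method receives an IServiceBinder<TService> with which the service registers its own methods.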
    public interface IGrpcService<TService> where TService : class
    {
        static abstract void Bind(IServiceBinder<TService> binder);
    }
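We define the following extension method on IEndpointRouteBuilder to register the routes of a given service type. To distinguish it from the existing method, I deliberately named it MapGrpcService2. The method creates a ServiceBinder<TService> from the given IEndpointRouteBuilder object and passes it to the static Bind method of the service type.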
    public static class EndpointRouteBuilderExtensions
    {
        public static IEndpointRouteBuilder MapGrpcService2<TService>(this IEndpointRouteBuilder routeBuilder)
            where TService : class, IGrpcService<TService>
        {
            var binder = new ServiceBinder<TService>(routeBuilder);
            // Static abstract member: resolved against the concrete service type.
            TService.Bind(binder);
            return routeBuilder;
        }
    }
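For readers who have not used static abstract interface members yet, here is a stripped-down sketch of the same pattern without any ASP.NET Core types. IRouteSource, PingService, and Registrar are hypothetical names, and a List<string> stands in for the binder; the snippet needs C# 11 or later.

```csharp
using System;
using System.Collections.Generic;

var routes = Registrar.Map<PingService>();
Console.WriteLine(string.Join(", ", routes)); // PingService/Ping

// Hypothetical counterpart of IGrpcService<TService>.
public interface IRouteSource<TSelf> where TSelf : class, IRouteSource<TSelf>
{
    static abstract void Bind(List<string> routes);
}

public class PingService : IRouteSource<PingService>
{
    // The implementation is static: no instance is needed to register routes.
    public static void Bind(List<string> routes) => routes.Add("PingService/Ping");
}

public static class Registrar
{
    // Hypothetical counterpart of MapGrpcService2<TService>.
    public static List<string> Map<TService>() where TService : class, IRouteSource<TService>
    {
        var routes = new List<string>();
        TService.Bind(routes); // call the static member through the type parameter
        return routes;
    }
}
```

The generic constraint is what makes TService.Bind callable: the compiler knows every permitted type argument supplies its own static Bind.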
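6. Redefining and hosting the service
At the beginning we demonstrated how to define, host, and call a service with ASP.NET Core gRPC. If the mini gRPC framework we have built works properly, the client code can stay exactly as it is, so let's try it out. In the Server project we change the GreeterService type to the form below. It no longer inherits from any base class and only implements the IGrpcService<GreeterService> interface; the four methods are registered in the static Bind method, the first two through expressions and the last two by name.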
    [GrpcService(ServiceName = "Greeter")]
    public class GreeterService : IGrpcService<GreeterService>
    {
        public Task<HelloReply> SayHelloUnaryAsync(HelloRequest request, ServerCallContext context)
            => Task.FromResult(new HelloReply { Message = $"Hello, {request.Name}" });

        public async Task<HelloReply> SayHelloClientStreamingAsync(IAsyncStreamReader<HelloRequest> reader, ServerCallContext context)
        {
            var list = new List<string>();
            while (await reader.MoveNext(CancellationToken.None))
            {
                list.Add(reader.Current.Name);
            }
            return new HelloReply { Message = $"Hello, {string.Join(",", list)}" };
        }

        public async Task SayHelloServerStreamingAsync(Empty request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
        {
            await responseStream.WriteAsync(new HelloReply { Message = "Hello, Foo!" });
            await Task.Delay(1000);
            await responseStream.WriteAsync(new HelloReply { Message = "Hello, Bar!" });
            await Task.Delay(1000);
            await responseStream.WriteAsync(new HelloReply { Message = "Hello, Baz!" });
        }

        public async Task SayHelloDuplexStreamingAsync(IAsyncStreamReader<HelloRequest> reader, IServerStreamWriter<HelloReply> writer, ServerCallContext context)
        {
            while (await reader.MoveNext())
            {
                await writer.WriteAsync(new HelloReply { Message = $"Hello {reader.Current.Name}" });
            }
        }

        public static void Bind(IServiceBinder<GreeterService> binder)
        {
            binder
                // Expression-based registration: the method name is taken from the expression.
                .AddUnaryMethod<HelloRequest, HelloReply>(it => it.SayHelloUnaryAsync(default!, default!), HelloRequest.Parser)
                .AddClientStreamingMethod<HelloRequest, HelloReply>(it => it.SayHelloClientStreamingAsync(default!, default!), HelloRequest.Parser)
                // Name-based registration: the method group is checked against the signature at compile time.
                .AddServerStreamingMethod<Empty, HelloReply>(nameof(SayHelloServerStreamingAsync), it => it.SayHelloServerStreamingAsync, Empty.Parser)
                .AddDuplexStreamingMethod<HelloRequest, HelloReply>(nameof(SayHelloDuplexStreamingAsync), it => it.SayHelloDuplexStreamingAsync, HelloRequest.Parser);
        }
    }
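In the hosting program we simply replace the call to MapGrpcService<GreeterService> with a call to MapGrpcService2<GreeterService>. The AddGrpc call is no longer needed.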
    using GrpcMini;
    using Microsoft.AspNetCore.Server.Kestrel.Core;

    var builder = WebApplication.CreateBuilder(args);
    builder.WebHost.ConfigureKestrel(kestrel => kestrel.ConfigureEndpointDefaults(options => options.Protocols = HttpProtocols.Http2));
    var app = builder.Build();
    app.MapGrpcService2<GreeterService>();
    app.Run();
When we run the program again, the client still produces the same output.