l 前言
本文记录了我的一次.net core 微服务架构实践经验,以及所用到的技术
l 优点
每个服务聚焦于一块业务,无论在开发阶段或是部署阶段都是独立的,更适合被各个小团队开发维护,团队对服务的整个生命周期负责,工作在独立的上下文之中。
如果某一项服务的性能达到瓶颈,我们只需要增加该服务负载节点,能够针对系统的瓶颈服务更有效的使用资源。
服务A可以使用.net实现 ,服务B可以使用java实现,技术选型灵活,系统不会长期限制在某个技术栈上。
松耦合、高内聚,代码容易理解,开发效率高,更好维护。
高可用,每个服务可以启动多个实例负载,单个实例挂了有足够的响应时间来修复
l 缺点
系统规模庞大,运维要求高,需要devops技巧(Jenkins,Kubernetes等等)
跨服务需求需要团队之间的协作
跨服务的调用(http/rpc)增加了系统的延迟
l Docker
docker是目前普遍使用的容器化技术,在此架构中我们的应用程序将部署在docker容器里面,通过docker发布应用 需要先编写一个dockerfile,如下
#引入镜像 .net core 3.1
FROM mcr.microsoft.com/dotnet/core/aspnet:3.1-buster-slim AS base
#设定工作目录
WORKDIR /app
#在容器中程序使用的端口,一定要和程序启动使用的端口对应上
EXPOSE 80
#复制文件到工作目录
COPY . .
#环境变量 此变量会覆盖appsetting.json 内的同名变量
ENV Ip ""
ENV Port ""
#启动程序
ENTRYPOINT ["dotnet", "Union.UserCenter.dll"]
docker build 命令 将我们的发布目录打包一个docker镜像,例如 docker build -t test . ,test是镜像名称
docker run 命令启动我们打包的镜像,例如 docker run -d -p 5002:80 --name="test1" -e Ip="192.168.0.164" -e Port="5002" test ,-e 表示传递环境变量
更多docker命令 请查阅:https://www.runoob.com/docker/docker-command-manual.html
docker官网:https://www.docker.com
部署方便:只需要一个简单的 docker run命令,就可以启动一个应用实例了
部署安全:打包镜像的时候已经打包了应用所需环境,运行环境不会出现任何问题
隔离性好:同一台机器我可以部署java的应用和.net的应用,互不影响
快速回滚:只要镜像存在可以快速回滚到任一版本
成本低:一台机器可以运行很多实例,很容易就可以实现高可用和横向扩展
经测试docker for windows不适合部署生产环境,还是得在liunx系统上跑, .net framework 无法在docker上部署
Docker compose :Docker官方提供的管理工具,可以简单的配置一组容器启动参数、启动顺序、依赖关系
Kubernetes :容器数量很多之后会变得难以管理,可以引入Kubernetes对容器进行自动管理,熟练运用有一定难度,中文社区:https://www.kubernetes.org.cn/k8s
l RPC 远程过程调用
为什么要有RPC
按照微服务设计思想,服务A只专注于服务A的业务,但是需求上肯定会有服务A需要调用服务B来完成一个业务处理的情况,使用http调用其他服务效率相对较低,所以引入了RPC。
gRPC vs thrift 评测:https://www.cnblogs.com/softidea/p/7232035.html
这里使用thrift,thrift 官网:http://thrift.apache.org
Thrift 采用IDL(Interface Definition Language)来定义通用的服务接口,然后通过Thrift提供的编译器,可以将服务接口编译成不同语言编写的代码,通过这个方式来实现跨语言的功能,语法请自行百度
下载thrift 代码生成器 http://thrift.apache.org/download ,thrift-0.13.0.exe 这个文件
执行命令 thrift.exe --gen netcore xxxxxxx.thrift ,生成C# 服务接口代码
引用官方提供的.net 库,可以去官网下载,找不到的可以直接 nuget引用 Examda.Thrift,这是我为了方便使用上传的
添加生成的代码到我们的服务端里,然后自己实现 thrift文件定义的接口
using System.Threading;
using System.Threading.Tasks;
using Union.UnionInfo.Service.Interface;
using static Examda.Contract.UnionInfo.UnionInfoService;namespace Union.UnionInfo.Service
{public class UnionInfoServiceImpl : IAsync{private readonly ILmMembersInfoService _lmMembersInfoService;public UnionInfoServiceImpl(ILmMembersInfoService lmMembersInfoService){_lmMembersInfoService = lmMembersInfoService;}//实现接口public async Task<string> GetUnionIdAsync(string DozDomain, CancellationToken cancellationToken){return (await _lmMembersInfoService.GetMembersInfoByDozDomain(DozDomain)).UnionId;}}
}
添加一个类继承 IHostedService
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
using Thrift;
using Thrift.Protocols;
using Thrift.Server;
using Thrift.Transports;
using Thrift.Transports.Server;namespace Examda.Core.Rpc
{public class RpcServiceHost : IHostedService{public IConfiguration Configuration { get; }public ITAsyncProcessor Processor { get; }public ILoggerFactory LoggerFactory { get; }public RpcServiceHost(IConfiguration configuration, ITAsyncProcessor processor,ILoggerFactory loggerFactory){Configuration = configuration;Processor = processor;LoggerFactory = loggerFactory;}//public virtual Task StartAsync(CancellationToken cancellationToken){TServerTransport serverTransport = new TServerSocketTransport(Configuration.GetValue<int>("RpcPort"));TBinaryProtocol.Factory factory1 = new TBinaryProtocol.Factory();TBinaryProtocol.Factory factory2 = new TBinaryProtocol.Factory();//UnionInfoService.AsyncProcessor processor = new AsyncProcessor(new UnionInfoServiceImpl());实现的服务这里采用.net core 自带 DI注入,也可以直接实例化TBaseServer server = new AsyncBaseServer(Processor, serverTransport, factory1, factory2, LoggerFactory);return server.ServeAsync(cancellationToken);}public virtual Task StopAsync(CancellationToken cancellationToken){return Task.CompletedTask;}}
}
修改ConfigureServices添加如下代码
//注入rpc服务实现实例services.AddSingleton<ITAsyncProcessor>(provider =>{var lmMembersInfoService = provider.GetService<ILmMembersInfoService>();return new AsyncProcessor(new UnionInfoServiceImpl(lmMembersInfoService));});//监听rpc端口services.AddHostedService<RpcServiceHost>();
服务端就完成了,接下来编写客户端调用,修改客户端ConfigureServices添加如下代码
//test rpc服务services.AddScoped(provider =>{var examdaConsul = provider.GetService<ExamdaConsul>();Address address = examdaConsul.GetAddress("UnionInfo");//获取服务地址,这里我封装了,测试可以先直接写死var tClientTransport = new TSocketClientTransport(IPAddress.Parse(address.Ip), address.Port);var tProtocol = new TBinaryProtocol(tClientTransport);return new UnionInfoService.Client(tProtocol);});
控制器内调用示例
using System.Threading;
using System.Threading.Tasks;
using Examda.Contract.UnionInfo;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;namespace RPCCLIENT.Controllers
{[ApiController][Route("[controller]")]public class WeatherForecastController : ControllerBase{private static readonly string[] Summaries = new[]{"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"};private readonly UnionInfoService.Client _rpcClient;private readonly ILogger<WeatherForecastController> _logger;public WeatherForecastController(ILogger<WeatherForecastController> logger, UnionInfoService.Client rpcClient){_logger = logger;_rpcClient = rpcClient;}[HttpGet]public async Task<IActionResult> Get(){await _rpcClient.OpenTransportAsync(CancellationToken.None);var order = await _rpcClient.GetUnionIdAsync("wx.hdgk.cn", CancellationToken.None);//rpc调用return Ok(order);}}
}
l 服务注册与发现
为什么要有服务注册与发现
例如:服务A一开始只有一个实例,此时又启动了一个服务A的实例,但是调用服务A的服务B并不知道 服务A多了一个实例(或者少了),此时引入服务注册与发现可以让服务B得知服务A的变更情况,服务B就知道自己要调用的服务IP:端口 是多少,不需要人工干预
常见的注册中心
这里使用consul
健康检查:consul自带健康检查,检查服务是否可用,不可用的服务将从注册中心剔除,自带的就是隔一段时间检测一下端口通不通,并且支持自行扩展健康检查,可用自己在服务内实现是否健康的逻辑,比如虽然接口是通的,但是我发现自己宿主机cpu过80%了,就返回不健康的状态
服务注册:nuget安装consul,写一个扩展方法
/// <summary>/// 如果服务同时包含http,rpc调用此方法/// </summary>/// <param name="services"></param>/// <param name="Configuration"></param>/// <param name="ServiceName"></param>/// <param name="Remark"></param>public static void AddExamdaServiceRpc(this IServiceCollection services, IConfiguration Configuration, string ServiceName, string Remark){var Ip = Configuration.GetValue<string>("Ip");var RpcPort = Configuration.GetValue<int>("RpcPort");var RpcAddress = $"{Ip}:{RpcPort}";var consulClient = new ConsulClient(x => x.Address = new Uri(Configuration.GetValue<string>("ConsulUrl")));//请求注册的 Consul 地址var httpCheck = new AgentServiceCheck(){DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务启动多久后注册Interval = TimeSpan.FromSeconds(20),//健康检查时间间隔,或者称为心跳间隔Timeout = TimeSpan.FromSeconds(5),TCP = RpcAddress};var registration = new AgentServiceRegistration(){Checks = new[] { httpCheck },ID = RpcAddress,Name = ServiceName,Address = Ip,Port = RpcPort,Tags = new[] { Remark }};consulClient.Agent.ServiceRegister(registration).Wait();//应用程序退出时AppDomain.CurrentDomain.ProcessExit += (sender, e) =>{consulClient.Agent.ServiceDeregister(registration.ID).Wait();//consul取消注册服务};}
修改ConfigureServices添加如下代码,启动
services.AddExamdaServiceRpc(Configuration, "UnionInfo", "联盟机构信息服务");
安装consul请自行百度
服务发现与变更:调用方配置好自己需要调用的服务名称集合,然后去consul获取地址列表,然后根据需要调用的服务数量启动N个线程来轮询服务最新的地址信息,不用担心轮询造成的消耗过大,因为consul提供了Blocking Queries 阻塞查询的方式,请求发送到consul之后会在consul阻塞(30)秒,期间有变更或者到达30秒了之后才会返回地址列表,然后每一次变更之后的地址列表都会有一个新的版本号。
using Consul;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace Examda.Core.Consul
{public class Address{public string Ip { get; set; }public int Port { get; set; }}/// <summary>/// 未实现服务负载均衡,这里随机选一个/// </summary>public class ExamdaConsul{private object locker = new object();private readonly ConsulClient _consulClient;private IDictionary<string, List<Address>> RpcServices { get; set; }public ExamdaConsul(IConfiguration configuration){RpcServices = new Dictionary<string, List<Address>>();_consulClient = new ConsulClient(c =>{c.Address = new Uri(configuration.GetValue<string>("ConsulUrl"));});foreach (var item in configuration.GetSection("RpcServiceClient").GetChildren().Select(x => x.Value).ToList())//遍历所需要调用的服务名称集合{RpcServices.Add(item, null);var res = _consulClient.Catalog.Service(item).Result;RpcServices[item] = res.Response.Select(x => new Address() { Ip = x.ServiceAddress, Port = x.ServicePort }).ToList();Task.Factory.StartNew(() =>{var queryOptions = new QueryOptions { WaitTime = TimeSpan.FromSeconds(30) };//阻塞时间queryOptions.WaitIndex = res.LastIndex;while (true){GetAgentServices(queryOptions, item);}});}}private void GetAgentServices(QueryOptions queryOptions, string serviceName){var res = _consulClient.Catalog.Service(serviceName, null, queryOptions).Result;if (queryOptions.WaitIndex != res.LastIndex){lock (locker){queryOptions.WaitIndex = res.LastIndex;var currentServices = RpcServices[serviceName];RpcServices[serviceName] = res.Response.Select(x => new Address() { Ip = x.ServiceAddress, Port = x.ServicePort }).ToList();}}}/// <summary>/// 获取服务可用地址/// </summary>/// <param name="serviceName"></param>/// <returns></returns>public Address GetAddress(string serviceName){for (int i = 0; i < 3; i++){Random r = new Random();int index = r.Next(RpcServices.Count);try{return RpcServices[serviceName][index];}catch{Thread.Sleep(10);continue;}}return null;}}
}
然后注入一个ExamdaConsul类的单例,讲写死的服务地址改成从consul获取
//注入consul客户端 单例services.AddSingleton<ExamdaConsul>();//注入UnionInfo rpc客户端 线程单例services.AddScoped(provider =>{var examdaConsul = provider.GetService<ExamdaConsul>();Address address = examdaConsul.GetAddress("UnionInfo");//从consul获取服务地址var tClientTransport = new TSocketClientTransport(IPAddress.Parse(address.Ip), address.Port);var tProtocol = new TBinaryProtocol(tClientTransport);return new UnionInfoService.Client(tProtocol);});
consul 官网:https://www.consul.io
l API网关
所有的请求都先经过网关,由转发到对应的服务,对比了 ocelot 和 Bumblebee 两个c#写的网关。选择使用了Bumblebee。
Ocelot性能比较低,吞吐比直接访问降低四倍,但是文档很全面,功能集成很多,不需要自己扩展什么。
Bumblebee 我做测试发现Bumblebee 性能很优秀,尴尬的是这个几乎没什么人用,很多功能需要自己扩展,作者官网http://beetlex.io/ Bumblebee 文档:http://doc.beetlex.io/#29322e3796694434894fc2e6e8747626
这里使用Bumblebee ,使用方法可以看作者的文档
健康检查:不健康的节点将不会被转发请求
限流:例如限制某个节点最多300rps,如果此节点并发了1000个请求,大概会有700个左右请求网关会直接返回错误,不会转发到具体的服务,可以起到挡洪作用,避免节点直接挂了。
路由:我是这么设置的 例如 http://192.168.0.164/Course/Tool/GetUserInfo ,Course一级是服务名称 tool 是服务的控制器名称 getuserinfo是方法名称
负载均衡:服务多个节点负载,网关可以设置负载均衡策略
注册到网关:redis发布订阅实现,添加一个扩展方法
public static void AddExamdaService(this IServiceCollection services, IConfiguration Configuration, string ServiceName, string Remark){var Ip = Configuration.GetValue<string>("Ip");var Port = Configuration.GetValue<int>("Port");var Address = $"http://{Ip}:{Port}";services.AddSingleton(new Redis(Configuration.GetValue<string>("Redis")));ServiceProvider serviceProvider = services.BuildServiceProvider();Redis redis = serviceProvider.GetService<Redis>();redis.Publish("ApiGetewap", JsonConvert.SerializeObject(new { Address, ServiceName, Remark }));AppDomain.CurrentDomain.ProcessExit += (sender, e) =>{redis.Publish("ApiGetewapExit", JsonConvert.SerializeObject(new { Address, ServiceName, Remark }));};}
网关订阅这个频道
g = new OverrideApiGetewap();g.HttpOptions(o =>{o.Port = 80;o.LogToConsole = true;o.LogLevel = BeetleX.EventArgs.LogType.Error;});g.Open();var sub = Program.redis.GetSubscriber();//注册服务sub.Subscribe("ApiGetewap",(chanel,message)=> {var service = JsonConvert.DeserializeObject<Service>(message);var route = g.Routes.NewOrGet(string.Format("^/{0}.*", service.ServiceName), service.Remark);route.AddServer(service.Address, 0);});//服务退出sub.Subscribe("ApiGetewapExit", (chanel, message) => {var service = JsonConvert.DeserializeObject<Service>(message);var route = g.Routes.NewOrGet(string.Format("^/{0}.*", service.ServiceName), service.Remark);route.RemoveServer(service.Address);});
修改ConfigureServices添加如下代码,启动。这样网关也能动态的发现我们的服务了
//注册此服务到网关services.AddExamdaService(Configuration, "Course", "联盟我的课程服务");
异常流量拉黑:例如某个ip 10s内请求数量超过300 将他拉黑 30 分钟,这里使用redis实现计数器
自己写的简陋版本
//请求完成触发的事件,不会阻塞请求g.RequestIncrement += (sender, e) =>{Task.Factory.StartNew(() =>{var db = Program.redis.GetDatabase();var counter = db.KeyExists(e.Request.RemoteIPAddress);//判断该ip是否存在计数器if (counter){var count = db.StringIncrement(e.Request.RemoteIPAddress);//计数器加1if (count > 300){db.StringSet("BlackList_" + e.Request.RemoteIPAddress, "", new TimeSpan(0, 1, 0), flags: StackExchange.Redis.CommandFlags.FireAndForget);//拉黑半个小时,不等待返回值}}else{db.StringIncrement(e.Request.RemoteIPAddress, flags: StackExchange.Redis.CommandFlags.FireAndForget);//创建计数器db.KeyExpire(e.Request.RemoteIPAddress, new TimeSpan(0, 0, 10), flags: StackExchange.Redis.CommandFlags.FireAndForget);//设置10s过期}});};
class OverrideApiGetewap : Bumblebee.Gateway{//请求管道的第一个事件protected override void OnHttpRequest(object sender, EventHttpRequestArgs e){if (!e.Request.Path.Contains("/__system/bumblebee") && e.Request.Path != "/")//排除掉访问网关ui的{var db = Program.redis.GetDatabase();var isBlack = db.KeyExists("BlackList_" + e.Request.RemoteIPAddress);if (isBlack){e.Response.Result(new JsonResult("你被拉黑了"));e.Cancel = true;//取消请求}else{base.OnHttpRequest(sender, e);}//base.OnHttpRequest(sender, e);}else{base.OnHttpRequest(sender, e);}}}
熔断器:当某个请求转发下游服务返回错误次数或者超时次数达到阀值时自动熔断该节点,暂未实现
接口验签:客户端请求都带上用 url时间戳 参数加密的签名,网关进行验证,确保是合法的客户端
网关自带UI
l 链路追踪 性能监控
Skywalking 官网:http://skywalking.apache.org/
每个请求的链路,每一个步骤的耗时都可以查到,如下图的一个请求执行了很多次sql,每个步骤的sql语句都可以看到,集成很简单,使用官方提供的.net探针集成到各个服务就好了,无代码入侵。
有一个很强大的ui界面,也可以提供报警等功能,ui可以查看到响应很慢的接口,平均响应时间,以及每个服务的关联关系,但是有个问题我没有解决,RPC链路追踪不到。
可以自行去官方查阅使用文档
l 分布式日志收集框架
实例太多了,不可能使用单机日志,需要一个分布式日志收集框架把所有日志收集到一起,可以考虑使用java的elk 或者 .net core 的Exceptionless
l 分布式事务
跨服务之间调用并且涉及到事务的处理方式,还在想怎么弄
l 配置中心
各个实例逐个配置太麻烦了,特别是如果更改了数据库地址,每一个服务的所有实例都要改,改死去,并且重启实例也不现实,一定要支持配置热更新,试了下携程的Apollo有点消耗资源
l CI/CD
将源码管理做一个开发分支,一个测试分支,一个发布分支,开发只动开发分支,开发完成后提交代码,由测试合并到测试分支,并通知Jenkins生成镜像并发布到测试站点,测试通过之后由运维合并到发布分支,或手动或自动通过Jenkins发布,应该保证 测试分支与发布分支的版本能对应docker镜像仓库的每一个版本,个人见解。
l 例:XXXX服务的项目源码结构
记录与分享自己的一次微服务实践,以上均为个人见解,不对的地方或者好的建议欢迎来信 289501868@qq.com