1 服务化
服务化(SOA: Service-oriented architecture):
- 小而美的独立功能
- 平台/语言无关的协议通信
- 服务抽象, 服务将对外部隐藏逻辑
服务化中非常重要的一点,就是服务之间的访问/调用,通常实现方式:
1.REST(Representational State Transfer)
- 基于Http, 通常使用json
- 三要素: 唯一的资源标识, 简单的方法(此处的方法是个抽象的概念), 一定的表达方式
- 特点: 面向资源
2.RPC(Remote Procedure Call Protocol)
- 像调用本地服务(方法)一样调用服务器的服务(方法).
- RPC 的消息传输可以通过 TCP、UDP 或者 HTTP等.
- 特点: 面向动作
区别: RPC 是以动词(方法)为中心的(queryUser), REST是以名词(资源)为中心的(/user/1/query)
最直观的区别:
.RPC 函数映射协议可以直接用编程语言来书写数据结构和函数定义, 取代编写大量的编码协议格式和分包处理逻辑
2 RPC
2.1 RPC简介
RPC(Remote Procedure Call Protocol) 的主要目标是让构建分布式计算(应用)更容易, 在提供强大的远程调用能力时不损失本地调用的语义简洁性. 为实现该目标, RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用.
我们先看下一个RPC调用的流程涉及到哪些通信细节. 实现 RPC 的程序包括如下 5 个理论模型部分:
- Client(Caller)
- Client
- stubNetwork
- serviceServer
- stubServer(Callee)
一个典型 RPC 调用的流程如下:
详细步骤如下:
(1).服务消费方(client)调用以本地调用方式调用服务; (2).client stub 接收到调用后负责将方法,参数等组装成能够进行网络传输的消息体;
(3).client stub 找到服务地址, 并将消息发送到服务端;
(4).server stub 收到消息后进行解码; (5).server stub 根据解码结果调用本地的服务; (6).本地服务执行并将结果返回给server stub; (7).server stub 将返回结果打包成消息并发送至消费方; (8).client stub 接收到消息, 并进行解码;
(9).服务消费方得到最终结果;
RPC的目标就是要 2~8 这些步骤都封装起来, 让这些细节透明对用户透明。
2.1 典型RPC框架
RPC 框架通常分为两类, 一种是偏重服务治理,另一种侧重跨语言调用
1.侧重重服务治理(Dubbo/Dubbox)主要的精力放在服务发现、路由、容错处理等方面.
代表: Dubbo/Dubbox
特点: 功能丰富,提供高性能的远程调用、服务发现及服务治理能力,适用于大型服务的服务解耦及服务治理,对于特定语言(Java)的项目可以实现透明化接入
2.侧重跨语言调用
通常是基于同一个 IDL(interface define language) 的中间层,可以生成不同语言的代码
代表: gRPC, Thrift
特点: 侧重于服务的跨语言调用,能够支持大部分的语言进行语言无关的调用,非常适合多语言调用场景.
目前市面上比较流行的几个 RPC 开源框架有:
(1).Dubbo, Dubbox (Java, 阿里)
(2).Thrift (跨语言, Facebook)
(3).grpc (跨语言)
(4).Motan (Java, 微博)
(5).rpcx (Go)
(6).Tars (C++, 腾讯)
(7).Finagle(Ruby on Rails, Twitter)
grpc
gRPC是 Google 2015 开源的一个高性能, 跨语言的 RPC 框架, 面向移动和 HTTP/2 设计
2.2 关键因素
我理解一些RPC框架的一些关注因素
1.服务的发现和注册
2.网络协议一般来说您可以选择让您的RPC使用应用层协议,例如HTTP或者HTTP/2协议,或者使用TCP协议,让您的RPC框架工作在传输层。
工作在哪一层网络上会对RPC框架的工作性能产生一定的影响,但是对RPC最终的性能影响并不大
3.Schema和序列化/反序列
- 编码格式: 是 human readable(是否能直观看懂 json)还是 binary(二进制)。
- Schema declaration: 基于 IDL,比如 Protocol Buffers/Thrift,还是自描述的,比如 JSON、XML。另外还需要看是否是强类型的
- 语言平台的中立性: 比如 Java 的 Native Serialization 就只能自己玩,而Protocol Buffers 可以跨各种语言和平台。
- 兼容性: 比如 IDL 加了一个字段,老数据是否还可以反序列化成功。
- 和压缩算法的契合度: 跑 benchmark (基准)和实际应用都会结合各种压缩算法,例如gzip、snappy。
- 性能: 序列化、反序列化的时间,序列化后数据的字节大小
- 序列化方式非常多: Java Native Serialize, XML,JSON,Protocol Buffers,Avro,Thrift,Kyro,Hessian,Protostuff等
4.网络IO模型/线程模型
RPC服务器可以只支持传统的阻塞式同步IO,也可以做一些改进让RPC服务器支持非阻塞式同步IO,或者在服务器上实现对多路IO模型的支持
5.负载均衡策略
6.容错(Fail tolerance/Fault tolerance)
7.监控
3. GRPC
按照上面说到的关注因素意义分析 gRPC 在XXX的实现
3.1 服务的发现和注册
一个完成/成熟的服务治理框架中一般存在四个角色:
1.Consumer(Client) 服务调用方2.Provider(Server) 服务提供方
3.Registry 服务注册与发现中心
4.Monitor 监控中心
服务注册与发现中心主要解决两个问题:服务注册和服务发现. 除此之外,服务注册与发现需要关注监控服务实例运行状态(健康检查)等问题
1.服务注册服务实例将自身服务信息注册到注册中心。
这部分服务信息包括服务所在主机IP和提供服务的Port,以及暴露服务自身状态以及访问协议等信息
2.服务发现服务实例请求注册中心获取所依赖服务信息
服务实例通过注册中心,获取到注册到其中的服务实例的信息,通过这些信息去请求它们提供的服务
3.健康检查
微服务应用中,服务处于动态变化的情况,需要一定机制处理失效的服务实例。一般来讲,服务实例与注册中心在注册后通过心跳的方式维系联系,一旦心跳缺少,对应的服务实例会被注册中心剔除。
gRPC 开源组件官方并未直接提供服务注册与发现的功能实现. 但其设计文档已提供实现的思路,并在不同语言的 gRPC 代码 API 中已提供了命名解析和负载均衡接口供扩展。
XXX的实现:
(1).ZK
(2).xxx
我们这里分析最简单的 ZK 版本.
3.1.1 服务注册
即 Server 节点启动后, 怎么通知大家, 说我也可以给你们提供服务了. 因为我们的 gRPC服务都是使用 RpcServerStarter 启动的, 因此参考 RpcServerStarter 的逻辑
RpcServerStarter.main() --> RpcServerHelper.start() -->RpcServerHelper.start0()--> RpcServerHelper.initNewRegistry() -->RpcServerHelper.newGrpcRegistry()--> ServiceRegistry.newGrpcRegistry() //RpcServerHelper.java private void start0() throws Exception { initNewRegistry(); //初始化 ServiceRegistry, 主要用于多机房注册 startService(); //启动服务, countDownSleep(ofSeconds(REGISTER_AFTER_START), "beforeregister...%s"); register(); //将服务注册到 xxx / ZK 上 ... } private void register() throws IOException { boolean isShuttingDown = registerTermCallback(() -> { ... ThrowableConsumer<Integer, UnknownHostException> registerRunner= w -> { if (autoRegisterToZk) { ... serviceRegistry.registerToZk(w); } ... }; ... } //ServiceRegistry.java public boolean registerToZk(int weight) throws UnknownHostException{ String hostName = getHostName(); InetAddress.getAllByName(hostName); return registerToZk(hostName, weight); } boolean registerToZk(String host, int weight) { ... for (Idc idc : idcs) { //每个 IDC 都注册 String path = makeGrpcZkPath(idc, hostAndPort); for (CuratorFramework zk : zkClientsSupplier.get()) { ... runWithRetry(MAX_RETRY, RETRY_DELAY, () -> setToZk(zk, path, value.getBytes())); ... } } return result; }
简单理解就是, 就是启动服务后, 等待就一段时间之后(3秒), 就会将自己注册到 ZK 上. 之后就可以对外服务了. Client 因为订阅了对应的节点, ZK 上的 server list 变更, 就会通知对应的 Client, Client就拿到了最新的 server list.
3.1.2 服务发现
即一个client请求, client 怎么知道有哪些 server 在提供服务. XXX gRPC 的实现关键代码
Grpc.grpcEx() --> Grpc.getGrpcClient() --> RpcHolder.createGrpcClient()--> RpcHolder.nodes()--> RpcHolder.buildFailoverFromXXXOrZk() -->RpcHolder.buildCachedNode() --> RpcHolder.createNodeCache() //RpcHolder.java static Supplier<Failover<ChannelInfo>> nodes(RpcCacheKey cacheKey) { return () -> { ... return buildFailoverFromXXXOrZk(cacheKey); }; } static Failover<ChannelInfo> buildFailoverFromXXXOrZk(RpcCacheKeycacheKey) { ... if (config.usingXXX() || EnvUtils.belowStaging()) { ... } else { NodeCache node = buildCachedNode(config, basicInfo); ... } } private static NodeCache buildCachedNode(RpcConfig config,GrpcCallBasicInfo basicInfo) { return nodeCache.computeIfAbsent(config, c ->RpcHolder.createNodeCache(c, basicInfo)); } static NodeCache createNodeCache(RpcConfig conf, GrpcCallBasicInfobasicInfo) { Function<List<? extends ChannelInfo>, Failover<ChannelInfo>>failoverSupplier = failoverFactory( conf); ZkBasedNodeResource<Failover<ChannelInfo>> oldNode =ZkBasedNodeResource .<Failover<ChannelInfo>> newGenericBuilder() .withCacheFactory(makePath(NAME_SPACE_GRPC,oldPath(conf)), ZkClientHolder::get) // .withStringFactoryEx(raw -> initObject(conf, basicInfo,raw, failoverSupplier)) // .withWaitStopPeriod(STOP_WAIT_PERIOD) // .withCleanupConsumer(RpcHolder::tryClose) // .asyncRefresh(forAsyncRefresh("grpc-old", conf)) //.addFactoryFailedListener(KsZkUtils::perfZkNodeFactoryFail) // .build(); ZkBasedTreeNodeResource<Failover<ChannelInfo>> newNode =ZkBasedTreeNodeResource .<Failover<ChannelInfo>> newBuilder() // .path(makeLocalZkPath(NAME_SPACE_GRPC, newPath(conf)))// .curator(conf.newZkFactory().apply(localIdc())) // 使用本地zk集群 .factoryEx(raw -> initObjectNew(conf, basicInfo, raw,failoverSupplier)) // .withWaitStopPeriod(STOP_WAIT_PERIOD) // .cleanup(RpcHolder::tryClose) // .build(); return new NodeCache(newNode::get, oldNode::get);
}
即最终的节点数据来源于 ZK(使用的 ZkBasedNodeResource). 即 gRPC 请求的首先需要一个 Channel(服务通道), 而 Channel 来源于 NodeCache. NodeCache 包装了 ZKConsumer. ZK Consumer 订阅了 gRPC 的节点, 因此节点有变更(比如有新的 gRPCserver 注册了)会收到通知, 更新本地缓存.
3.1.3 心跳检查
心跳解决的是: 注册中心怎么知道 server 是不是还活着.
前面讲到服务注册的时候, 有讲到服务启动之后, 会把 RPC 注册到 ZK 上. 我们的服务gRPC 服务的路径是:
config.zk.cluster.XX / XXX / grpc / adminFeedbackRpcService / XX /
这里我们再详细分析一下注册的过程:
/XXX.XXX.XX.XXX.ServiceRegistry.java
boolean registerToZk(String host, int weight) { ... for (Idc idc : idcs) { String path = makeGrpcZkPath(idc, hostAndPort); for (CuratorFramework zk : zkClientsSupplier.get()) { if (zkRegistryEphemeral) { synchronized (ephemeralNodes) { //ZK临时节点 try { EphemeralNode node = createEphemeralNode(zk,path, value.getBytes()); ephemeralNodes.put(tuple(zk, path), node); } ... } } else { //ZK永久节点 try { runWithRetry(MAX_RETRY, RETRY_DELAY, () -> setToZk(zk, path, value.getBytes())); } catch (Throwable e) { result = false; logger.error("", e); } } } } return result;}
public static EphemeralNode createEphemeralNode(CuratorFramework client,String path, byte[] value) throws NodeExistsException { ... return new KeepEphemeralListener(client, path, value);}KeepEphemeralListener(CuratorFramework originalClient, String path,byte[] value) throws NodeExistsException { ... //EPHEMERAL - 临时节点originalClient.create().creatingParentsIfNeeded().withMode(EPHEMERAL).forPath(path, value); ...}public static void setToZk(CuratorFramework client, String path, byte[]value) { //PERSISTENT - 永久节点 setToZk(client, path, value, PERSISTENT);}
那怎么知道我们的 gRPC服务是创建临时节点还是永久节点? 对应的参数可以在RpcServerStarter 中看到. 有两种情况下会是临时节点:
1.RpcServerStarter 启动使用 -p 参数指定了端口2.RpcServerStarter 启动使用 force-zk-ephemeral 参数
相关代码如下:
boolean ephemeral = false; //是否使用临时 ZK 节点int port;if (commandLine.hasOption(PARAM_OVERRIDE_PORT)) { ... ephemeral = true;}
...if (commandLine.hasOption(PARAM_FORCE_ZK_EPHEMERAL)) { ephemeral = true;}
注意: 我们现在的服务都是部署在 xxx 上, 都会使用随机端口.
就是说 XXX 会使用 -p 指定端口(随机分配), 这就会导致我们去创建 ZK 节点的时候会是ZK临时节点. ZK 临时节点的特性就是
These znodes exists as long as the session that created the znode isactive. When the session ends the znode is deleted.
所以我们的 gRPC 正式利用了 ZK 临时节点的特性. 当服务下线时候, 临时节点会被删除,对应的 client 会收到变更通知
3.2 网络协议
RPC 框架一个重要的环节就是 Client 和 Server 怎么通信, 采用什么通信协议. gRPC 使用的 http2 协议.
HTTP 协议可以算是现阶段 Web 上面最通用的协议了,在之前很长一段时间,很多应用都是基于 HTTP/1.x 协议. HTTP/1.x 协议是一个文本协议,可读性非常好,但效率其实并不高
HTTP/2标准于2015年5月以RFC 7540正式发表, 前身是 Google 推出的 SPDY 协议. 在Chrome 上输入 chrome://net-internals/#http2, 就可以浏览器上捕获所以的 Http2 的请求.
有人在 gRPC 的 issues 里面为 gRPC 为什么选择 HTTP/2 而不是 TCP, 下面作者的回答
HTTP2 is used for many good reasons: HTTP2 is a standard and HTTP protocol is well known to proxies, firewalls and manysoftware tools. The streaming nature of HTTP2 suits our needs very well, so no needto reinvent the wheel.
gRPC 为什么会选择 HTTP2 协议:
1.HTTP/2 是一个公开的、经过实践检验的标准 HTTP/2 天然支持物联网、手机、浏览器
2.基于HTTP/2 多语言客户端实现容易天然跨语言
3.HTTP/2支持Stream和流控在业界,有其他支持stream的方案,比如基于websocket, 但是 HTTP/2 显然是一个更好的选择
4.基于HTTP/2 在Gateway/Proxy很容易支持 nginx对gRPC的支持: https://www.nginx.com/blog/nginx-1-13-10-grpc/
5.HTTP/2 安全性有保证 HTTP/2 天然支持SSL
gRPC选择基于HTTP/2,那么它的性能肯定不会是最顶尖的(比如相比基于TCP的协议肯定)。但是对于rpc来说中庸的qps可以接受,通用和兼容性才是最重要的事情
个人觉得网络协议的不同是 gRPC 存在的最重要的原因: 在当前容器化/服务化的浪潮下,开放互通的需求必然会产生基于HTTP/2的RPC。即使没有gRPC,也会有其它基于HTTP/2的RPC
相比于其他协议:
1.私有协议, 比如 Dubbo 协议
HTTP/2有更好的适用性, 也经过了更广泛的验证和支持.
2.TCP协议
有完善的 SSL/TSL 层.
3.3 Schema和序列化/反序列
前面讲到, gRPC 是一种典型的跨语言的 RPC 框架. 跨语言的 RPC 框架的实现, 基本都是通过定于一种接口描述语言(interface define language)来实现的.
1.Thrift 使用 Thrift IDL file(.thrift文件)
2.gRPC 使用 protocol buffer(.proto文件), 简称 protobuf
gRPC 就是使用 protobuf 文件来做 Schema 的. 通过 protobuf 文件, 就知道通信双方的数据格式.
protobuf 官方定义: 针对结构化数据的一种语言无关, 平台无关, 可扩展的序列化机制
其中很重要的一个点: 结构化数据, 从 protobuf 的使用可以体会到这一点. 因为使用protobuf 的第一步就是定义一个 .proto文件来描述我们的数据. proto 文件本身就是一个Schema, 从这个文件我们就知道双方通信数据的格式.
注意, protobuf 本身不是和 gRPC 绑定的, 只是 gRPC 使用了 protobuf 的特性而已. gRPC为什么选择 protobuf, 首先 grpc 有跨语言的要求, 事实上的跨语言序列化方案主流的选择并不多: protobuf, thrift, json 其中 json 一个很大的问题是缺少类型信息, 不支持自引用(也是因为缺少类型信息导致). 而 protobuf 是自家的, 没道理不用
3.3.1 序列化里的类型信息
序列化是将对象信息转换为可以存储或传输的形式的过程. 各种序列化库层出不穷,其中有一个重要的区别:类型信息存放在哪?
可以分为三种:
1.不保存类型信息
典型的是各种json序列化库, 优点是灵活,缺点是使用的双方都要知道类型是什么.
2.类型信息保存到序列化结果里
比如java自带的序列化,hessian等. 带来的问题就是是类型信息冗余. 比如 RPC 里每一个request 都要带上类型.
因此有一种常见的RPC优化手段就是两端协商之后,后续的请求不需要再带上类型信息.
3.在生成代码里带上类型信息
通常是在 IDL 文件里写好package和类名,生成的代码直接就有了类型信息.比如protobuf, thrift. 缺点是需要生成代码,双方都要知道IDL文件.
3.4 网络IO模型/线程模型
3.4.1 Client 端 RPC 调用
gRPC 的通信协议基于标准的 HTTP/2 设计. 主要提供了两种 RPC 调用方式:
1.普通 RPC 调用方式,即请求-响应模式。
2.基于 HTTP/2.0 的 streaming 调用方式。
XXX目前基本上都还是普通的 RPC 调用方式(也就是说我们还没有享受 HTTP/2.0 的特性)
普通的 RPC 调用提供了三种实现方式:
(1).xxxBlockingStub, 比如 CustomerServiceFeedbackBlockingStub同步阻塞式服务调用, 在当前线程返回结果. HelloServiceGrpc.HelloServiceBlockingStub stub =HelloServiceGrpc.newBlockingStub(channel); stub.simpleHello(person).getString();(2).xxxFutureStub, 比如 CustomerServiceFeedbackFutureStub同步非阻塞调用,基于 Future 机制 HelloServiceGrpc.HelloServiceFutureStub stub =HelloServiceGrpc.newFutureStub(channel); ListenableFuture<ProtoObj.Result> response =stub.simpleHello(person);(3).xxxStub, 比如 CustomerServiceFeedbackStub异步非阻塞调用 HelloServiceGrpc.HelloServiceStub stub =HelloServiceGrpc.newStub(channel); StreamObserver<ProtoObj.Result> responseObserver = newStreamObserver<ProtoObj.Result>() { @Override public void onNext(ProtoObj.Result value) { System.out.println(value.getString()); latch.countDown(); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { } }; stub.simpleHello(person, responseObserver);
我们是怎么选择的呢? 看一下我们最常用的调用, 基本上都是选择的基于 Future 机制的同步非阻塞模式.
returnGrpc.grpcEx(CommonRpcServiceGrpc.CommonRpcServiceFutureClient.class,DpRpcConfigRouter.getRpcConfig(dataSource)).getByStringKey(request);
也就是说, gRPC 调用方在当前线程触发请求, 得到 Future, 再从 Future 获取结果.
关于 gRPC 底层的调用是怎么实现的, 可以看看grpc-java 的实现, 简单来说还是基于Netty4.1 的 HTTP/2 协议栈框架构建(netty-codec-htt2).
其中关键的几个类 ClientCalls.java, NettyClientTransport, NettyClientHandler.
在服务端, Server 端就是经典的 Reactor 模式, 主线程监听指定的 port,来等待Client连接请求, 分给 worker 线程池处理. HTTP/2请求消息的请求和响应发送都由Netty负责(NioEventLoop),gRPC 负责消息的序列化和反序列化、以及应用服务接口的调用。
3.5 负载均衡策略
从一堆服务提供者(Provicer)中选择一个为当前请求服务的策略.
负载均衡实现方式:
1.Proxy Model存在一个 Load Balancer (LB) proxy 的中间层, 它会追踪Provider的状态.客户端不用只需要知道 Load Balancer (LB) proxy.通常是基于软件如 LVS,HAproxy等实现. LB一般具备健康检查能力,能自动摘除不健康的服务实例.
2.Client客户端知道每个Provicer, 每次服务客户端负载从中选择一个为本次请求服务.通常是基于注册中心实现. 服务提供方启动时,首先将服务地址注册到服务注册表,同时定期报心跳到服务注册表以表明服务的存活状态,相当于健康检查.服务消费方要访问某个服务时,它通过内置的LB组件向服务注册表查询,同时缓存并定期刷新目标服务地址列表,然后以某种负载均衡策略选择一个目标服务地址,最后向目标服务发起请求。
gRPC开源组件官方并未直接提供服务注册与发现的功能实现,但其设计文档已提供实现的思路,并在不同语言的gRPC代码API中已提供了命名解析和负载均衡接口供扩展。
其基本实现原理:
1.服务启动后gRPC客户端向命名服务器发出名称解析请求,名称将解析为一个或多个IP地址,每个IP地址标示它是服务器地址还是负载均衡器地址,以及标示要使用那个客户端负载均衡策略或服务配置。
2.客户端实例化负载均衡策略,如果解析返回的地址是负载均衡器地址,则客户端将使用grpclb策略,否则客户端使用服务配置请求的负载均衡策略。
3.负载均衡策略为每个服务器地址创建一个子通道(channel)。
4.当有rpc请求时,负载均衡策略决定那个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。
根据gRPC官方提供的设计思路,基于进程内LB方案(即第2个案,阿里开源的服务框架Dubbo 也是采用类似机制)结合分布式一致的组件(如Zookeeper、Consul、Etcd),可找到gRPC服务发现和负载均衡的可行解决方案
XXX实现的是基于 ZK 的客户端负载均衡, 核心源码参见:
Grpc.grpcEx() --> GrpcClient.grpc() --> GrpcCore.getClient() -->RpcClientProxy //GrpcCore.java private final class RpcClientProxy implements InvocationHandler { private ChannelInfo getChannel(Failover<ChannelInfo> failover) { ... ChannelInfo channel = locateStrategy.apply(failover); ... return channel; } }
其中的 locateStrategy 就包含负载均衡的逻辑, 默认的是 WeightFailover 的getOneAvailable() 方法:
//GrpcClient.java public <C> C grpc(Class<C> iface, Duration deadlineAfter) { return holder.getClient(iface, Failover::getOneAvailable, null,deadlineAfter); } //WeightFailover.java @Override public T getOneAvailable() { // TODO better using a snapshot current Weight<T> or a newstateful Weight<T> List<T> available = getAvailable(1); return available.isEmpty() ? null : available.get(0); } @Override public List<T> getAvailable(int n) { return getAvailable(n, emptySet()); } //根据节点权重选择 private List<T> getAvailable(int n, Collection<T> exclusions) { int sum = 0; //总权重 for (Entry<T, Integer> entry : currentWeightMap.entrySet()) { ... sum += thisWeight; } List<T> result = new ArrayList<>(); if (sum > 0) { int size = snapshot.size(); for (int i = 0; i < size; i++) { if (sum == 0) { break; } if (result.size() == n) { break; } //主要的逻辑还是随机, 随机取 [0, sum-1] 之间的一个数 int left = ThreadLocalRandom.current().nextInt(sum); Iterator<TwoTuple<T, Integer>> iterator =snapshot.iterator(); while (iterator.hasNext()) { TwoTuple<T, Integer> candidate = iterator.next(); int entryWeight = candidate.getSecond(); if (left < entryWeight) { T obj = candidate.getFirst(); if (!exclusions.contains(obj) &&filter.test(obj)) { result.add(obj); } if (result.size() == n) { break; } iterator.remove(); sum -= entryWeight; break;
} left -= entryWeight; } } } return result; }
简单来说, 我们默认的策略就是从所有服务 Provider 中随机选择一个. 只是带上权重信息了, 每个节点有初始权重(100), 每次失败会降低权重, 每次成功会增加权重
3.7 容错(Fault tolerance)
Fault tolerance 解决的问题是, 当 client 某一次请求 server 失败的时候, 如何处理本次请求.
常见的 Failover 策略:
(1).Failover失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。(2).Failfast快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
(3).Failsafe失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
(4).Failback失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
(5).Forking并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。
(6).Broadcast广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。
XXX的gRPC 实现, 没有 Failover 策略, 失败了就失败了(框架层什么也不错). 但是我们是server 节点都带权重, 每次失败, 会降低改节点的权重, 减少后续请求打到该节点的几率。
重试策略完全依赖 client 自己做。
3.8 监控
GrpcCore 中维护了一堆 GrpcCallListener. GrpcCallListener 定义如下
public interface GrpcCallListener<T> { /** * will invoked when there is no available node. */ default void noAvailableNode() throws Exception { } /** * will invoked on caller's thread. */ @Nullable default T before(ChannelInfo channel, Method method) throwsException { return null; } void onSuccess(@Nullable T before, ChannelInfo channel, Methodmethod, long costInMs) throws Exception; /** * @param t possible instance: * {@link java.util.concurrent.TimeoutException} fromcaller thread. * {@link io.grpc.StatusRuntimeException} from gRPC workerthread or caller thread. */ void onError(@Nullable T before, ChannelInfo channel, Method method,long costInMs, @Nonnull Throwable t) throws Exception; void onCancel(@Nullable T before, ChannelInfo channel, Methodmethod, long costInMs, @Nonnull CancelType cancelType) throws Exception;}
这些 GrpcCallListener 是怎么起作用的呢?
GrpcCallListener 定义了再方法调用前后(成功/失败/取消/超时)的回调行为. 有了这个机制, 我们就可以做很多事情. 我们的 RPCMonitor 就是通过 GrpcCallListener 机制实现的. 每个 GrpcClient 默认都会带上 RPCMonitor 相关的 GrpcCallListener 来打到收集调用信息的目的