在上一篇文章中讲到了provider中针对consumer的请求消息封装MessageTask
的部分细节。留下了最核心的处理消息的细节process
方法没有说。
在继续对所谓的核心逻辑梳理之前,先整体的捋一捋之前文章的行文思路。
consumer的client发送一个请求报文,这个报文包含header头以及请求体,通过provider的解码器将其进行解码,解码后的对象为JRequestPayload
或者JResponsePayload
。这些解码后的对象还不属于业务层的,仅仅只是对数据包进行了第一层的封装。解码完成后,经过第一层包装的消息进入具体处理消息的handler,而这个handler并不允许处理太多业务逻辑,因为这是IO线程,处理多了会累死,消耗性能,影响别的请求。handler将消息丢给了processor来处理。然而processor是一个任劳任怨的老大哥,他也很聪明,叫来了CloseableExecutor
大佬,也就是Executor
家族的一员来帮忙处理这些消息。但是这个大佬只能处理特定的消息,针对这些“不认识”的消息有点懵逼,于是将这些消息做了一些加工,转化成MessageTask
,这样子大佬们就开心快乐的去干活了。然而实际上大佬不会亲自去干活,他只“安排”小弟去干,而具体做什么都在MessageTask
里面装着,将要做的事情全部安排在run
方法中。接下来就是对这些消息进行处理了。
这里要做最重要的一步就是将payload中的字节根据header中的序列化规则进行解码(反序列化),这也可以算是第二次封装了。为啥要在这里处理而不是在解码器处理,还是因为不能影响IO线程,那个家伙娇贵的很,累不得。经过反序列化后的对象叫做MessageWrapper
.这个家伙很真实,因为完全是属于业务层的包。无非就是consumer的诉求罢了:我要调用哪个接口,参数是什么,得是什么版本的blabla一系列信息。这些信息全部都放在传输层的body里面,现在通过反序列化真真实实的站在大佬面前。那么如何来处理consumer发送过来的请求呢?前提是得知道consumer端到底要什么。之前说过,consumer将请求报文全部封装到MessageWrapper
中,而其中有一个非常重要的信息:ServiceMetadata
。这个对象是一个“地址”,通过这个玩意provider就能找到consumer需要的调用对象以及其他相关的信息。原理也很简单,在provider发布的时候将这个“地址”在本地做一份映射不就完了。同时将这个地址发送到注册中心去。这样consumer到注册中心拿到这个地址然后向provider发送请求的时候,provider也就能做出回应了--找到对应的“服务”(本质上就是一个bean),通过consumer的请求参数,invoke一下,爱返回结果就返回结果,没有返回的拉倒。当然有结果返回的情况又涉及一次网络请求了--provider向consumer发送响应数据。
如此,rpc的前半个过程就完成了,接下来就是最最最伤脑筋的部分--调用。也就是上文未具体说到的部分。
private void process(ServiceWrapper service) {
// stack copy
final JRequest _request = request;
Context invokeCtx = new Context(service);
if (TracingUtil.isTracingNeeded()) {
setCurrentTraceId(_request.message().getTraceId());
}
try {
Object invokeResult = Chains.invoke(_request, invokeCtx)
.getResult();
ResultWrapper result = new ResultWrapper();
result.setResult(invokeResult);
byte s_code = _request.serializerCode();
Serializer serializer = SerializerFactory.getSerializer(s_code);
JResponsePayload responsePayload = new JResponsePayload(_request.invokeId());
if (CodecConfig.isCodecLowCopy()) {
OutputBuf outputBuf =
serializer.writeObject(channel.allocOutputBuf(), result);
responsePayload.outputBuf(s_code, outputBuf);
} else {
byte[] bytes = serializer.writeObject(result);
responsePayload.bytes(s_code, bytes);
}
responsePayload.status(Status.OK.value());
handleWriteResponse(responsePayload);
} catch (Throwable t) {
if (INVOKE_ERROR == t) {
// handle biz exception
handleException(invokeCtx.getExpectCauseTypes(), invokeCtx.getCause());
} else {
processor.handleException(channel, _request, Status.SERVER_ERROR, t);
}
} finally {
if (TracingUtil.isTracingNeeded()) {
TracingUtil.clearCurrent();
}
}
}
这段代码中最重要的一行是获取调用结果invokeResult
。这里使用Context
将ServiceWrapper
进行封装了一下,然后使用Chains#invoke
进行调用。其中就涉及到了一种设计模式--责任链。
static class Chains {
private static final JFilterChain headChain;
static {
JFilterChain invokeChain = new DefaultFilterChain(new InvokeFilter(), null);
JFilterChain interceptChain = new DefaultFilterChain(new InterceptorsFilter(), invokeChain);
headChain = JFilterLoader.loadExtFilters(interceptChain, JFilter.Type.PROVIDER);
}
static <T extends JFilterContext> T invoke(JRequest request, T invokeCtx) throws Throwable {
headChain.doFilter(request, invokeCtx);
return invokeCtx;
}
}
这点代码很简单,首先在static代码块中初始化了几个chain。headChain
为第一个chain,通过spi机制去动态加载META-INF/services/
目录下的配置文件来实例化JFilterChain
.当然只会去找类型为provider的chain,同时将下一个chain放进去。而下一个chain叫做interceptChain
,也是预先被初始化了,其中的filter的实现为InterceptorsFilter
,最后一个filterChain为invokeChain
,通过命名就知道这个chain是真正用来执行具体业务处理的。他没有下一个节点。
具体的调用顺序是headChain调用doFilter,内部其实是headChain持有的filter实例来调用doFilter,同时将headChain持有的实例nextChain作为参数传递进去。如果headChain的filter处理不了,就调用next的doFilter,而next也是同样的结构,也能做出同样的处理,这样一层一层的调用直到chain的尾巴,得到结果后再一层一层返回。
这种设计思想非常典型,很多框架中都有责任链模式的身影。而我们讨论的核心在invokeChain这一层。
static class InvokeFilter implements JFilter {
@Override
public Type getType() {
return Type.PROVIDER;
}
@Override
public <T extends JFilterContext> void doFilter(JRequest request, T filterCtx, JFilterChain next) throws Throwable {
MessageWrapper msg = request.message();
Context invokeCtx = (Context) filterCtx;
Object invokeResult = MessageTask.invoke(msg, invokeCtx);
invokeCtx.setResult(invokeResult);
}
}
可以看到,doFilter方法中并没有调用next,也证实了一点:这是chain的尾巴了。必须处理,不处理就没人处理了。而这里具体处理逻辑却又回到了MessageTask#invoke
,兜兜转转又是一圈。最终将返回的结果使用Context#setResult
进行填充。
private static Object invoke(MessageWrapper msg, Context invokeCtx) throws Signal {
ServiceWrapper service = invokeCtx.getService();
// 得到具体的实例
Object provider = service.getServiceProvider();
// 方法名
String methodName = msg.getMethodName();
// 方法参数
Object[] args = msg.getArgs();
// metrics api 用于统计数据
Timer.Context timerCtx = null;
if (METRIC_NEEDED) {
timerCtx = Metrics.timer(msg.getOperationName()).time();
}
Class<?>[] expectCauseTypes = null;
try {
List<Pair<Class<?>[], Class<?>[]>> methodExtension = service.getMethodExtension(methodName);
if (methodExtension == null) {
throw new NoSuchMethodException(methodName);
}
// 根据JLS方法调用的静态分派规则查找最匹配的方法parameterTypes
Pair<Class<?>[], Class<?>[]> bestMatch = Reflects.findMatchingParameterTypesExt(methodExtension, args);
Class<?>[] parameterTypes = bestMatch.getFirst();
expectCauseTypes = bestMatch.getSecond();
return Reflects.fastInvoke(provider, methodName, parameterTypes, args);
} catch (Throwable t) {
invokeCtx.setCauseAndExpectTypes(t, expectCauseTypes);
throw INVOKE_ERROR;
} finally {
if (METRIC_NEEDED) {
timerCtx.stop();
}
}
}
这段代码有点晦涩难懂。尤其是使用静态分派这块逻辑非常模糊。去看一下反射调用的相关api就知道,使用反射用到的参数得有方法名,方法参数,参数类型以及调用方法的对象。在ServiceWrapper
实例中已经有了方法名,参数,要调用的方法对象很显然就是他自己,就剩下一个参数类型不知道了。当然在rpc中请求端也没法吧参数类型给你传过来,这里需要自己去判断了。而Reflects.findMatchingParameterTypesExt
就是根据参数来判断参数类型到底是什么。最终使用Reflects.fastInvoke(provider, methodName, parameterTypes, args);
来完成调用。从这里看到参数确实是刚才提到的四个参数,缺一不可。然而这里并没有去使用反射调用的,而是使用字节码直接生成子类(但是反射的本质不就是生成子类吗?有点懵逼)。其实还是有一点区别的,在使用反射的时候,以jdk反射为例,每代理一个方法就会生成一个代理类,在需要很多代理方法需要被调用的时候就回生成很多个代理类,这样就很消耗性能。而这里使用的是通过字节码工具自己生成一个子类,并且缓存下来,这样节省很多性能。在benchmark中跑的结果确实比jdk反射性能要好很多。具体的代码实现就不去纠结了,反正也看不明白,这里就当作是反射调用就行了。关于Java语言的这种“动态”特性我不得不吐槽一下,虽说提供了一种基于运行时的修改程序的行为机制,但是真的是很麻烦,光看api都会把人给搞晕,非常不友好。而现在很多动态语言就很人性化,想改就改,非常轻松。其实我还是很喜欢Javascript的。
最终,整个调用的逻辑都完完全全走通了。当然,这只是基于正常的调用,也就是没有出现异常的情况。如果出现了异常情况改怎么处理呢?比如说空指针,除数为0等情况。在invoke的逻辑中,直接将Throwable
捕获到,塞进Context
中,最后抛出异常。在process逻辑也里会有捕获动作:
private void handleException(Class<?>[] exceptionTypes, Throwable failCause) {
if (exceptionTypes != null && exceptionTypes.length > 0) {
Class<?> failType = failCause.getClass();
for (Class<?> eType : exceptionTypes) {
// 如果抛出声明异常的子类, 客户端可能会因为不存在子类类型而无法序列化, 会在客户端抛出无法反序列化异常
if (eType.isAssignableFrom(failType)) {
// 预期内的异常
processor.handleException(channel, request, Status.SERVICE_EXPECTED_ERROR, failCause);
return;
}
}
}
// 预期外的异常
processor.handleException(channel, request, Status.SERVICE_UNEXPECTED_ERROR, failCause);
}
虽然这段代码比较长,但是核心就只有一点,处理异常消息。而正真做这件事交给了processor:
private void doHandleException(
JChannel channel, long invokeId, byte s_code, byte status, Throwable cause, boolean closeChannel) {
ResultWrapper result = new ResultWrapper();
// 截断cause, 避免客户端无法找到cause类型而无法序列化
cause = ThrowUtil.cutCause(cause);
result.setError(cause);
Serializer serializer = SerializerFactory.getSerializer(s_code);
JResponsePayload response = new JResponsePayload(invokeId);
response.status(status);
if (CodecConfig.isCodecLowCopy()) {
OutputBuf outputBuf =
serializer.writeObject(channel.allocOutputBuf(), result);
response.outputBuf(s_code, outputBuf);
} else {
byte[] bytes = serializer.writeObject(result);
response.bytes(s_code, bytes);
}
if (closeChannel) {
channel.write(response, JChannel.CLOSE);
} else {
channel.write(response, new JFutureListener<JChannel>() {
@Override
public void operationSuccess(JChannel channel) throws Exception {
logger.debug("Service error message sent out: {}.", channel);
}
@Override
public void operationFailure(JChannel channel, Throwable cause) throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("Service error message sent failed: {}, {}.", channel, stackTrace(cause));
}
}
});
}
}
无非就是将异常对象写出去,当然也不一定是异常对象,也有可能是正常对象,管他呢,反正都是对象,客户端能够通过status自行去判断到底是什么类型。这样子,客户端调用一个rpc方法就像调用本地方法一样,也可以打印正常的异常栈信息,但是只能知道发生了什么异常,没办法去定位到哪一行有问题,这是很尴尬的。当然实际生成中这和你调用方关系不大,只需要遵循一个原则:谁写的bug谁去改。轻松甩锅。
终于,provider的核心基本上写完了,还有很多细节需要慢慢地理一遍,毕竟涉及到很多知识盲区,需要时间慢慢消化。写到这里我才发现还有一个比较关键的点没有涉及到,那就是服务的发布。接下来的一篇文章会简单概述provider是怎么“发布”出去的。