go-micro请求处理过程
上文分析了我们自己创建的service(handler)是如何注册的,那么go-micro又如何知道一个服务查询请求应该怎么处理呢?
上文提到我们添加的service存放在server.rpcServer.handlers
map(类型map[string]server.Handler
)中。下面看一下相关的数据结构。
// server.Handler
// server.handler.go
type Handler interface {
Name() string
Handler() interface{}
Endpoints() []*registry.Endpoint
Options() HandlerOptions
}
// server.rpcHandler, implements server.Handler interface
// server.rpc_handler.go
type rpcHandler struct {
name string
handler interface{}
endpoints []*registry.Endpoint
opts HandlerOptions
}
接上文go-micro service启动流程
,启动后go-micro不断接受用户的远程调用请求,然后调用server.server#ServeRequest
来处理请求,接下来流程如下图所示。
注明: 水平方向表示内部调用,垂直方向表示顺序调用。
其中需要说明的是
service, mtype, ... := server.server#readRequest()
调用,该方法调用中从请求头中解析出请求的服务service以及请求的方法method(var mtype, reflect类型)。然后调用
mtype.method.Func()
完成远程方法原型的调用。那么问题又来了,远程调用客户端如何知道自己需要的服务在那台机器上呢?或者说客户端是如何实现远程过程调用的呢?
回顾一下hello_world client的远程调用过程:
type helloWorldClient struct {
c client.Client // micro.client.Client interface, implemented by micro.client.rpcClient
serviceName string
}
service := micro.NewService(
micro.Name("hello_world"),
micro.Version("latest"),
micro.Metadata(map[string]string{
"type": "helloworld",
}),
)
service.Init()
greeter := hello_world.NewHelloWorldClient("hello_world", service.Client())
rsp, err := greeter.Hello(context.TODO(), &hello_world.HelloWorldRequest{Name: "Alice"})
if err != nil {
fmt.Println(err)
return
}
与server端不一样的地方就是,初始化后我们会获取一个服务的client对象(类型为helloWorldClient
, 该结构体中有一个c micro.client.Client
,用来执行真正的方法调用)。
greeter := hello_world.NewHelloWorldClient("hello_world", service.Client())
紧接着我们会使用该client
对象greeter
进行远程过程调用:
rsp, err := greeter.Hello(context.TODO(), &hello_world.HelloWorldRequest{Name: "Alice"})
该方法具体实现如下:
func (c *helloWorldClient) Hello(ctx context.Context, in *HelloWorldRequest, opts ...client.CallOption) (*HelloWorldResponse, error) {
req := c.c.NewRequest(c.serviceName, "HelloWorld.Hello", in)
out := new(HelloWorldResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
在该方法中,我们会创建一个micro.client.Request
(类型micro.client.rpcRequest
,实现interface micro.client.Request
接口)对象,并且传入service_name
, 远程方法名"HelloWorld.Hello"
作为参数,然后通过micro.client.rpcClient#call()
执行远程方法调用。
该方法具体实现如下:
// micro/go-micro/client/rpc_client.go
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
// make copy of call method
rcall := r.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
rcall = callOpts.CallWrappers[i-1](rcall)
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the address
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
// make the call
err = rcall(ctx, address, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return err
}
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, request, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
return gerr
}
可知该方法的第一步即是向service registry
进行服务查询:
r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
然后执行到目标service所在address
的远程过程调用。
至此,分析结束。
题外话: Dubbo和go-micro的服务注册的区别。
Dubbo服务注册和发现
- 服务容器负责启动,加载,运行服务提供者。
- 服务提供者在启动时,向注册中心注册自己提供的服务。
- 服务消费者在启动时,向注册中心订阅自己所需的服务。
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
- 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
参考文章
go-micro服务注册和发现
- 启动
service registry
服务(etcd, consul) - 服务提供者向
service registry
进行服务注册 - 服务消费者向
service registry
进行服务查询 -
service registry
返回服务消费者查询服务地址 - 服务消费者想服务所在地址发起远程过程调用