server.go 实现了RPC服务端的核心逻辑,包括注册、读取请求、处理请求、发送回应等逻辑。
// Server represents a RPC server
type Server struct {
services serviceRegistry
run int32
codecsMu sync.Mutex
codecs mapset.Set
}
// callback is a method callback which was registered in the server
type callback struct {
rcvr reflect.Value // receiver of method
method reflect.Method // callback
argTypes []reflect.Type // input argument types
hasCtx bool // method's first argument is a context (not included in argTypes)
errPos int // err return idx, of -1 when method cannot return error
isSubscribe bool // indication if the callback is a subscription
}
// service represents a registered object
type service struct {
name string // name for service
typ reflect.Type // receiver type
callbacks callbacks // registered handlers
subscriptions subscriptions // available subscriptions/notifications
}
Server创建的时候通过调用RegisterName注册自己的实例。
// NewServer will create a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{
services: make(serviceRegistry),
codecs: mapset.NewSet(),
run: 1,
}
// register a default service which will provide meta information about the RPC service such as the services and
// methods it offers.
rpcService := &RPCService{server}
server.RegisterName(MetadataApi, rpcService)
return server
}
服务注册RegisterName,该方法会通过传入的参数创建一个service对象,如果没有找到合适的方法则返回错误,如果没有错误,则将创建的service实例加入serviceRegistry。
// RegisterName will create a service for the given rcvr type under the given name. When no methods on the given rcvr
// match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is
// created and added to the service collection this server instance serves.
func (s *Server) RegisterName(name string, rcvr interface{}) error {
if s.services == nil {
s.services = make(serviceRegistry)
}
svc := new(service)
svc.typ = reflect.TypeOf(rcvr)
rcvrVal := reflect.ValueOf(rcvr)
if name == "" {
return fmt.Errorf("no service name for type %s", svc.typ.String())
}
if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
// already a previous service register under given name, merge methods/subscriptions
if regsvc, present := s.services[name]; present {
for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s
}
return nil
}
svc.name = name
svc.callbacks, svc.subscriptions = methods, subscriptions
s.services[svc.name] = svc
return nil
}
suitableCallbacks()遍历这个类型的所有方法并返回。
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
callbacks := make(callbacks)
subscriptions := make(subscriptions)
METHODS:
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := formatName(method.Name)
if method.PkgPath != "" { // method must be exported
continue
}
var h callback
h.isSubscribe = isPubSub(mtype)
h.rcvr = rcvr
h.method = method
h.errPos = -1
firstArg := 1
numIn := mtype.NumIn()
if numIn >= 2 && mtype.In(1) == contextType {
h.hasCtx = true
firstArg = 2
}
if h.isSubscribe {
h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
for i := firstArg; i < numIn; i++ {
argType := mtype.In(i)
if isExportedOrBuiltinType(argType) {
h.argTypes[i-firstArg] = argType
} else {
continue METHODS
}
}
subscriptions[mname] = &h
continue METHODS
}
// determine method arguments, ignore first arg since it's the receiver type
// Arguments must be exported or builtin types
h.argTypes = make([]reflect.Type, numIn-firstArg)
for i := firstArg; i < numIn; i++ {
argType := mtype.In(i)
if !isExportedOrBuiltinType(argType) {
continue METHODS
}
h.argTypes[i-firstArg] = argType
}
// check that all returned values are exported or builtin types
for i := 0; i < mtype.NumOut(); i++ {
if !isExportedOrBuiltinType(mtype.Out(i)) {
continue METHODS
}
}
// when a method returns an error it must be the last returned value
h.errPos = -1
for i := 0; i < mtype.NumOut(); i++ {
if isErrorType(mtype.Out(i)) {
h.errPos = i
break
}
}
if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
continue METHODS
}
switch mtype.NumOut() {
case 0, 1, 2:
if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
continue METHODS
}
callbacks[mname] = &h
}
}
return callbacks, subscriptions
}
server启动和服务,在ipc.go中,可以看到每Accept(),则启动一个goroutine调用srv.ServeCodec来进行服务。
func (srv *Server) ServeListener(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
return err
}
log.Trace(fmt.Sprint("accepted conn", conn.RemoteAddr()))
go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
}
}
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
defer codec.Close()
s.serveRequest(codec, false, options)
}
serveRequest从codec读取请求,调用对应方法并返回至codec,sync.WaitGroup实现了一个信号量的功能,context实现上下文管理。
func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
var pend sync.WaitGroup
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Error(string(buf))
}
s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
}()
// ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
//如果codec支持,可以通过一个叫notifier的对象执行回调函数发送消息给客户端。
// if the codec supports notification include a notifier that callbacks can use
// to send notification to clients. It is tied to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock()
return &shutdownError{}
}
s.codecs.Add(codec)
s.codecsMu.Unlock()
// test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
// If a parsing error occurred, send an error
if err.Error() != "EOF" {
log.Debug(fmt.Sprintf("read error %v\n", err))
codec.Write(codec.CreateErrorResponse(nil, err))
}
// Error or end of stream, wait for requests and tear down
pend.Wait()
return nil
}
// check if server is ordered to shutdown and return an error
// telling the client that his request failed.
if atomic.LoadInt32(&s.run) != 1 {
err = &shutdownError{}
if batch {
resps := make([]interface{}, len(reqs))
for i, r := range reqs {
resps[i] = codec.CreateErrorResponse(&r.id, err)
}
codec.Write(resps)
} else {
codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
}
return nil
}
// If a single shot request is executing, run and return immediately
if singleShot {
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
return nil
}
// For multi-shot connections, start a goroutine to serve and loop back
pend.Add(1)
go func(reqs []*serverRequest, batch bool) {
defer pend.Done()
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
}(reqs, batch)
}
return nil
}
readRequest方法, 从codec读取请求,查找对应的方法组装成rpcRequest。
type rpcRequest struct {
service string
method string
id interface{}
isPubSub bool
params interface{}
err Error // invalid batch element
}
然后返回serverRequest
// serverRequest is an incoming request
type serverRequest struct {
id interface{}
svcname string
callb *callback
args []reflect.Value
isUnsubscribe bool
err Error
}
readRequest方法,从codec读取请求,对请求进行处理生成serverRequest对象返回。
func (s Server) readRequest(codec ServerCodec) ([]serverRequest, bool, Error) {
reqs, batch, err := codec.ReadRequestHeaders()
if err != nil {
return nil, batch, err
}
requests := make([]*serverRequest, len(reqs))
//根据reqs构建requests
// verify requests
for i, r := range reqs {
var ok bool
var svc *service
if r.err != nil {
requests[i] = &serverRequest{id: r.id, err: r.err}
continue
}
//如果请求时发送/订阅的请求,而且方法名称有_unsuvscribe后缀。
if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
requests[i].args = args
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
continue
}
//如果没有注册这个方法
if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
continue
}
//如果是发布和订阅模式,调用订阅方法。
if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
if callb, ok := svc.subscriptions[r.method]; ok {
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 {
argTypes := []reflect.Type{reflect.TypeOf("")}
argTypes = append(argTypes, callb.argTypes...)
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
}
} else {
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
}
continue
}
if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 {
if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
requests[i].args = args
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
}
continue
}
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
}
return requests, batch, nil
}
exec&execBatch方法,调用s.handle方法对request进行处理。
// exec executes the given request and writes the result back using the codec.
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
var response interface{}
var callback func()
if req.err != nil {
response = codec.CreateErrorResponse(&req.id, req.err)
} else {
response, callback = s.handle(ctx, codec, req)
}
if err := codec.Write(response); err != nil {
log.Error(fmt.Sprintf("%v\n", err))
codec.Close()
}
// when request was a subscribe request this allows these subscriptions to be actived
if callback != nil {
callback()
}
}
// execBatch executes the given requests and writes the result back using the codec.
// It will only write the response back when the last request is processed.
func (s Server) execBatch(ctx context.Context, codec ServerCodec, requests []serverRequest) {
responses := make([]interface{}, len(requests))
var callbacks []func()
for i, req := range requests {
if req.err != nil {
responses[i] = codec.CreateErrorResponse(&req.id, req.err)
} else {
var callback func()
if responses[i], callback = s.handle(ctx, codec, req); callback != nil {
callbacks = append(callbacks, callback)
}
}
}
if err := codec.Write(responses); err != nil {
log.Error(fmt.Sprintf("%v\n", err))
codec.Close()
}
// when request holds one of more subscribe requests this allows these subscriptions to be activated
for _, c := range callbacks {
c()
}
}
handle方法,执行一个request,然后返回response。
// handle executes a request and returns the response from the callback.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
if req.err != nil {
return codec.CreateErrorResponse(&req.id, req.err), nil
}
//取消订阅消息,通过NotifierFromContext获取存入ctx的notifier。
if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
notifier, supported := NotifierFromContext(ctx)
if !supported { // interface doesn't support subscriptions (e.g. http)
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}
subid := ID(req.args[0].String())
if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
return codec.CreateResponse(req.id, true), nil
}
return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
}
//如果是订阅消息,创建订阅并激活
if req.callb.isSubscribe {
subid, err := s.createSubscription(ctx, codec, req)
if err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
notifier.activate(subid, req.svcname)
}
return codec.CreateResponse(req.id, subid), activateSub
}
// regular RPC call, prepare arguments
if len(req.args) != len(req.callb.argTypes) {
rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
req.svcname, serviceMethodSeparator, req.callb.method.Name,
len(req.callb.argTypes), len(req.args))}
return codec.CreateErrorResponse(&req.id, rpcErr), nil
}
arguments := []reflect.Value{req.callb.rcvr}
if req.callb.hasCtx {
arguments = append(arguments, reflect.ValueOf(ctx))
}
if len(req.args) > 0 {
arguments = append(arguments, req.args...)
}
// execute RPC method and return result
reply := req.callb.method.Func.Call(arguments)
if len(reply) == 0 {
return codec.CreateResponse(req.id, nil), nil
}
if req.callb.errPos >= 0 { // test if method returned an error
if !reply[req.callb.errPos].IsNil() {
e := reply[req.callb.errPos].Interface().(error)
res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
return res, nil
}
}
return codec.CreateResponse(req.id, reply[0].Interface()), nil
}
subscription.go发布订阅模式
在服务一个客户端连接时候,调用newNotifier方法创建了一个notifier对象存储到ctx中。可以观察到Notifier对象保存了codec的实例,也就是说Notifier对象保存了网络连接,用来在需要的时候发送数据。
// newNotifier creates a new notifier that can be used to send subscription
// notifications to the client.
func newNotifier(codec ServerCodec) Notifier {
return &Notifier{
codec: codec,
active: make(map[ID]Subscription),
inactive: make(map[ID]*Subscription),
}
}
createSubscription方法会调用指定的注册上来的方法,并得到回应。
// createSubscription will call the subscription callback and returns the subscription id or error.
func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
// subscription have as first argument the context following optional arguments
args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
args = append(args, req.args...)
reply := req.callb.method.Func.Call(args)
if !reply[1].IsNil() { // subscription creation failed
return "", reply[1].Interface().(error)
}
return reply[0].Interface().(*Subscription).ID, nil
}
在来看看我们的activate方法,这个方法激活了subscription。 subscription在subscription ID被发送给客户端之后被激活,避免客户端还没有收到subscription ID的时候就收到了subscription信息。
// activate enables a subscription. Until a subscription is enabled all
// notifications are dropped. This method is called by the RPC server after
// the subscription ID was sent to client. This prevents notifications being
// send to the client before the subscription ID is send to the client.
func (n *Notifier) activate(id ID, namespace string) {
n.subMu.Lock()
defer n.subMu.Unlock()
if sub, found := n.inactive[id]; found {
sub.namespace = namespace
n.active[id] = sub
delete(n.inactive, id)
}
}
取消订阅的函数
// unsubscribe a subscription.
// If the subscription could not be found ErrSubscriptionNotFound is returned.
func (n *Notifier) unsubscribe(id ID) error {
n.subMu.Lock()
defer n.subMu.Unlock()
if s, found := n.active[id]; found {
close(s.err)
delete(n.active, id)
return nil
}
return ErrSubscriptionNotFound
}
最后是一个发送订阅的函数,调用这个函数把数据发送到客户端, 这个也比较简单。
// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
n.subMu.RLock()
defer n.subMu.RUnlock()
sub, active := n.active[id]
if active {
notification := n.codec.CreateNotification(string(id), sub.namespace, data)
if err := n.codec.Write(notification); err != nil {
n.codec.Close()
return err
}
}
return nil
}
client.go
先来看看客户端的数据结构
// Client represents a connection to an RPC server.
type Client struct {
idCounter uint32
connectFunc func(ctx context.Context) (net.Conn, error)
isHTTP bool
// writeConn is only safe to access outside dispatch, with the
// write lock held. The write lock is taken by sending on
// requestOp and released by sending on sendDone.
writeConn net.Conn
// for dispatch
close chan struct{}
didQuit chan struct{} // closed when client quits
reconnected chan net.Conn // where write/reconnect sends the new connection
readErr chan error // errors from read
readResp chan []jsonrpcMessage // valid messages from read
requestOp chan requestOp // for registering response IDs
sendDone chan error // signals write completion, releases write lock
respWait map[string]requestOp // active requests
subs map[string]ClientSubscription // active subscriptions
}
newClient, 新建一个客户端。 通过调用connectFunc方法来获取一个网络连接,如果网络连接是httpConn对象的化,那么isHTTP设置为true。然后是对象的初始化, 如果是HTTP连接的化,直接返回,否者就启动一个goroutine调用dispatch方法。 dispatch方法是整个client的指挥中心,通过上面提到的channel来和其他的goroutine来进行通信,获取信息,根据信息做出各种决策。后续会详细介绍dispatch。 因为HTTP的调用方式非常简单, 这里先对HTTP的方式做一个简单的阐述。
func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (Client, error) {
//调用connectFunc方法来获取一个网络连接
conn, err := connectFunc(initctx)
if err != nil {
return nil, err
}
_, isHTTP := conn.(httpConn)
c := &Client{
writeConn: conn,
isHTTP: isHTTP,
connectFunc: connectFunc,
close: make(chan struct{}),
didQuit: make(chan struct{}),
reconnected: make(chan net.Conn),
readErr: make(chan error),
readResp: make(chan []jsonrpcMessage),
requestOp: make(chan requestOp),
sendDone: make(chan error, 1),
respWait: make(map[string]requestOp),
subs: make(map[string]ClientSubscription),
}
if !isHTTP {
go c.dispatch(conn)
}
return c, nil
}
请求通过调用client的call方法来进行RPC调用
// Call performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
ctx := context.Background()
return c.CallContext(ctx, result, method, args...)
}
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
msg, err := c.newMessage(method, args...)
if err != nil {
return err
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
sendHTTP,这个方法直接调用doRequest方法进行请求拿到回应。然后写入到resp队列就返回了。
func (c *Client) sendHTTP(ctx context.Context, op requestOp, msg interface{}) error {
hc := c.writeConn.(httpConn)
respBody, err := hc.doRequest(ctx, msg)
if err != nil {
return err
}
defer respBody.Close()
var respmsg jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
return err
}
op.resp <- &respmsg
return nil
}
在看看上面的另一个方法 op.wait()方法,这个方法会查看两个队列的信息。如果是http那么从resp队列获取到回应就会直接返回。 这样整个HTTP的请求过程就完成了。 中间没有涉及到多线程问题,都在一个线程内部完成了。
func (op requestOp) wait(ctx context.Context) (jsonrpcMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-op.resp:
return resp, op.err
}
}
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.requestOp <- op:
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("sending ", msg)
}})
err := c.write(ctx, msg)
c.sendDone <- err
return err
case <-ctx.Done():
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return ctx.Err()
case <-c.didQuit:
//已经退出,可能被调用了Close
return ErrClientQuit
}
}
dispatch方法
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(conn net.Conn) {
// Spawn the initial read loop.
go c.read(conn)
var (
lastOp *requestOp // tracks last send operation
requestOpLock = c.requestOp // nil while the send lock is held
reading = true // if true, a read loop is running
)
defer close(c.didQuit)
defer func() {
c.closeRequestOps(ErrClientQuit)
conn.Close()
if reading {
// Empty read channels until read is dead.
for {
select {
case <-c.readResp:
case <-c.readErr:
return
}
}
}
}()
for {
select {
case <-c.close:
return
// Read path.
case batch := <-c.readResp:
for _, msg := range batch {
switch {
case msg.isNotification():
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: notification ", msg)
}})
c.handleNotification(msg)
case msg.isResponse():
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: response ", msg)
}})
c.handleResponse(msg)
default:
log.Debug("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: dropping weird message", msg)
}})
// TODO: maybe close
}
}
case err := <-c.readErr:
log.Debug("<-readErr", "err", err)
c.closeRequestOps(err)
conn.Close()
reading = false
case newconn := <-c.reconnected:
log.Debug("<-reconnected", "reading", reading, "remote", conn.RemoteAddr())
if reading {
// Wait for the previous read loop to exit. This is a rare case.
conn.Close()
<-c.readErr
}
go c.read(newconn)
reading = true
conn = newconn
// Send path.
case op := <-requestOpLock:
// Stop listening for further send ops until the current one is done.
requestOpLock = nil
lastOp = op
for _, id := range op.ids {
c.respWait[string(id)] = op
}
case err := <-c.sendDone:
if err != nil {
// Remove response handlers for the last send. We remove those here
// because the error is already handled in Call or BatchCall. When the
// read loop goes down, it will signal all other current operations.
for _, id := range lastOp.ids {
delete(c.respWait, string(id))
}
}
// Listen for send ops again.
requestOpLock = c.requestOp
lastOp = nil
}
}
}
客户端-订阅模式的特殊处理
以太坊的RPC框架支持发布和订阅的模式。
//Subscribe会使用传入的参数调用"<namespace>_subscribe"方法来订阅指定的消息。
//服务器的通知会写入channel参数指定的队列。 channel参数必须和返回的类型相同。
//ctx参数可以用来取消RPC的请求,但是如果订阅已经完成就不会有效果了。
//处理速度太慢的订阅者的消息会被删除,每个客户端有8000个消息的缓存。
func (c Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
//requestOp的参数和Call调用的不一样。 多了一个参数sub.
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, namespace, chanVal),
}
// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
if err := c.send(ctx, op, msg); err != nil {
return nil, err
}
if _, err := op.wait(ctx); err != nil {
return nil, err
}
return op.sub, nil
}
newClientSubscription方法,这个方法创建了一个新的对象ClientSubscription,这个对象把传入的channel参数保存起来。 然后自己又创建了三个chan对象。后续会对详细介绍这三个chan对象
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
client: c,
namespace: namespace,
etype: channel.Type().Elem(),
channel: channel,
quit: make(chan struct{}),
err: make(chan error, 1),
in: make(chan json.RawMessage),
}
return sub
}
从上面的代码可以看出。订阅过程根Call过程差不多,构建一个订阅请求。调用send发送到网络上,然后等待返回。 我们通过dispatch对返回结果的处理来看看订阅和Call的不同。
func (c *Client) handleResponse(msg *jsonrpcMessage) {
op := c.respWait[string(msg.ID)]
if op == nil {
log.Debug(fmt.Sprintf("unsolicited response %v", msg))
return
}
delete(c.respWait, string(msg.ID))
// For normal responses, just forward the reply to Call/BatchCall.
如果op.sub是nil,普通的RPC请求,这个字段的值是空白的,只有订阅请求才有值。
if op.sub == nil {
op.resp <- msg
return
}
// For subscription responses, start the subscription if the server
// indicates success. EthSubscribe gets unblocked in either case through
// the op.resp channel.
defer close(op.resp)
if msg.Error != nil {
op.err = msg.Error
return
}
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
//启动一个新的goroutine 并把op.sub.subid记录起来。
go op.sub.start()
c.subs[op.sub.subid] = op.sub
}
}
op.sub.start方法,专门用来处理订阅消息。主要的功能是从in队列里面获取订阅消息,然后把订阅消息放到buffer里面。 如果数据能够发送。就从buffer里面发送一些数据给用户传入的那个channel。 如果buffer超过指定的大小,就丢弃。
func (sub *ClientSubscription) start() {
sub.quitWithError(sub.forward())
}
func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}
buffer := list.New()
defer buffer.Init()
for {
var chosen int
var recv reflect.Value
if buffer.Len() == 0 {
// Idle, omit send case.
chosen, recv, _ = reflect.Select(cases[:2])
} else {
// Non-empty buffer, send the first queued item.
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
}
switch chosen {
case 0: // <-sub.quit
return nil, false
case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil {
return err, true
}
if buffer.Len() == maxClientSubscriptionBuffer {
return ErrSubscriptionQueueOverflow, true
}
buffer.PushBack(val)
case 2: // sub.channel<-
cases[2].Send = reflect.Value{} // Don't hold onto the value.
buffer.Remove(buffer.Front())
}
}
}
当接收到一条Notification消息的时候会调用handleNotification方法。会把消息传送给in队列。
[图片上传失败...(image-b9d024-1535549234468)]
func (c *Client) handleNotification(msg *jsonrpcMessage) {
if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
log.Debug(fmt.Sprint("dropping non-subscription message: ", msg))
return
}
var subResult struct {
ID string json:"subscription"
Result json.RawMessage json:"result"
}
if err := json.Unmarshal(msg.Params, &subResult); err != nil {
log.Debug(fmt.Sprint("dropping invalid subscription message: ", msg))
return
}
if c.subs[subResult.ID] != nil {
c.subs[subResult.ID].deliver(subResult.Result)
}
}
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
select {
case sub.in <- result:
return true
case <-sub.quit:
return false
}
}