参考1:https://github.com/ZtesoftCS/go-ethereum-code-analysis
参考2:https://blog.csdn.net/liuzhijun301/article/details/80759920
认识RPC
GitHub调用RPC介绍:https://github.com/ethereum/wiki/wiki/JSON-RPC
RPC(Remote Process Call):远程过程调用,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络细节的应用程序通信协议.RPC构建于TCP或UDP或HTTP之上.允许开发者直接调用另一台计算机上的程序,而开发者无需额外地为这个调用过程编写通信代码,使开发网络分布式程序以及其它应用程序更加容易.
Go语言有net/rpc包,net/rpc包允许RPC客户端程序通过网络或是其他I/O连接调用一个远端对象的公开方法(方法名首字母必须大写,外部可调用).在RPC服务端,可将一个对象注册为可访问的服务.之后该对象的公开方法就能够以远程的方式提供访问.一个RPC服务端可以注册多个不同类型的对象,但不允许注册多个同一类型的对象.
一个对象需要满足下面这些条件,才能被RPC服务端设置为可供远程访问:
- 必须是在对象外部可公开调用的方法(首字母大写);
- 必须是两个参数,且参数的类型都必须是包外部可以访问的类型或者是Go内建支持的类型;
- 第二个参数必须是一个指针;
- 方法必须返回一个error
代码表示上述条件:
Type RpcObject struct{}
func(ro *RpcObject) Method(args T1,args *T2) error;
RPC包的大致结构
网络协议 channels和Json格式的请求和回应的编码和解码都是同时与服务端和客户端打交道的类.网络协议channels主要提供连接和数据传输的功能,json格式的编码和解码主要提供请求和回应的序列化和反序列化功能(Json->go对象)
[图片上传失败...(image-88cea-1542552106589)]
[图片上传失败...(image-764dff-1542552106589)]
GO实现简单RPC客户端与服务端调用Demo
服务端
package main
import (
"log"
"net/http"
"net/rpc"
)
/*
go对RPC的支持有三个级别:TCP/HTTP/JSONRPC
GO的RPC只支持GO开发的客户端和服务器之间的交互,因为采用gob编码
*/
type Params struct {
Width, Height int;
}
type Rect struct {}
/*
1.函数必须是导出的:函数首字母名称必须大写;
2.必须有两个导出参数类型;
3.第一个参数是接受参数;
4.第二个参数是返回给客户端参数,必须指针类型
5.函数还要有一个返回值error
*/
func (r *Rect) Area(p Params,ret *int) error {
*ret = (p.Width+p.Height)*2
return nil
}
func (r *Rect) Perimeter(p Params,ret *int) error {
*ret = (p.Width+p.Height)*2
return nil
}
func main() {
rect := new(Rect)
//注册一个rect服务
rpc.Register(rect)
//把服务处理绑定到http协议
rpc.HandleHTTP();
err := http.ListenAndServe(":8080",nil)
if err != nil {
log.Fatal(err)
}
}
客户端
package main
import (
"fmt"
"log"
rpc "net/rpc"
)
type Params struct {
Width, Height int
}
func main() {
//连接远程rpc服务
rpc,err :=rpc.DialHTTP("tcp","127.0.0.1:8080")
if err != nil {
log.Fatal(err)
}
ret := 0
//调用远程方法--->注意:第2个参数是指针类型
err2 := rpc.Call("Rect.Area", Params{50, 100}, &ret);
if err2!= nil {
log.Fatal(err2)
}
fmt.Println(ret)
err3 := rpc.Call("Rect.Perimeter", Params{100, 100}, &ret)
if err3 != nil{
log.Fatal(err3)
}
fmt.Println(ret)
}
Ethereum部分源码分析
server.go
server.go主要实现了RPC服务端的核心逻辑.包括RPC方法的注册,快速读取,处理请求,发送回应等逻辑.
server的核心数据结构是Server结构体.services字段是一个map,记录了所有注册的方法和类.run参数是用来控制Server的运行和停止的.codecs是一个set,用来存储所有的编码解码器,其实就是所有的连接.codecsMu是用来保护多线程访问codecs的锁.
services字段的value类型是service类型.service代表了一个注册到Server的实例,是一个对象和方法的组合.service字段的name代表了service的namespace,type实例的类型,callbacks是实例的回调方法,subscriptions是实例的订阅方法
//services的集合
type serviceRegistry map[string]*service // collection of services
//实例的回调方法
type callbacks map[string]*callback // collection of RPC callbacks
//实例的订阅方法
type subscriptions map[string]*callback
//核心数据结构
type Server struct {
services serviceRegistry
//run参数:控制Server的运行和停止
run int32
//保护多线程访问codecs的锁
codecsMu sync.Mutex
//用来存储所有的连接(编码和解码器)
codecs *set.Set
}
// callback is a method callback which was registered in the server
//在server中注册的回调方法
type callback struct {
rcvr reflect.Value // receiver of method
method reflect.Method // callback
argTypes []reflect.Type // input argument types
hasCtx bool // method's first argument is a context (not included in argTypes)
errPos int // err return idx, of -1 when method cannot return error
isSubscribe bool // indication if the callback is a subscription
}
// service represents a registered object
type service struct {
name string // name for service
typ reflect.Type // receiver type
callbacks callbacks // registered handlers
subscriptions subscriptions // available subscriptions/notifications
}
Server的创建,Server创建的时候通过调用server.RegisterName把自己的实例注册上来,提供一些RPC服务的元信息.
const MetadataApi = "rpc"
// NewServer will create a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{
services: make(serviceRegistry),
codecs: set.New(),
run: 1,
}
// register a default service which will provide meta information about the RPC service such as the services and methods it offers.
//注册一个包括提供关于RPC service的元数据的默认服务,例如它提供的服务和方法.
rpcService := &RPCService{server}
server.RegisterName(MetadataApi, rpcService)
return server
}
服务注册server.RegisterName,RegisterName方法会通过传入的参数来创建一个service对象,如果传入的rcvr实例没有找到任何合适的方法,那么会返回错误.如果没有错误,就把创建的service实例加入serviceRegistry.
// RegisterName will create a service for the given rcvr type under the given name. When no methods on the given rcvr
// match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is created and added to the service collection this server instance serves.
func (s *Server) RegisterName(name string, rcvr interface{}) error {
if s.services == nil {
s.services = make(serviceRegistry)
}
svc := new(service)
svc.typ = reflect.TypeOf(rcvr)
rcvrVal := reflect.ValueOf(rcvr)
if name == "" {
return fmt.Errorf("no service name for type %s", svc.typ.String())
}
//如果实例的类名不是导出的(类名的首字母大写),就返回错误。
if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}
//通过反射信息找到合适的callbacks 和subscriptions方法
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
//如果这个名字当前已经被注册过了,那么如果有同名的方法就用新的替代,否者直接插入。
// already a previous service register under given sname, merge methods/subscriptions
if regsvc, present := s.services[name]; present {
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s
}
return nil
}
svc.name = name
svc.callbacks, svc.subscriptions = methods, subscriptions
if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
s.services[svc.name] = svc
return nil
}
通过反射信息找出合适的方法,suitableCallback,这个方法在utils.go里面.这个方法会遍历这个类型的所有方法,找到适配RPC callback或者subscription callback类型标准的方法并返回.关于RPC的标准,参考开头的RPC标准
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
callbacks := make(callbacks)
subscriptions := make(subscriptions)
METHODS:
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := formatName(method.Name)
if method.PkgPath != "" { // method must be exported
continue
}
var h callback
h.isSubscribe = isPubSub(mtype)
h.rcvr = rcvr
h.method = method
h.errPos = -1
firstArg := 1
numIn := mtype.NumIn()
if numIn >= 2 && mtype.In(1) == contextType {
h.hasCtx = true
firstArg = 2
}
if h.isSubscribe {
h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
for i := firstArg; i < numIn; i++ {
argType := mtype.In(i)
if isExportedOrBuiltinType(argType) {
h.argTypes[i-firstArg] = argType
} else {
continue METHODS
}
}
subscriptions[mname] = &h
continue METHODS
}
// determine method arguments, ignore first arg since it's the receiver type
// Arguments must be exported or builtin types
h.argTypes = make([]reflect.Type, numIn-firstArg)
for i := firstArg; i < numIn; i++ {
argType := mtype.In(i)
if !isExportedOrBuiltinType(argType) {
continue METHODS
}
h.argTypes[i-firstArg] = argType
}
// check that all returned values are exported or builtin types
for i := 0; i < mtype.NumOut(); i++ {
if !isExportedOrBuiltinType(mtype.Out(i)) {
continue METHODS
}
}
// when a method returns an error it must be the last returned value
h.errPos = -1
for i := 0; i < mtype.NumOut(); i++ {
if isErrorType(mtype.Out(i)) {
h.errPos = i
break
}
}
if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
continue METHODS
}
switch mtype.NumOut() {
case 0, 1, 2:
if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
continue METHODS
}
callbacks[mname] = &h
}
}
return callbacks, subscriptions
}