首先,在之前的go公链实战中大概介绍了区块链网络的原理和实现,通信协议的实现参照的是Bitcoin的,这里以太坊的通信协议也大同小异。
以太坊devp2p最新动态和相关说明请点击这里。谈到devp2p就不得不讲libp2p,libp2p是一个开源的第三方p2p网络实现库,它汇集了各种传输和点对点协议,使开发人员可以轻松构建大型,强大的p2p网络。
废话少说撸代码
p2p目录结构
Server
我们就从服务器逻辑处理类来入手。
// Server manages all peer connections.
type Server struct {
// Config fields may not be modified while the server is running.
// Config is embedded, so its options are accessed directly on the server.
Config
// Hooks for testing. These are useful because we can inhibit
// the whole protocol stack.
newTransport func(net.Conn) transport
newPeerHook func(*Peer)
// lock guards the running flag.
lock sync.Mutex // protects running
running bool
// ntab is the discovery table created by Start when discovery is enabled.
ntab discoverTable
// listener is the TCP listener opened by startListening.
listener net.Listener
// ourHandshake is the protocol handshake built by Start and sent to peers.
ourHandshake *protoHandshake
lastLookup time.Time
// DiscV5 is the v5 discovery network, set by Start when DiscoveryV5 is enabled.
DiscV5 *discv5.Network
// These are for Peers, PeerCount (and nothing else).
peerOp chan peerOpFunc
peerOpDone chan struct{}
// The channels below are created by Start and serviced by the run loop.
quit chan struct{}
addstatic chan *discover.Node
removestatic chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
log log.Logger
}
...
// Config holds Server options.
type Config struct {
// This field must be set to a valid secp256k1 private key.
// It is the node's private key; Start returns an error when it is nil.
PrivateKey *ecdsa.PrivateKey `toml:"-"`
// MaxPeers is the maximum number of peers that can be
// connected. It must be greater than zero.
MaxPeers int
// MaxPendingPeers is the maximum number of peers that can be pending in the
// handshake phase, counted separately for inbound and outbound connections.
// Zero defaults to preset values.
MaxPendingPeers int `toml:",omitempty"`
// DialRatio controls the ratio of inbound to dialed connections.
// Example: a DialRatio of 2 allows 1/2 of connections to be dialed.
// Setting DialRatio to zero defaults it to 3.
DialRatio int `toml:",omitempty"`
// NoDiscovery can be used to disable the peer discovery mechanism.
// Disabling is useful for protocol debugging (manual topology).
NoDiscovery bool
// DiscoveryV5 specifies whether the the new topic-discovery based V5 discovery
// protocol should be started or not.
DiscoveryV5 bool `toml:",omitempty"`
// Name sets the node name of this server.
// Use common.MakeName to create a name that follows existing conventions.
Name string `toml:"-"`
// BootstrapNodes are used to establish connectivity
// with the rest of the network.
BootstrapNodes []*discover.Node
// BootstrapNodesV5 are used to establish connectivity
// with the rest of the network using the V5 discovery
// protocol.
BootstrapNodesV5 []*discv5.Node `toml:",omitempty"`
// Static nodes are used as pre-configured connections which are always
// maintained and re-connected on disconnects.
StaticNodes []*discover.Node
// Trusted nodes are used as pre-configured connections which are always
// allowed to connect, even above the peer limit.
TrustedNodes []*discover.Node
// Connectivity can be restricted to certain IP networks.
// If this option is set to a non-nil value, only hosts which match one of the
// IP networks contained in the list are considered.
NetRestrict *netutil.Netlist `toml:",omitempty"`
// NodeDatabase is the path to the database containing the previously seen
// live nodes in the network.
NodeDatabase string `toml:",omitempty"`
// Protocols should contain the protocols supported
// by the server. Matching protocols are launched for
// each peer.
Protocols []Protocol `toml:"-"`
// If ListenAddr is set to a non-nil address, the server
// will listen for incoming connections.
//
// If the port is zero, the operating system will pick a port. The
// ListenAddr field will be updated with the actual address when
// the server is started.
ListenAddr string
// If set to a non-nil value, the given NAT port mapper
// is used to make the listening port available to the
// Internet.
NAT nat.Interface `toml:",omitempty"`
// If Dialer is set to a non-nil value, the given Dialer
// is used to dial outbound peer connections.
// When it is nil, Start installs a TCPDialer using defaultDialTimeout.
Dialer NodeDialer `toml:"-"`
// If NoDial is true, the server will not dial any peers.
NoDial bool `toml:",omitempty"`
// If EnableMsgEvents is set then the server will emit PeerEvents
// whenever a message is sent to or received from a peer
EnableMsgEvents bool
// Logger is a custom logger to use with the p2p.Server.
// When it is nil, Start creates one with log.New().
Logger log.Logger `toml:",omitempty"`
}
...
const (
// Default dial timeout; Start uses it for the fallback TCPDialer.
defaultDialTimeout = 15 * time.Second
// Connectivity defaults.
maxActiveDialTasks = 16
defaultMaxPendingPeers = 50
defaultDialRatio = 3
// Maximum time allowed for reading a complete message.
// This is effectively the amount of time a connection can be idle.
frameReadTimeout = 30 * time.Second
// Maximum amount of time allowed for writing a complete message.
frameWriteTimeout = 20 * time.Second
)
然后看启动p2p服务的代码。
// Start starts running the server.
// Servers can not be re-used after stopping.
//
// It fills in defaults (logger, transport, dialer), creates the channels
// serviced by the run loop, brings up the discovery tables when enabled,
// starts the TCP listener when ListenAddr is set, and finally launches run
// in its own goroutine. It returns an error if the server is already
// running, if PrivateKey is nil, or if any network setup step fails.
func (srv *Server) Start() (err error) {
// Hold the lock for the whole startup; the running flag rejects a second Start.
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.running {
return errors.New("server already running")
}
// NOTE(review): running is set before the fallible setup below and is not
// reset on the error returns — confirm that a failed Start is meant to leave
// the server marked as running.
srv.running = true
srv.log = srv.Config.Logger
if srv.log == nil {
srv.log = log.New()
}
srv.log.Info("Starting P2P networking")
// static fields
if srv.PrivateKey == nil {
return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
// Default to the RLPx transport for the encrypted link.
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
// Default to a TCP dialer with the default dial timeout.
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
// Channels used to communicate with the run loop.
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
var (
conn *net.UDPConn
sconn *sharedUDPConn
realaddr *net.UDPAddr
unhandled chan discover.ReadPacket
)
if !srv.NoDiscovery || srv.DiscoveryV5 {
// Either discovery protocol needs a UDP socket bound to ListenAddr.
addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
if err != nil {
return err
}
// Open the UDP listener.
conn, err = net.ListenUDP("udp", addr)
if err != nil {
return err
}
realaddr = conn.LocalAddr().(*net.UDPAddr)
if srv.NAT != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := srv.NAT.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
}
if !srv.NoDiscovery && srv.DiscoveryV5 {
// Both discovery versions are enabled: they share the UDP socket, and
// packets are passed between them through the unhandled channel.
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}
// node table
if !srv.NoDiscovery {
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
AnnounceAddr: realaddr,
NodeDBPath: srv.NodeDatabase,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
}
ntab, err := discover.ListenUDP(conn, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}
if srv.DiscoveryV5 {
var (
ntab *discv5.Network
err error
)
// Use the shared connection when the socket is also used by the v4 table.
if sconn != nil {
ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
} else {
ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
}
if err != nil {
return err
}
if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
return err
}
srv.DiscV5 = ntab
}
dynPeers := srv.maxDialedConns()
// Create the dial state that schedules outbound connections to nodes.
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
// handshake
// Build our protocol handshake and advertise one capability per protocol.
srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
for _, p := range srv.Protocols {
srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
}
// listen/dial
if srv.ListenAddr != "" {
// Start listening on the TCP port.
if err := srv.startListening(); err != nil {
return err
}
}
if srv.NoDial && srv.ListenAddr == "" {
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
// Run the main loop in its own goroutine.
go srv.run(dialer)
// NOTE(review): running was already set to true above; this second
// assignment looks redundant.
srv.running = true
return nil
}
接着来看Start()里负责启动TCP监听的函数srv.startListening()。它会在底层socket上开始监听,并启动listenLoop()协程;listenLoop()每Accept到一个新连接,就会调用SetupConn()(其内部再调用setupConn())来建立连接。
setupConn()中的握手主要分两个阶段:加密握手和协议握手。
// startListening opens the TCP listener on ListenAddr, records the address
// that was actually bound, and starts listenLoop in its own goroutine. When a
// NAT mapper is configured and the bound address is not a loopback address,
// it also starts a goroutine that maps the TCP port. It returns the error
// from net.Listen, if any.
func (srv *Server) startListening() error {
	// Launch the TCP listener.
	ln, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
	bound := ln.Addr().(*net.TCPAddr)
	// Store the real address, since ListenAddr may have asked for port zero.
	srv.ListenAddr = bound.String()
	srv.listener = ln

	srv.loopWG.Add(1)
	go srv.listenLoop()

	// Map the TCP listening port if NAT is configured.
	if bound.IP.IsLoopback() || srv.NAT == nil {
		return nil
	}
	srv.loopWG.Add(1)
	go func() {
		defer srv.loopWG.Done()
		nat.Map(srv.NAT, srv.quit, "tcp", bound.Port, bound.Port, "ethereum p2p")
	}()
	return nil
}
...
// listenLoop runs in its own goroutine and accepts inbound connections.
//
// The number of connections that may be in the handshake phase at once is
// bounded by MaxPendingPeers (or defaultMaxPendingPeers when that is not
// positive): a slot is taken before each Accept and handed back once the
// connection has been rejected or SetupConn has returned. The loop exits on
// the first non-temporary Accept error.
func (srv *Server) listenLoop() {
	defer srv.loopWG.Done()
	srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))

	// Fill the pool of handshake slots.
	pending := srv.MaxPendingPeers
	if pending <= 0 {
		pending = defaultMaxPendingPeers
	}
	slots := make(chan struct{}, pending)
	for n := pending; n > 0; n-- {
		slots <- struct{}{}
	}

	for {
		// Wait for a handshake slot before accepting.
		<-slots

		var (
			fd  net.Conn
			err error
		)
	accept:
		for {
			fd, err = srv.listener.Accept()
			switch tempErr, isTemp := err.(tempError); {
			case isTemp && tempErr.Temporary():
				// Transient failure: try again.
				srv.log.Debug("Temporary read error", "err", err)
			case err != nil:
				srv.log.Debug("Read error", "err", err)
				return
			default:
				break accept
			}
		}

		// Reject connections that do not match NetRestrict.
		if srv.NetRestrict != nil {
			remote, isTCP := fd.RemoteAddr().(*net.TCPAddr)
			if isTCP && !srv.NetRestrict.Contains(remote.IP) {
				srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
				fd.Close()
				slots <- struct{}{}
				continue
			}
		}

		fd = newMeteredConn(fd, true)
		srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
		// Run the handshakes concurrently and release the slot afterwards.
		go func(inbound net.Conn) {
			srv.SetupConn(inbound, inboundConn, nil)
			slots <- struct{}{}
		}(fd)
	}
}
...
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
//
// fd is the raw connection, flags describes how it was established and
// dialDest is the node that was dialed (nil for inbound connections). On
// every failure path the connection is closed before returning, so callers
// such as listenLoop, which drop their reference to fd after the call, do
// not leak the socket.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
	if srv.Self() == nil {
		// No conn wrapper exists yet on this path, so nothing else would
		// close fd; release it here. The close error is irrelevant because
		// the connection is being abandoned anyway.
		fd.Close()
		return errors.New("shutdown")
	}
	// Wrap the raw connection together with its transport.
	c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
	err := srv.setupConn(c, flags, dialDest)
	if err != nil {
		c.close(err)
		srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
	}
	return err
}
// setupConn performs the two handshake phases on c and submits the connection
// to the run loop after each of them.
//
// It first runs the encryption handshake, which yields the remote node ID,
// and for dialed connections checks that ID against dialDest. The connection
// then goes through the posthandshake checkpoint, the protocol handshake, and
// finally the addpeer checkpoint. It returns nil once the connection has
// passed all checks; otherwise it returns the first error encountered and
// leaves closing the connection to the caller.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
	// Prevent leftover pending conns from entering the handshake.
	srv.lock.Lock()
	running := srv.running
	srv.lock.Unlock()
	if !running {
		return errServerStopped
	}
	// Run the encryption handshake.
	var err error
	if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
		srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
		return err
	}
	// clog already carries the remote ID, address and connection flags.
	clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
	// For dialed connections, check that the remote public key matches.
	if dialDest != nil && c.id != dialDest.ID {
		// Log context must be key/value pairs: report the ID we expected.
		// The ID we actually got is already in clog's "id" field.
		clog.Trace("Dialed identity mismatch", "want", dialDest.ID)
		return DiscUnexpectedIdentity
	}
	// Submit the connection to the run loop for the post-encryption checks.
	err = srv.checkpoint(c, srv.posthandshake)
	if err != nil {
		clog.Trace("Rejected peer before protocol handshake", "err", err)
		return err
	}
	// Run the protocol handshake
	phs, err := c.doProtoHandshake(srv.ourHandshake)
	if err != nil {
		clog.Trace("Failed proto handshake", "err", err)
		return err
	}
	if phs.ID != c.id {
		// The value is the ID claimed in the protocol handshake, not an error,
		// so it gets its own key.
		clog.Trace("Wrong devp2p handshake identity", "phsid", phs.ID)
		return DiscUnexpectedIdentity
	}
	c.caps, c.name = phs.Caps, phs.Name
	// Submit the connection to the run loop to be added as a peer.
	err = srv.checkpoint(c, srv.addpeer)
	if err != nil {
		clog.Trace("Rejected peer", "err", err)
		return err
	}
	// If the checks completed successfully, runPeer has now been
	// launched by run.
	clog.Trace("connection set up", "inbound", dialDest == nil)
	return nil
}
在Server.Start()的最后还通过go srv.run(dialer)启动了run()协程,它主要用来处理主动连接外部节点的拨号任务,以及握手过程中通过checkpoint()提交上来的连接的检查逻辑。
// run is the server's main loop. It schedules dial tasks obtained from
// dialstate, services the static-node, peer-operation, checkpoint and
// peer-drop channels, and owns the map of connected peers. When srv.quit
// fires it closes discovery, disconnects every peer and waits for each of
// them to be reported on srv.delpeer before returning.
func (srv *Server) run(dialstate dialer) {
defer srv.loopWG.Done()
var (
peers = make(map[discover.NodeID]*Peer)
inboundCount = 0
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be
// modified while the server is running.
for _, n := range srv.TrustedNodes {
trusted[n.ID] = true
}
// removes t from runningTasks
delTask := func(t task) {
for i := range runningTasks {
if runningTasks[i] == t {
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
// starts until max number of active tasks is satisfied
// and returns the tasks that could not be started.
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
scheduleTasks := func() {
// Start from queue first.
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// Query dialer for new tasks and start as many as possible now.
// Tasks that cannot be started yet are kept in queuedTasks.
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}
running:
for {
scheduleTasks()
select {
case <-srv.quit:
// The server was stopped. Run the cleanup logic.
break running
case n := <-srv.addstatic:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
srv.peerOpDone <- struct{}{}
case t := <-taskdone:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
// setupConn submits the connection here through checkpoint.
if trusted[c.id] {
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
case <-srv.quit:
break running
}
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
// Run the peer in its own goroutine and register it.
go srv.runPeer(p)
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
select {
case c.cont <- err:
case <-srv.quit:
break running
}
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
if pd.Inbound() {
inboundCount--
}
}
}
srv.log.Trace("P2P networking is spinning down")
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
srv.ntab.Close()
}
if srv.DiscV5 != nil {
srv.DiscV5.Close()
}
// Disconnect all peers.
for _, p := range peers {
p.Disconnect(DiscQuitting)
}
// Wait for peers to shut down. Pending connections and tasks are
// not handled here and will terminate soon-ish because srv.quit
// is closed.
for len(peers) > 0 {
p := <-srv.delpeer
p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks))
delete(peers, p.ID())
}
}
...
// runPeer runs in its own goroutine for each peer. It announces the peer on
// the server's event feed, blocks until the peer's protocol logic returns,
// announces the drop, and finally reports the peer to the run loop so that
// it gets removed.
func (srv *Server) runPeer(p *Peer) {
	if hook := srv.newPeerHook; hook != nil {
		hook(p)
	}

	// Broadcast that the peer was added.
	added := &PeerEvent{
		Type: PeerEventTypeAdd,
		Peer: p.ID(),
	}
	srv.peerFeed.Send(added)

	// Run the protocols; this blocks until the peer is done.
	remoteRequested, err := p.run()

	// Broadcast that the peer was dropped, including the reason.
	dropped := &PeerEvent{
		Type:  PeerEventTypeDrop,
		Peer:  p.ID(),
		Error: err.Error(),
	}
	srv.peerFeed.Send(dropped)

	// Note: run waits for existing peers to be sent on srv.delpeer
	// before returning, so this send should not select on srv.quit.
	srv.delpeer <- peerDrop{p, err, remoteRequested}
}
总结一下,当server启动后,它会通过listenloop()来接收和处理外部请求,通过run来主动发起与外部其他节点的连接。
Node
所以,接着就看看node的结构。
// Node represents a host on the network.
// The fields of Node may not be modified.
type Node struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP, TCP uint16 // port numbers
ID NodeID // the node's public key
// This is a cached copy of sha3(ID) which is used for node
// distance calculations. This is part of Node in order to make it
// possible to write tests that need a node at a certain distance.
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
// Time when the node was added to the table.
addedAt time.Time
}
连接中的远程节点peer的结构是:
// Peer represents a connected remote node.
type Peer struct {
// rw is the connection the peer was created from.
rw *conn
running map[string]*protoRW
log log.Logger
// created is the time against which the connection duration is measured
// when the peer is removed.
created mclock.AbsTime
wg sync.WaitGroup
protoErr chan error
closed chan struct{}
disc chan DiscReason
// events receives message send / receive events if set
events *event.Feed
}
在p2p网络中,每一个节点既是服务端又是客户端。devp2p主要有两个功能,一个是用来发现当前节点之外的其他节点,一个是实现两个节点间的信息通讯。
节点发现
准备知识
对于分布式系统,怎么去高效地发现节点是一个至关重要的问题。一般的分布式系统都是使用DHT(Distributed Hash Table,分布式哈希表)原理来发现和管理节点。其中应用最广的又当属Kademlia算法。
这里且通过我认为比较好的两篇博文来理解DHT原理和Kademlia算法。
其中,《易懂分布式 | Kademlia算法》一文可以着重看看,它将是接下来看发现节点源码的基础,这也是我看过的关于Kademlia最通俗易懂的博文。
这里也将Kademlia算法的论文原文以及中文翻译版贴出:
table(k-bucket in Kademlia)
这里的table结构就相当于Kademlia算法中的k-bucket(k桶),用来存储一个节点附近的某些节点路由。
const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
maxReplacements = 10 // Size of per-bucket replacement list
// We keep buckets for the upper 1/15 of distances because
// it's very unlikely we'll ever encounter a node that's closer.
//
hashBits = len(common.Hash{}) * 8
nBuckets = hashBits / 15 // Number of buckets
bucketMinDistance = hashBits - nBuckets // Log distance of closest bucket
// IP address limits.
bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
tableIPLimit, tableSubnet = 10, 24
// Limit on failed findnode queries per node.
maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped
// Intervals used by the table's background loop.
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
)
// Table is the routing table of known nodes, organised into buckets by
// distance (the k-buckets of Kademlia).
type Table struct {
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*Node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
// ips tracks the IP addresses in the whole table to enforce the table-wide limit.
ips netutil.DistinctNetSet
db *nodeDB // database of known nodes
// Channels used to communicate with the background loop.
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
closed chan struct{}
nodeAddedHook func(*Node) // for testing
net transport
self *Node // metadata of the local node
}
...
// bucket contains nodes, ordered by their last activity. the entry
// that was most recently active is the first element in entries.
type bucket struct {
entries []*Node // live entries, sorted by time of last contact
// replacements holds candidate nodes that are swapped into entries when a
// live entry fails revalidation.
replacements []*Node // recently seen nodes to be used if revalidation fails
// ips tracks the IP addresses in this bucket to enforce the per-bucket limit.
ips netutil.DistinctNetSet
}
那么怎么初始化一个k桶表呢?
// newTable builds the routing table for the local node identified by ourID
// and reachable at ourAddr, using t to talk to other nodes. nodeDBPath names
// the node database and bootnodes are the fallback nodes. The returned table
// already has its background loop running. An error is returned when the
// node database cannot be opened or the bootnodes are rejected.
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
	// Step 1: open the node database.
	// If no node database was given, use an in-memory one
	nodeDB, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
	if err != nil {
		return nil, err
	}

	// Step 2: assemble the table. The local node uses the same port for UDP and TCP.
	port := uint16(ourAddr.Port)
	table := &Table{
		net:        t,
		db:         nodeDB,
		self:       NewNode(ourID, ourAddr.IP, port, port),
		rand:       mrand.New(mrand.NewSource(0)),
		ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
		refreshReq: make(chan chan struct{}),
		initDone:   make(chan struct{}),
		closeReq:   make(chan struct{}),
		closed:     make(chan struct{}),
	}

	// Step 3: install the bootstrap nodes.
	if err := table.setFallbackNodes(bootnodes); err != nil {
		return nil, err
	}

	// Step 4: create the empty buckets, each with its own IP limit.
	for idx := 0; idx < len(table.buckets); idx++ {
		table.buckets[idx] = &bucket{
			ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
		}
	}

	// Step 5: reseed the random source and load the seed nodes.
	table.seedRand()
	table.loadSeedNodes()
	// Start the background expiration goroutine after loading seeds so that the search for
	// seed nodes also considers older nodes that would otherwise be removed by the
	// expiration.
	table.db.ensureExpirer()

	// Step 6: start the background maintenance loop.
	go table.loop()
	return table, nil
}
...
// loop schedules refresh, revalidate runs and coordinates shutdown.
//
// At most one refresh runs at a time: refreshDone is non-nil exactly while a
// doRefresh goroutine is in flight, and callers that asked for a refresh are
// parked in waiting until it reports completion. On closeReq the loop closes
// the transport, waits for an in-flight refresh, releases all waiters, closes
// the database and finally closes tab.closed.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
refreshDone = make(chan struct{}) // where doRefresh reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C:
// Periodic refresh: reseed the random source and start a refresh
// unless one is already running.
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
// Explicit refresh request: park the caller until the refresh completes.
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone:
// Refresh finished: release every waiting caller by closing its channel.
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C:
// Time to revalidate a node.
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
// Re-arm the timer only after the previous revalidation has finished.
revalidate.Reset(tab.nextRevalidateTime())
case <-copyNodes.C:
// Periodically copy live nodes.
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}
if tab.net != nil {
tab.net.close()
}
// Wait for an in-flight refresh before releasing the remaining waiters.
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
tab.db.close()
close(tab.closed)
}
接着继续看看loop里的几个函数。
// Lookup searches the network for nodes close to targetID and returns them.
// Every iteration queries nodes that are closer to the target than those of
// the previous one. targetID does not have to be the identifier of an actual
// node. An empty table is refreshed before the search starts.
func (tab *Table) Lookup(targetID NodeID) []*Node {
	const refreshIfEmpty = true
	return tab.lookup(targetID, refreshIfEmpty)
}
// lookup is the implementation behind Lookup. It seeds the result set with
// the closest nodes from the local table, then repeatedly sends findnode
// queries to not-yet-asked nodes from that set, keeping at most alpha queries
// in flight, and merges the replies into the set until no node is left to
// ask. When refreshIfEmpty is true and the table yields no nodes, it waits
// for one table refresh before continuing.
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
	targetHash := crypto.Keccak256Hash(targetID[:])
	// don't query further if we hit ourself.
	// unlikely to happen often in practice.
	queried := map[NodeID]bool{tab.self.ID: true}
	known := make(map[NodeID]bool)
	replies := make(chan []*Node, alpha)
	inFlight := 0
	var closest *nodesByDistance

	// generate initial result set
	for {
		tab.mutex.Lock()
		closest = tab.closest(targetHash, bucketSize)
		tab.mutex.Unlock()
		if !refreshIfEmpty || len(closest.entries) > 0 {
			break
		}
		// The result set is empty, all nodes were dropped, refresh.
		// We actually wait for the refresh to complete here. The very
		// first query will hit this case and run the bootstrapping
		// logic.
		<-tab.refresh()
		refreshIfEmpty = false
	}

	for {
		// ask the alpha closest nodes that we haven't asked yet
		for _, candidate := range closest.entries {
			if inFlight >= alpha {
				break
			}
			if queried[candidate.ID] {
				continue
			}
			queried[candidate.ID] = true
			inFlight++
			go tab.findnode(candidate, targetID, replies)
		}
		// we have asked all closest nodes, stop the search
		if inFlight == 0 {
			break
		}
		// wait for the next reply and merge every node not seen before
		batch := <-replies
		for _, found := range batch {
			if found == nil || known[found.ID] {
				continue
			}
			known[found.ID] = true
			closest.push(found, bucketSize)
		}
		inFlight--
	}
	return closest.entries
}
...
// findnode asks node n for the nodes it knows close to targetID and sends
// the answer on reply. It also maintains n's failure counter in the
// database: a failed or empty answer increments it (and drops n from the
// table once maxFindnodeFailures is reached), while a successful answer
// decrements a non-zero counter. All returned nodes are added to the table.
func (tab *Table) findnode(n *Node, targetID NodeID, reply chan<- []*Node) {
	failCount := tab.db.findFails(n.ID)
	nodes, err := tab.net.findnode(n.ID, n.addr(), targetID)

	switch {
	case err != nil || len(nodes) == 0:
		failCount++
		tab.db.updateFindFails(n.ID, failCount)
		log.Trace("Findnode failed", "id", n.ID, "failcount", failCount, "err", err)
		if failCount >= maxFindnodeFailures {
			log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", failCount)
			tab.delete(n)
		}
	case failCount > 0:
		tab.db.updateFindFails(n.ID, failCount-1)
	}

	// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
	// just remove those again during revalidation.
	for _, found := range nodes {
		tab.add(found)
	}
	reply <- nodes
}
...
// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
// entries is kept sorted by push, closest node first.
entries []*Node
// target is the hash the distances are measured against.
target common.Hash
}
// push inserts n into the list at the position given by its distance to the
// target, never letting the list grow beyond maxElems entries. When the list
// is full, a node that is farther away than every existing entry is ignored;
// otherwise the farthest entry is pushed out.
func (h *nodesByDistance) push(n *Node, maxElems int) {
	// Find the first entry that is farther from the target than n.
	pos := sort.Search(len(h.entries), func(i int) bool {
		return distcmp(h.target, h.entries[i].sha, n.sha) > 0
	})
	// Grow the list by one slot while there is still room.
	if len(h.entries) < maxElems {
		h.entries = append(h.entries, n)
	}
	// When pos is past the end, n is farther away than all nodes we already
	// have: if there was room for it, it is now the last element.
	if pos < len(h.entries) {
		// slide existing entries down to make room
		// this will overwrite the entry we just appended.
		copy(h.entries[pos+1:], h.entries[pos:])
		h.entries[pos] = n
	}
}
上面看的是刷新节点时用到的查找方法lookup(),加入k桶的节点还需要通过doRevalidate()定期进行二次验证:每次随机选取一个非空k桶,检查其中最后一个节点是否仍然在线。
// doRevalidate pings the last node of a random non-empty bucket. A node that
// answers is moved to the front of its bucket; a node that does not answer
// is replaced by a node from the bucket's replacement list or, when there is
// none, deleted. Completion is always signalled on done.
func (tab *Table) doRevalidate(done chan<- struct{}) {
	defer func() { done <- struct{}{} }()

	candidate, idx := tab.nodeToRevalidate()
	if candidate == nil {
		// No non-empty bucket found.
		return
	}

	// Ping the selected node and wait for a pong.
	pingErr := tab.net.ping(candidate.ID, candidate.addr())

	tab.mutex.Lock()
	defer tab.mutex.Unlock()
	bkt := tab.buckets[idx]

	if pingErr == nil {
		// The node responded, move it to the front.
		log.Trace("Revalidated node", "b", idx, "id", candidate.ID)
		bkt.bump(candidate)
		return
	}

	// No reply received, pick a replacement or delete the node if there aren't
	// any replacements.
	substitute := tab.replace(bkt, candidate)
	if substitute == nil {
		log.Trace("Removed dead node", "b", idx, "id", candidate.ID, "ip", candidate.IP)
		return
	}
	log.Trace("Replaced dead node", "b", idx, "id", candidate.ID, "ip", candidate.IP, "r", substitute.ID, "rip", substitute.IP)
}
// nodeToRevalidate visits the buckets in random order and returns the last
// entry of the first non-empty one, together with that bucket's index. It
// returns (nil, 0) when every bucket is empty.
func (tab *Table) nodeToRevalidate() (n *Node, bi int) {
	tab.mutex.Lock()
	defer tab.mutex.Unlock()

	for _, idx := range tab.rand.Perm(len(tab.buckets)) {
		if entries := tab.buckets[idx].entries; len(entries) > 0 {
			return entries[len(entries)-1], idx
		}
	}
	return nil, 0
}
...
// bump moves n to the front of the bucket's entry list when an entry with
// the same ID is present, and reports whether that was the case.
func (b *bucket) bump(n *Node) bool {
	for pos, entry := range b.entries {
		if entry.ID != n.ID {
			continue
		}
		// Shift everything ahead of the match one slot back, then put n first.
		copy(b.entries[1:], b.entries[:pos])
		b.entries[0] = n
		return true
	}
	return false
}
...
// replace substitutes 'last' with a randomly chosen node from the bucket's
// replacement list, provided 'last' is still the final entry of the bucket.
// If it is no longer the final entry, it has either been replaced by another
// node or became active, and nothing is done. When the replacement list is
// empty, 'last' is deleted from the bucket instead. The node that took the
// place of 'last' is returned, or nil when no replacement happened.
func (tab *Table) replace(b *bucket, last *Node) *Node {
	count := len(b.entries)
	if count == 0 || b.entries[count-1].ID != last.ID {
		// Entry has moved, don't replace it.
		return nil
	}
	// Still the last entry.
	if len(b.replacements) == 0 {
		tab.deleteInBucket(b, last)
		return nil
	}
	// Promote a random replacement into the slot held by 'last'.
	chosen := b.replacements[tab.rand.Intn(len(b.replacements))]
	b.replacements = deleteNode(b.replacements, chosen)
	b.entries[len(b.entries)-1] = chosen
	tab.removeIP(b, last.IP)
	return chosen
}
综上,table主要实现了用于p2p的Kademlia协议。主要通过loop函数来进行k桶表的构造,大概的流程是:
1.loadSeedNodes方法加载种子节点,通过这些节点来发现其他网络节点。
2.loop循环当收到刷新请求时,会调lookup方法去发现当前节点的种子节点
3.按照Kademlia核心思想,递归地调用findnode方法询问所有最近节点直到所有节点都被询问过
4.每收到一个节点的reply(回复)时,调用push方法将返回的节点加入nodesByDistance结构的entries数组,并按照各节点到目标(target)的距离由近到远排序
5.然后还会通过doRevalidate方法发出ping指令对搜索到的bucket中的节点进行验证,以此来剔除不在线的但已添加到entries列表的节点
上面涉及到的ping()方法以及最终findnode方法都是通过transport接口实现的。不难发现udp这个类正好实现了这个接口,还有kademlia协议本身就是使用UDP来进行网络通信的。
// transport is implemented by the UDP transport.
// it is an interface so we can test without opening lots of UDP
// sockets and without generating a private key.
// The table uses it for all of its network queries.
type transport interface {
// ping checks whether the node with the given ID and address responds.
ping(NodeID, *net.UDPAddr) error
// findnode asks the node toid at addr for nodes close to target.
findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error)
close()
}
udp
进入udp.go,可以看到其中定义了4种数据报结构:
// RPC packet types
const (
// pingPacket asks whether a node is online.
pingPacket = iota + 1 // zero is 'reserved'
// pongPacket is the reply to a ping.
pongPacket
// findnodePacket asks for neighbor nodes.
findnodePacket
// neighborsPacket returns neighbor nodes; it is the reply to findnodePacket.
neighborsPacket
)
// RPC request structures
type (
// ping is sent to check whether a node is online.
ping struct {
Version uint
From, To rpcEndpoint
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// pong is the reply to ping.
pong struct {
// This field should mirror the UDP envelope address
// of the ping packet, which provides a way to discover the
// the external address (after NAT).
To rpcEndpoint
ReplyTok []byte // This contains the hash of the ping packet.
Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// findnode is a query for nodes close to the given target.
findnode struct {
Target NodeID // doesn't need to be an actual public key
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// reply to findnode
neighbors struct {
Nodes []rpcNode
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// rpcNode describes a node as carried in a neighbors packet.
rpcNode struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
ID NodeID
}
// rpcEndpoint is the address of a node without its ID.
rpcEndpoint struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
}
)
接着来看一下udp的数据结构。
// conn declares the methods the UDP transport needs from its socket.
type conn interface {
ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
Close() error
LocalAddr() net.Addr
}
// udp implements the RPC protocol.
type udp struct {
// conn provides the methods needed for UDP communication.
conn conn
netrestrict *netutil.Netlist
// priv is the node's private key; it is passed to encodePacket when sending.
priv *ecdsa.PrivateKey
ourEndpoint rpcEndpoint
// addpending receives pending replies: the responses expected after a
// message has been sent.
addpending chan *pending
// gotreply receives incoming replies, which are then matched against the
// pending list.
gotreply chan reply
closing chan struct{}
nat nat.Interface
// Table is embedded, so its methods can be called directly on udp.
*Table
}
// pending represents a pending reply.
//
// some implementations of the protocol wish to send more than one
// reply packet to findnode. in general, any neighbors packet cannot
// be matched up with a specific findnode packet.
//
// our implementation handles this by storing a callback function for
// each pending reply. incoming packets from a node are dispatched
// to all the callback functions for that node.
type pending struct {
// these fields must match in the reply.
from NodeID
ptype byte
// time when the request must complete
deadline time.Time
// callback is called when a matching reply arrives. if it returns
// true, the callback is removed from the pending reply queue.
// if it returns false, the reply is considered incomplete and
// the callback will be invoked again for the next matching reply.
callback func(resp interface{}) (done bool)
// errc receives nil when the callback indicates completion or an
// error if no further reply is received within the timeout.
errc chan<- error
}
// reply is an incoming response that is matched against the pending
// entries by sender ID and packet type.
type reply struct {
from NodeID
ptype byte
data interface{}
// loop indicates whether there was
// a matching request by sending on this channel.
matched chan<- bool
}
接着,我们看看在table里调用的ping()是怎么实现的。
// ping sends a ping message to the given node and blocks until the
// channel returned by sendPing delivers the outcome of the exchange.
func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
	errc := t.sendPing(toid, toaddr, nil)
	return <-errc
}
// sendPing sends a ping message to the given node. When a pong whose
// ReplyTok equals the hash of the sent ping arrives, callback is invoked
// (if it is non-nil). The returned channel reports the result: an encoding
// failure is delivered immediately, everything else comes from the pending
// reply entry.
func (t *udp) sendPing(toid NodeID, toaddr *net.UDPAddr, callback func()) <-chan error {
	req := &ping{
		Version:    4,
		From:       t.ourEndpoint,
		To:         makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
		Expiration: uint64(time.Now().Add(expiration).Unix()),
	}
	encoded, hash, err := encodePacket(t.priv, pingPacket, req)
	if err != nil {
		// Nothing was sent; hand the error back through a pre-filled channel.
		failed := make(chan error, 1)
		failed <- err
		return failed
	}
	// matchPong accepts only the pong that references the ping sent below.
	matchPong := func(p interface{}) bool {
		if !bytes.Equal(p.(*pong).ReplyTok, hash) {
			return false
		}
		if callback != nil {
			callback()
		}
		return true
	}
	// The matcher is registered before the packet is written.
	errc := t.pending(toid, pongPacket, matchPong)
	t.write(toaddr, req.name(), encoded)
	return errc
}
然后,顺便看下查找节点的消息发送机制。
// findnode sends a findnode request to the given node and waits until
// the node has sent up to k neighbors.
//
// It returns the nodes collected from the neighbors replies together with
// the value delivered on the pending entry's error channel. Nodes that
// fail nodeFromRPC are logged and skipped, but still count towards the
// bucketSize completion threshold.
func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
	// If we haven't seen a ping from the destination node for a while, it won't remember
	// our endpoint proof and reject findnode. Solicit a ping first.
	if time.Since(t.db.lastPingReceived(toid)) > nodeDBNodeExpiration {
		t.ping(toid, toaddr)
		t.waitping(toid)
	}
	nodes := make([]*Node, 0, bucketSize)
	nreceived := 0
	errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
		reply := r.(*neighbors)
		for _, rn := range reply.Nodes {
			nreceived++
			n, err := t.nodeFromRPC(toaddr, rn)
			if err != nil {
				log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
				continue
			}
			nodes = append(nodes, n)
		}
		return nreceived >= bucketSize
	})
	// Send the findnodePacket message.
	t.send(toaddr, findnodePacket, &findnode{
		Target:     target,
		Expiration: uint64(time.Now().Add(expiration).Unix()),
	})
	// Receive the result before reading nodes. In `return nodes, <-errc` the
	// Go spec does not order the read of the variable relative to the channel
	// receive, so the slice could be read before the callback has appended
	// to it. Splitting the statement makes the order explicit.
	err := <-errc
	return nodes, err
}
...
// pending registers a reply callback for packets of type ptype coming from
// node id and returns the channel on which the outcome is reported.
// See the documentation of type pending for a detailed explanation.
// If the transport is already closing, errClosed is delivered right away.
func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
	errc := make(chan error, 1)
	entry := &pending{from: id, ptype: ptype, callback: callback, errc: errc}
	select {
	case <-t.closing:
		errc <- errClosed
	case t.addpending <- entry:
		// loop will handle it
	}
	return errc
}
...
// send encodes req as a packet of type ptype using the node's private key
// and writes it to toaddr. It returns the hash produced by encodePacket
// together with the first error from encoding or writing.
func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) {
	encoded, hash, err := encodePacket(t.priv, ptype, req)
	if err == nil {
		err = t.write(toaddr, req.name(), encoded)
	}
	return hash, err
}
那么当一个节点收到这些消息时,又是怎么处理的呢?根据代码发现四种消息包对应了四种处理方式,可见这里一定定义了一个接口来实现对收到的消息包的处理。
// packet is implemented by each of the RPC message types so that a
// received message can be handled according to its kind.
type packet interface {
// handle processes the packet received from the given address and node ID;
// mac is the hash value returned by decodePacket for the raw packet.
handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
// name returns a short label for the packet type, e.g. "PING/v4".
name() string
}
...
// handle processes an incoming ping. Expired pings are rejected with
// errExpired. Otherwise a pong carrying mac as its reply token is sent back,
// the ping is dispatched to any pending reply entries, the sender is added
// to the table (directly, or after a ping of our own succeeds), and the
// time of this ping is recorded in the database.
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	resp := &pong{
		To:         makeEndpoint(from, req.From.TCP),
		ReplyTok:   mac,
		Expiration: uint64(time.Now().Add(expiration).Unix()),
	}
	t.send(from, pongPacket, resp)
	t.handleReply(fromID, pingPacket, req)

	// Add the node to the table. Before doing so, ensure that we have a recent enough pong
	// recorded in the database so their findnode requests will be accepted later.
	n := NewNode(fromID, from.IP, uint16(from.Port), req.From.TCP)
	if time.Since(t.db.lastPongReceived(fromID)) <= nodeDBNodeExpiration {
		// The last pong is recent enough: add the node right away.
		t.addThroughPing(n)
	} else {
		// The last pong is too old: ping first and add once the pong arrives.
		t.sendPing(fromID, from, func() { t.addThroughPing(n) })
	}
	t.db.updateLastPingReceived(fromID, time.Now())
	return nil
}
// name returns the label of the ping packet type.
func (req *ping) name() string {
	return "PING/v4"
}
// handle processes an incoming pong. Expired pongs yield errExpired; a pong
// that matches no pending ping yields errUnsolicitedReply. On a match the
// time of the pong is recorded in the database.
func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	if matched := t.handleReply(fromID, pongPacket, req); !matched {
		// No pending ping request matched this pong.
		return errUnsolicitedReply
	}
	t.db.updateLastPongReceived(fromID, time.Now())
	return nil
}
// name returns the label of the pong packet type.
func (req *pong) name() string {
	return "PONG/v4"
}
// handle processes an incoming findnode request. Expired requests yield
// errExpired and requests from nodes without a bond yield errUnknownNode.
// Otherwise the closest known nodes to the hashed target are sent back in
// one or more neighbors packets.
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	if bonded := t.db.hasBond(fromID); !bonded {
		// No endpoint proof pong exists, we don't process the packet. This prevents an
		// attack vector where the discovery protocol could be used to amplify traffic in a
		// DDOS attack. A malicious actor would send a findnode request with the IP address
		// and UDP port of the target as the source address. The recipient of the findnode
		// packet would then send a neighbors packet (which is a much bigger packet than
		// findnode) to the victim.
		return errUnknownNode
	}
	target := crypto.Keccak256Hash(req.Target[:])
	t.mutex.Lock()
	nearest := t.closest(target, bucketSize).entries
	t.mutex.Unlock()

	resp := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
	sentAny := false
	// Send neighbors in chunks with at most maxNeighbors per packet
	// to stay below the 1280 byte limit.
	for _, n := range nearest {
		if err := netutil.CheckRelayIP(from.IP, n.IP); err == nil {
			resp.Nodes = append(resp.Nodes, nodeToRPC(n))
		}
		if len(resp.Nodes) == maxNeighbors {
			t.send(from, neighborsPacket, &resp)
			resp.Nodes = resp.Nodes[:0]
			sentAny = true
		}
	}
	// Flush the remainder; if nothing was sent at all, send one (possibly
	// empty) packet.
	if !sentAny || len(resp.Nodes) > 0 {
		t.send(from, neighborsPacket, &resp)
	}
	return nil
}
// name returns the label of the findnode packet type.
func (req *findnode) name() string {
	return "FINDNODE/v4"
}
// handle processes an incoming neighbors packet. Expired packets yield
// errExpired; packets that match no pending findnode request yield
// errUnsolicitedReply.
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	if t.handleReply(fromID, neighborsPacket, req) {
		return nil
	}
	return errUnsolicitedReply
}
还有就是网络通信中发送的消息包都是经过RLP编码并用节点私钥签名后再发送的(注意这里是RLP编码,而不是RLPx——RLPx是基于TCP的传输协议)。
// encodePacket builds a signed packet: headSpace, the packet type byte and
// the RLP encoding of req are written to a buffer; the Keccak256 digest of
// everything after the head is signed with priv and the signature is copied
// in at offset macSize; finally the Keccak256 hash of everything after the
// first macSize bytes is copied to the front. It returns the packet and that
// hash, or the encoding/signing error (which is also logged).
func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, hash []byte, err error) {
	buf := new(bytes.Buffer)
	buf.Write(headSpace)
	buf.WriteByte(ptype)
	if encErr := rlp.Encode(buf, req); encErr != nil {
		log.Error("Can't encode discv4 packet", "err", encErr)
		return nil, nil, encErr
	}
	packet = buf.Bytes()

	digest := crypto.Keccak256(packet[headSize:])
	sig, signErr := crypto.Sign(digest, priv)
	if signErr != nil {
		log.Error("Can't sign discv4 packet", "err", signErr)
		return nil, nil, signErr
	}
	copy(packet[macSize:], sig)

	// Add the hash to the front. Note: this doesn't protect the
	// packet in any way. Our public key will be part of this hash in
	// the future.
	hash = crypto.Keccak256(packet[macSize:])
	copy(packet, hash)
	return packet, hash, nil
}
...
// decodePacket splits buf into hash, signature and signed data, verifies
// the hash, recovers the sender's node ID from the signature and
// RLP-decodes the payload into the request type selected by the first
// signed byte. It returns the decoded packet, the sender ID, the packet
// hash and any error.
func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
	if len(buf) < headSize+1 {
		return nil, NodeID{}, nil, errPacketTooSmall
	}
	hash := buf[:macSize]
	sig := buf[macSize:headSize]
	sigdata := buf[headSize:]

	// The leading hash must cover everything that follows it.
	if want := crypto.Keccak256(buf[macSize:]); !bytes.Equal(hash, want) {
		return nil, NodeID{}, nil, errBadHash
	}
	fromID, err := recoverNodeID(crypto.Keccak256(sigdata), sig)
	if err != nil {
		return nil, NodeID{}, hash, err
	}

	// The first signed byte selects the request type.
	var req packet
	ptype := sigdata[0]
	switch ptype {
	case pingPacket:
		req = new(ping)
	case pongPacket:
		req = new(pong)
	case findnodePacket:
		req = new(findnode)
	case neighborsPacket:
		req = new(neighbors)
	default:
		return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
	}
	stream := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
	err = stream.Decode(req)
	return req, fromID, hash, err
}
今天基本将p2p中节点发现的源码看了,明天接着看剩下的源码。
更多以太坊源码解析请移驾全球最大同性交友网,觉得有用记得给个小star哦
.
.
.
.
互联网颠覆世界,区块链颠覆互联网!
--------------------------------------------------20181103 17:58