ETCD 服务注册和发现

ETCD 服务发现

在微服务中各个服务都是无状态的,不会在配置中写死上下游的访问地址,所以需要有一个地方去维护各个节点的信息。
服务起来的时候会去注册中心拉取其他服务的节点信息,并且把自己的信息推送到注册中心。
当有运行的服务下线或者出现问题的时候,把自己从配置中心摘除,或者配置中心来检测服务状态(心跳、健康检查的协议)来摘除服务。

package main

import (
    "encoding/json"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "context"
    "log"
)

type Discovery struct {
    cli       *clientv3.Client
    info      *NodeInfo
    nodes     *NodesManager
}

func NewDiscovery(info *NodeInfo, conf clientv3.Config, mgr *NodesManager) (dis *Discovery, err error) {
    d := &Discovery{}
    d.info = info
    if mgr == nil {
        return nil, fmt.Errorf("[Discovery] mgr == nil")
    }
    d.nodes = mgr
    d.cli, err = clientv3.New(conf)
    return d, err
}

func (d *Discovery) pull() {
    kv := clientv3.NewKV(d.cli)
    resp, err := kv.Get(context.TODO(), "discovery/", clientv3.WithPrefix())
    if err != nil {
        log.Fatalf("[Discovery] kv.Get err:%+v", err)
        return
    }
    for _, v := range resp.Kvs{
        node := &NodeInfo{}
        err = json.Unmarshal(v.Value, node)
        if err != nil {
            log.Fatalf("[Discovery] json.Unmarshal err:%+v", err)
            continue
        }
        d.nodes.AddNode(node)
        log.Printf("[Discovery] pull node:%+v", node)
    }
}

func (d *Discovery) watch() {
    watcher := clientv3.NewWatcher(d.cli)
    watchChan := watcher.Watch(context.TODO(), "discovery", clientv3.WithPrefix())
    for {
        select {
        case resp := <-watchChan:
            d.watchEvent(resp.Events)
        }
    }
}

func (d *Discovery) watchEvent(evs []*clientv3.Event) {
    for _, ev := range evs{
        switch ev.Type{
        case clientv3.EventTypePut:
            node := &NodeInfo{}
            err := json.Unmarshal(ev.Kv.Value, node)
            if err != nil {
                log.Fatalf("[Discovery] json.Unmarshal err:%+v", err)
                continue
            }
            d.nodes.AddNode(node)
            log.Printf(fmt.Sprintf("[Discovery] new node:%s",string(ev.Kv.Value)))
        case clientv3.EventTypeDelete:
            d.nodes.DelNode(string(ev.Kv.Key))
            log.Printf(fmt.Sprintf("[Discovery] del node:%s data:%s",string(ev.Kv.Key),string(ev.Kv.Value)))
        }
    }
}


服务注册

在etcd中服务的注册使用租约(lease)来实现,设置租约的时候按需求设置租约的时间(ttl),类似redis中的EXPIRE key,再把服务自身的节点等信息写到对应的key中。然后定时去调用KeepAliveOnce来保持租约,如果在期间KeepAliveOnce的消息丢失或者延迟大于这个租约的ttl则etcd中将会把这个节点的信息删除,恢复正常时重新发起租约流程。

package main

import (
    "encoding/json"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
    "github.com/pkg/errors"
    "log"
    "time"
    "context"
)

const (
    _ttl = 10
)


type Register struct {
    cli       *clientv3.Client
    leaseId   clientv3.LeaseID
    lease     clientv3.Lease
    info      *NodeInfo
    closeChan chan error
}

func NewRegister(info *NodeInfo, conf clientv3.Config) (reg *Register, err error) {
    r := &Register{}
    r.closeChan = make(chan error)
    r.info = info
    r.cli, err = clientv3.New(conf)
    return r, err
}


func (r *Register) Run() {
    dur := time.Duration(time.Second)
    timer := time.NewTicker(dur)
    r.register()
    for {
        select {
        case <-timer.C:
            r.keepAlive()
        case <-r.closeChan:
            goto EXIT
        }
    }
EXIT:
    log.Printf("[Register] Run exit...")
}

func (r *Register) Stop() {
    r.revoke()
    close(r.closeChan)
}

func (r *Register) register() (err error) {
    r.leaseId = 0
    kv := clientv3.NewKV(r.cli)
    r.lease = clientv3.NewLease(r.cli)
    leaseResp, err := r.lease.Grant(context.TODO(), _ttl)
    if err != nil {
        err = errors.Wrapf(err, "[Register] register Grant err")
        return
    }
    data, err := json.Marshal(r.info)
    _, err = kv.Put(context.TODO(), r.info.UniqueId, string(data), clientv3.WithLease(leaseResp.ID))
    if err != nil {
        err = errors.Wrapf(err, "[Register] register kv.Put err %s-%+v", r.info.Name, string(data))
        return
    }
    r.leaseId = leaseResp.ID
    return
}

func (r *Register) keepAlive() (err error) {
    _, err = r.lease.KeepAliveOnce(context.TODO(), r.leaseId)
    if err != nil {
        // 租约丢失,重新注册
        if err == rpctypes.ErrLeaseNotFound {
            r.register()
            err = nil
        }
        err = errors.Wrapf(err, "[Register] keepAlive err")
    }
    log.Printf(fmt.Sprintf("[Register] keepalive... leaseId:%+v", r.leaseId))
    return err
}

func(r *Register) revoke() (err error) {
    _, err = r.cli.Revoke(context.TODO(), r.leaseId)
    if err != nil {
        err = errors.Wrapf(err, "[Register] revoke err")
        return
    }
    log.Printf(fmt.Sprintf("[Register] revoke node:%+v", r.leaseId))
    return
}

节点管理

package main

import (
    "log"
    "math/rand"
    "strings"
    "sync"
)

type NodeInfo struct {
    Addr     string
    Name     string
    UniqueId string
}

type NodesManager struct {
    sync.RWMutex
    // <name,<id,node>>
    nodes map[string]map[string]*NodeInfo
}

func NewNodeManager() (m *NodesManager){
    return &NodesManager{
        nodes: map[string]map[string]*NodeInfo{},
    }
}

func (n *NodesManager) AddNode(node *NodeInfo) {
    if node == nil {
        return
    }
    n.Lock()
    defer n.Unlock()
    if _, exist := n.nodes[node.Name]; !exist {
        n.nodes[node.Name] = map[string]*NodeInfo{}
    }
    n.nodes[node.Name][node.UniqueId] = node
}

func (n *NodesManager) DelNode(id string) {
    sli := strings.Split(id,"/")
    name := sli[len(sli)-2]
    n.Lock()
    defer n.Unlock()
    if _, exist := n.nodes[name]; exist {
        delete(n.nodes[name], id)
    }
}

func (n *NodesManager) Pick(name string) *NodeInfo {
    n.RLock()
    defer n.RUnlock()
    if nodes, exist := n.nodes[name]; !exist {
        return nil
    } else {
        // 纯随机取节点
        idx := rand.Intn(len(nodes))
        for _, v := range nodes {
            if idx == 0 {
                return v
            }
            idx--
        }
    }
    return nil
}

func (n *NodesManager) Dump () {
    for k, v := range n.nodes {
        for kk, vv := range v {
            log.Printf("[NodesManager] Name:%s Id:%s Node:%+v", k, kk, vv)
        }
    }

}

测试效果

package main

import (
    "github.com/coreos/etcd/clientv3"
    "log"
    "time"
)

func main() {

    nodes := NewNodeManager()
    dis, _ := NewDiscovery(&NodeInfo{
        Name: "server name/aaaa",
        Addr: "127.0.0.1:8888",
    }, clientv3.Config{
        Endpoints:   []string{"tx.cxc233.com:8888"},
        DialTimeout: 5 * time.Second,
    }, nodes)

    reg, _ := NewRegister(&NodeInfo{
        Name: "testsvr",
        Addr: "127.0.0.1:8888",
        UniqueId: "discovery/testsvr/instance_id/aaabbbccc",
    }, clientv3.Config{
        Endpoints:   []string{"tx.cxc233.com:8888"},
        DialTimeout: 5 * time.Second,
    })

    reg2, _ := NewRegister(&NodeInfo{
        Name: "testsvr",
        Addr: "127.0.0.1:8888",
        UniqueId: "discovery/testsvr/instance_id/testqqqqq",
    }, clientv3.Config{
        Endpoints:   []string{"tx.cxc233.com:8888"},
        DialTimeout: 5 * time.Second,
    })

    go reg.Run()
    time.Sleep(time.Second * 2)
    dis.pull()
    go dis.watch()
    time.Sleep(time.Second * 1)
    go reg2.Run()
    time.Sleep(time.Second * 1)
    nodes.Dump()
    log.Printf("[Main] nodes pick:%+v",nodes.Pick("testsvr"))
    time.Sleep(time.Second * 5)
}

2020-10-18 22:36:39.102233 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:40.100365 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:40.100365 I | [Discovery] pull node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/aaabbbccc}
2020-10-18 22:36:41.100389 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:41.143304 I | [Discovery] new node:{"Addr":"127.0.0.1:8888","Name":"testsvr","UniqueId":"discovery/testsvr/instance_id/testqqqqq"}
2020-10-18 22:36:42.097662 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:42.100819 I | [NodesManager] Name:testsvr Id:discovery/testsvr/instance_id/aaabbbccc Node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/aaabbbccc}
2020-10-18 22:36:42.100819 I | [NodesManager] Name:testsvr Id:discovery/testsvr/instance_id/testqqqqq Node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/testqqqqq}
2020-10-18 22:36:42.100819 I | [Main] nodes pick:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/testqqqqq}
2020-10-18 22:36:42.107588 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:43.091650 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:43.109634 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:44.093966 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:44.108130 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:45.093075 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:45.114071 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:46.107634 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:46.114016 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:47.091863 I | [Register] keepalive... leaseId:7587849142468224212
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354