etcd 客户端一般不会主动告知与服务端的连接是否正常,因此需要自行做健康检查,并在检查失败时重新创建客户端:
// dbcli is the package-wide etcd client. It stays nil until newClient
// succeeds, and newClient overwrites it on every later successful dial.
var dbcli *clientv3.Client

// WatchClose is set to true by newClient each time a new client has been
// stored in dbcli.
// NOTE(review): presumably read and reset by the watch goroutine as
// dbapi.WatchClose — confirm; it is accessed without synchronization.
var WatchClose bool
// etcdDialConfig is the client configuration used for the initial dial and
// for every reconnect attempt made by healthCheck.
var etcdDialConfig = clientv3.Config{
	// Single endpoint: the etcd client port on IP.
	Endpoints: []string{IP + ":2379"},

	// grpc.WithBlock is passed so that dialing waits for the connection
	// instead of returning immediately; DialTimeout bounds that wait.
	DialOptions: []grpc.DialOption{grpc.WithBlock()},
	DialTimeout: 5 * time.Second,

	// Keep-alive ping interval and the time allowed for its response.
	DialKeepAliveTime:    5 * time.Second,
	DialKeepAliveTimeout: 5 * time.Second,
}
// newClient creates an etcd client from config. On success the client is
// stored in dbcli and WatchClose is set to true; on failure both are left
// untouched.
//
// It returns the error from clientv3.New unchanged (nil on success). The
// outcome is also traced through rlogger.
//
// NOTE(review): the client previously held in dbcli is not closed when it is
// replaced — confirm whether that is intentional before adding a Close here,
// since closing it would also close any watch channels created from it.
func newClient(config clientv3.Config) error {
	rlogger.FuncEntry(types.ModuleOamCm, nil)

	client, dialErr := clientv3.New(config)
	if dialErr != nil {
		rlogger.Trace(types.ModuleOamCm, rlogger.ERROR, nil, "init etcd fail with", dialErr)
		return dialErr
	}

	dbcli = client
	rlogger.Trace(types.ModuleOamCm, rlogger.INFO, nil, "init etcd success")
	// Raised last so the new client is already in dbcli when the flag is seen.
	WatchClose = true
	return nil
}
// healthCheck probes etcd every two seconds and, when a probe fails,
// re-creates the client with newClient until a dial succeeds. It never
// returns and is meant to run in its own goroutine.
func healthCheck() {
	const interval = 2 * time.Second

	for {
		rlogger.FuncEntry(types.ModuleOamCm, nil)
		time.Sleep(interval)

		if etcdAlive(interval) {
			continue
		}

		// Probe failed: keep dialing until a new client is in place. Pause
		// between attempts so a dial that fails immediately cannot turn
		// this into a hot loop.
		for newClient(etcdDialConfig) != nil {
			time.Sleep(interval)
		}
	}
}

// etcdAlive reports whether a Status call against the first configured
// endpoint succeeds within timeout. It is a separate function so that the
// deferred cancel runs after every probe rather than accumulating inside
// healthCheck's endless loop.
func etcdAlive(timeout time.Duration) bool {
	// No client has been created yet: treat as unhealthy instead of
	// dereferencing a nil client.
	if dbcli == nil {
		return false
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	_, err := dbcli.Status(ctx, etcdDialConfig.Endpoints[0])
	return err == nil
}
需要注意:重新创建 etcd 客户端后,for 循环中正在读取的 watch 通道仍然属于旧客户端,不会自动更新。因此改为如下写法:客户端重建后通过 dbapi.WatchClose 通知 EtcdWatch 协程重新建立 watch:
// EtcdWatch consumes events from a prefix watch on key and re-creates the
// watch whenever dbapi.WatchClose is raised or the watch channel is closed.
// It never returns and is meant to run in its own goroutine.
//
// NOTE(review): dbapi.WatchClose is read and written here without
// synchronization while another goroutine presumably sets it — confirm
// whether that needs guarding.
func EtcdWatch() {
	// The flag is polled once per tick. Selecting on the ticker alongside
	// the watch channel lets events be handled as soon as they arrive,
	// instead of sleeping in a default branch.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		// Clear the flag before creating the watch so that a reconnect
		// happening afterwards is not missed.
		dbapi.WatchClose = false
		watch := dbapi.DbWatchPrefix(key)

	receive:
		for {
			select {
			case wresp, ok := <-watch:
				if !ok {
					// A closed channel is always ready to receive, so
					// staying in this loop would spin forever. Wait one
					// tick, then establish a fresh watch.
					<-ticker.C
					break receive
				}
				for _, ev := range wresp.Events {
					_ = ev // TODO: handle the event
				}
			case <-ticker.C:
				if dbapi.WatchClose {
					break receive
				}
			}
		}
	}
}