动机
最近需要将流程的耗时接入可观测系统,方便后续优化性能。为了统一技术栈,决定使用prometheus
来接入。特别的是,我需要使用自定义的时间戳,由于PushGateway
不支持自定义的时间戳,只能使用 pull 的方式了。
整体流程
1、构建 Collector
。
2、注册到 Register
(也实现了 Gatherer
接口),其中会调用 Collector
的 Describe
方法。
3、通过http请求到 /metrics
,使用 promhttp.Handler
来处理请求。
4、调用 Register
的 Gather
方法,其中会调用 注册的Collector
的Collect
方法。
5、将 Metric
按名称分组为 dto.MetricFamily
,编码后写入 ResponseWriter
,返回给请求端。
实现细节
实现 Collector
type timestampedCollector struct {
bufferChan chan prometheus.Metric
desc *prometheus.Desc
}
func newTimestampedCollector(bufferSize int, desc *prometheus.Desc) *timestampedCollector {
metrics := make(chan prometheus.Metric, bufferSize)
return ×tampedCollector{
bufferChan: metrics,
desc: desc,
}
}
func (t *timestampedCollector) add(m prometheus.Metric) {
logger.Infof("add metric:%s", util.ToJson(m.Desc()))
t.bufferChan <- m
}
func (t *timestampedCollector) Describe(c chan<- *prometheus.Desc) {
c <- t.desc
}
func (t *timestampedCollector) Collect(c chan<- prometheus.Metric) {
for {
select {
case m := <-t.bufferChan:
c <- m
case <-time.After(time.Second * 1):
return
}
}
}
prometheus中有2种Collector
,一种是uncheckedCollectors
,还有一种是checkedCollectors
,就是看你实现的Collector
的Describe
方法中有没有添加prometheus.Desc
注册到Registry
构建prometheus.Desc
var totalDesc = prometheus.NewDesc("emr_exclude_bootstrap_duration_seconds",
"流程去掉执行引导操作的时间",
[]string{"service", "region", "nodeCnt", "flowId"},
nil)
NewDesc
生成 prometheus.Desc
的id,总的来说就是(fqName + constLabels的值列表) 合并起来的string的hash值
func (v2) NewDesc(fqName, help string, variableLabels ConstrainableLabels, constLabels Labels) *Desc {
// 构建 Desc
......
labelValues := make([]string, 1, len(constLabels)+1)
labelValues[0] = fqName
for _, labelName := range labelNames {
labelValues = append(labelValues, constLabels[labelName])
}
xxh := xxhash.New()
for _, val := range labelValues {
xxh.WriteString(val)
xxh.Write(separatorByteSlice)
}
d.id = xxh.Sum64() // 生成 Desc 的 id
}
NewDesc
生成 prometheus.Desc
的dimHash,总的来说就是(fqName + constLabels的值列表) 合并起来的string的hash值
func (v2) NewDesc(fqName, help string, variableLabels ConstrainableLabels, constLabels Labels) *Desc {
// 构建 Desc
......
labelNames := make([]string, 0, len(constLabels)+len(d.variableLabels.names))
for labelName := range constLabels {
labelNames = append(labelNames, labelName)
}
for _, label := range variableLabels {
labelNames = append(labelNames, "$"+label)
}
xxh := xxhash.New()
xxh.WriteString(help)
xxh.Write(separatorByteSlice)
for _, labelName := range labelNames {
xxh.WriteString(labelName)
xxh.Write(separatorByteSlice)
}
d.dimHash = xxh.Sum64()
}
注册到prometheus
totalCollector = newTimestampedCollector(1000, totalDesc)
if err := handleErr(prometheus.Register(totalCollector)); err != nil {
panic(err)
}
伪代码如下
func (r *Registry) Register(c Collector) error {
c.Describe(descChan)
close(descChan)
for desc := range descChan {
1、desc.id不能重复
2、desc.dimHash + desc.fqName 不能重复
}
}
使用 promhttp.Handler
来处理请求
func main() {
http.Handle("/metrics", promhttp.Handler())
// 暴露自己的指标
http.ListenAndServe(":13000", nil)
}
使用curl -XGET 'http://localhost:13000/metrics'
来验证metric是否暴露成功。其中 http handler的链表结构为:
InstrumentHandlerCounter -- HandlerForTransactional内部方法
暴露指标
promhttp.Handler
最核心的就是调用 Registry
的 Gather
方法,其中会调用注册的Collector
的Collect
方法来将指标拉取出来,接着主要是2步
1、聚合
MetricFamily
按照Metric fqName分组,相同Metric fqName的Metric
会放入到MetricFamily
的集合中
2、校验
1、指标名称 + lable的名字 + label的值 + 时间戳 不能重复
2、label不能重复