深入探究ZIPKIN调用链跟踪——拓扑Dependencies篇

前言: 对于调用链跟踪,如果我们想从服务层面,了解服务之间的调用情况,包括服务之间调用次数,异常次数等。那么就需要一个链路拓扑展示功能。链路拓扑图可以帮助我们快速了解所有服务之间的调用情况:谁调用谁,总调用量多少,正常调用以及异常调用量情况。

快速开始

Zipkin的拓扑服务zipkin-dependencies是作为zipkin的一个独立的离线服务,也就是说,只启动zipkin服务,是没法看到拓扑的,还需要自己离线启动zipkin-dependencues服务。

$ STORAGE_TYPE=elasticsearch ES_HOSTS=host1,host2 java -jar zipkin-dependencies.jar `date -u -d '1 day ago' +%F`
# To override the http port, add it to the host string
$ STORAGE_TYPE=elasticsearch ES_HOSTS=host1:9201 java -jar zipkin-dependencies.jar `date -u -d '1 day ago' +%F`

其中ES配置参数如下:

* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
* `ES_DATE_SEPARATOR`: The separator used when generating dates in index.
                       Defaults to '-' so the queried index look like zipkin-yyyy-DD-mm
                       Could for example be changed to '.' to give zipkin-yyyy.MM.dd
* `ES_HOSTS`: A comma separated list of elasticsearch hosts advertising http. Defaults to
              localhost. Add port section if not listening on port 9200. Only one of these hosts
              needs to be available to fetch the remaining nodes in the cluster. It is
              recommended to set this to all the master nodes of the cluster. Use url format for
              SSL. For example, "https://yourhost:8888"
* `ES_NODES_WAN_ONLY`: Set to true to only use the values set in ES_HOSTS, for example if your
                       elasticsearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
                                   (formerly Shield) is in place. By default no username or
                                   password is provided to elasticsearch.

Zipkin出了支持elasticsearch存储,还有mysql,cassard,详细配置信息请看源码Readme

效果展示

l链路拓扑

1、图中线条说明

服务之间的线条,遵循以下原则:

  • 越细调用越少,越粗调用越多
  • 越黑错误越少,越红错误越多

2、主调被调次数说明

点开每一个服务,可以看到主调被调,比如我在拓扑图中点击
某个服务,可以与此服务有直接调用关系的服务有哪些,效果如下:


主调被调服务

其中Uses by表示此服务作为被调服务,被哪些服务调用了;Uses表示此服务调用了哪些其他服务。

在上面的图中点击某个主调或被调服务,即可看到具体的调用次数,以及失败次数,效果如下:


调用次数展示

通过拓扑图,宏观上,我们可以快速了解服务之间的调用关系,同时也可以知道哪些服务间调用有问题,且可以知道出现问题的一个量级是多少(失败数,调用总数)。

原理分析

链路拓扑构建原理

Zipkin拓扑denpendencies是基于上报的链路span数据再次构建出的描述链路拓扑的一种新的数据结构。

构建链路的第一步就是读取Span数据。Zipkin外部数据源支持三种,分别是Mysql,Cassandra,Elasticsearch,因此构建拓扑时,将从这三种数据源中读取Span数据。

读取Span数据源后,需要对其处理,计算出链路的拓扑。因为Span的数据量很大,普通程序计算处理无法完成任务,因此需要用到大数据框架。Zipkin官方选用的是Spark框架。Spark对Span数据进行处理,最后生成拓扑数据DenpendencyLink,然后持久化到存储中。

前端请求拓扑(DependencyLink)时,即按照查询条件,查询已经持久化后的DependencyLink,然后经过UI渲染,进行页面展示。

服务主逻辑

启动Zipkin-dependencies服务时,会传入几个参数,分别是时间day和存储类型storageType。Zipkin-dependencies服务是以天为单位进行建立拓扑,因此day将决定建立那一天的拓扑;而storageType将决定从什么储存中读取数据。

1、获取日期:

long day = args.length == 1 ? parseDay(args[0]) : System.currentTimeMillis();

2、获取存储类型:

String storageType = System.getenv("STORAGE_TYPE");

3、根据不同的存储启动不同的jOb:

switch (storageType) {
  case "cassandra":
    CassandraDependenciesJob.builder()
        .logInitializer(logInitializer)
        .jars(jarPath)
        .day(day)
        .build()
        .run();
    break;
  case "cassandra3":
    zipkin2.dependencies.cassandra3.CassandraDependenciesJob.builder()
        .logInitializer(logInitializer)
        .jars(jarPath)
        .day(day)
        .build()
        .run();
    break;
  case "mysql":
    MySQLDependenciesJob.builder()
        .logInitializer(logInitializer)
        .jars(jarPath)
        .day(day)
        .build()
        .run();
    break;
  case "elasticsearch":
    ElasticsearchDependenciesJob.builder()
        .logInitializer(logInitializer)
        .jars(jarPath)
        .day(day)
        .build()
        .run();
    break;
  default:
    throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType);
}

Job构建拓扑

不同的存储会定义不同Job类,因此有CassandraDependenciesJob,MySQLDependenciesJob,MySQLDependenciesJob,ElasticsearchDependenciesJob。 不同的Job主要区别在于读取Span的方式不同,而Spark对Span进行处理计算的方式基本都是相同的。 本文主要分析ElasticsearchJOb。

Job中主要逻辑都在run方法中,ElastichserchJob的Run方法定义如下:

void run(String spanResource, String dependencyLinkResource, SpanBytesDecoder decoder) {
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        JavaRDD<Map<String, Object>> links =
        JavaEsSpark.esJsonRDD(sc, spanResource)
            .groupBy(JSON_TRACE_ID)
            .flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))
            .values()
            .mapToPair(l -> Tuple2.apply(Tuple2.apply(l.parent(), l.child()), l))
            .reduceByKey((l, r) -> DependencyLink.newBuilder()
                .parent(l.parent())
                .child(l.child())
                .callCount(l.callCount() + r.callCount())
                .errorCount(l.errorCount() + r.errorCount())
                .build())
            .values()
            .map(DEPENDENCY_LINK_JSON);

        if (links.isEmpty()) {
        log.info("No dependency links could be processed from spans in index {}", spanResource);
        } else {
            JavaEsSpark.saveToEs(
            links,
            dependencyLinkResource,
            Collections.singletonMap("es.mapping.id", "id")); // allows overwriting the link
        }
    } finally {
        sc.stop();
    }
}

主要步骤如下:
1、首先通过Spark的配置属性Conf,创建一个JavaSparkContext对象sc:

JavaSparkContext sc = new JavaSparkContext(conf);

2、然后读取elasticsearch span数据源:

JavaEsSpark.esJsonRDD(sc, spanResource)

3、读取数据源后,就可以对Span进行处理了,首先按照TraceId 进行Group分组:

JavaEsSpark.esJsonRDD(sc, spanResource)
          .groupBy(JSON_TRACE_ID)

其中JSON_TRACE_ID Function定义如下:

static final Function<Tuple2<String, String>, String> JSON_TRACE_ID = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> pair) throws IOException {
        JsonReader reader = new JsonReader(new StringReader(pair._2));
        reader.beginObject();
        while (reader.hasNext()) {
            String nextName = reader.nextName();
            if (nextName.equals("traceId")) {
                String traceId = reader.nextString();
                return traceId.length() > 16 ? traceId.substring(traceId.length() - 16) : traceId;
            } else {
                reader.skipValue();
            }
        }
        throw new MalformedJsonException("no traceId in " + pair);
    }

    @Override
    public String toString() {
        return "pair._2.traceId";
    }
};

4、Span按照TraceId Group 分组后,接着对Span进行处理, 创建出DenpendencyLink。

JavaEsSpark.esJsonRDD(sc, spanResource)
          .groupBy(JSON_TRACE_ID)
          .flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))

5、上面方法最终返回的是个Map类型,将其转化为pari类型,再对其进行一个reduceByKey操作:

JavaEsSpark.esJsonRDD(sc, spanResource)
          .groupBy(JSON_TRACE_ID)
          .flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))
          .values()
          .mapToPair(l -> Tuple2.apply(Tuple2.apply(l.parent(), l.child()), l))
          .reduceByKey((l, r) -> DependencyLink.newBuilder()
            .parent(l.parent())
            .child(l.child())
            .callCount(l.callCount() + r.callCount())
            .errorCount(l.errorCount() + r.errorCount())
            .build())

6、Spark对Span的计算操作到这儿基本就完成了,最后将DependencyLink转化为Jso形式:

JavaEsSpark.esJsonRDD(sc, spanResource)
    ...
    .map(DEPENDENCY_LINK_JSON);

7、对于计算好的拓扑Links,将其持久化到Elasticsearch中:

JavaEsSpark.saveToEs(
          links,
          dependencyLinkResource,
          Collections.singletonMap("es.mapping.id", "id"));

整个过程到此完毕,其中最复杂也是最核心的逻辑就是计算出链路拓扑Denpendencylink,此步骤在Function TraceIdAndJsonToDependencyLinks(logInitializer, decoder)中。接下来详细分析TraceIdAndJsonToDependencyLinks完成的工作。

DenpendencyLink

首先介绍一下DenpendencyLink数据结构。DenpendencyLink就是最终与页面交互的拓扑结构数据单元,字端有:

字端名 描述
parent 父服务名
child 子服务名
callCount 这两个服务之间的总调用次数
errorCount 这两个服务之间的总调用失败次数

DenpendencyLink类定义如下:

public final class DependencyLink implements Serializable { 
  public static Builder newBuilder() {
    return new Builder();
  }

  /** parent service name (caller) */
  public String parent() {
    return parent;
  }

  /** child service name (callee) */
  public String child() {
    return child;
  }

  /** total traced calls made from {@link #parent} to {@link #child} */
  public long callCount() {
    return callCount;
  }

  /** How many {@link #callCount calls} are known to be errors */
  public long errorCount() {
    return errorCount;
  }
}

构建DependencyLinks

TraceIdAndJsonToDependencyLinks类的定义如下:

final class TraceIdAndJsonToDependencyLinks
    implements Serializable, Function<Iterable<Tuple2<String, String>>, Iterable<DependencyLink>> {
  private static final long serialVersionUID = 0L;
  
  private static final Logger log = LoggerFactory.getLogger(TraceIdAndJsonToDependencyLinks.class);

  @Nullable final Runnable logInitializer;
  final SpanBytesDecoder decoder;

  TraceIdAndJsonToDependencyLinks(Runnable logInitializer, SpanBytesDecoder decoder) {
    this.logInitializer = logInitializer;
    this.decoder = decoder;
  }

  @Override
  public Iterable<DependencyLink> call(Iterable<Tuple2<String, String>> traceIdJson) {
    List<Span> sameTraceId = new ArrayList<>();
    for (Tuple2<String, String> row : traceIdJson) {
       decoder.decode(row._2.getBytes(ElasticsearchDependenciesJob.UTF_8), sameTraceId);

    DependencyLinker linker = new DependencyLinker();
    linker.putTrace(sameTraceId);
    return linker.link();
  }
}

其中call方法中,首先完成对同一TraceId的Span解码:

decoder.decode(row._2.getBytes(ElasticsearchDependenciesJob.UTF_8), sameTraceId);

然后,通过DependencyLinker类构造出DependendyLink,首先构造一个SpanNode Tree:

SpanNode traceTree = builder.build(spans);

然后利用深度优先遍历方法遍历整个,统计出CallCounts和errorCounts:

  void addLink(String parent, String child, boolean isError) {
    Pair key = new Pair(parent, child);
    if (callCounts.containsKey(key)) {
        callCounts.put(key, callCounts.get(key) + 1);
    } else {
        callCounts.put(key, 1L);
    }
    if (!isError) return;
    if (errorCounts.containsKey(key)) {
        errorCounts.put(key, errorCounts.get(key) + 1);
    } else {
        errorCounts.put(key, 1L);
    }
}

其中callCounts和errorCounts定义如下:

//Pair key = new Pair(parent, child);
final Map<Pair, Long> callCounts = new LinkedHashMap<>();
final Map<Pair, Long> errorCounts = new LinkedHashMap<>();

最后,再通过callCounts和errorCounts生成List<DependencyLink>:

static List<DependencyLink> link(Map<Pair, Long> callCounts,
Map<Pair, Long> errorCounts) {
    List<DependencyLink> result = new ArrayList<>(callCounts.size());
    for (Map.Entry<Pair, Long> entry : callCounts.entrySet()) {
        Pair parentChild = entry.getKey();
        result.add(DependencyLink.newBuilder()
            .parent(parentChild.left)
            .child(parentChild.right)
            .callCount(entry.getValue())
         .errorCount(errorCounts.containsKey(parentChild) ? errorCounts.get(parentChild) : 0L)
            .build());
    }
    return result;
}

这样,最终构建出了DependencyLink。

后记

本文为我的调用链系列文章之一,已有文章如下:

祝大家工作顺利,天天开心!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容