brave+kafka+zipkin+cassandra搭建分布式链路跟踪系统

摘要: 上一篇博客,介绍的是brave直接通过http发送请求到zipkin,完成最简单的rpc跟踪,适合理解zipkin的跟踪链。这次来完成生产环境真实架构的部署。

211759_mxWH_223522.png

先来浏览一下架构图(下图的scribe采用kafka替代)

首先我们完成brave+kafka+zipkin这一步先,这时候采用的存储是zipkin的内存。


下载kafka后

tar -xzf kafka_2.11-0.9.0.0.tgz

cd kafka_2.11-0.9.0.0

启动zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

(windows 启动 bin\windows\zookeeper-server-start.bat config\zookeeper.properties)

启动kafka服务:

bin/kafka-server-start.sh config/server.properties

(windows 启动 bin\windows\kafka-server-start.bat config\server.properties)


下载zipkin的jar包,

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'

运行: java -jar zipkin.jar

用解压工具可以看到zipkin.jar里的BOOT-INF\classes\zipkin-server-shared.yml文件,所有的配置都在这个文件里。

改变配置里的值,可以通过java启动脚本里-D带入配置参数。

现在我们通过kafka做数据通道的话就采用如下参数:

java -DKAFKA_ZOOKEEPER=localhost:2181 -jar zipkin-server-1.19.2-exec.jar

打开ui查看 http://localhost:9411/


接着附上测试插入数据到kafka的代码。

import com.github.kristofa.brave.*;
import com.twitter.zipkin.gen.Endpoint;
import zipkin.Span;
import zipkin.reporter.AsyncReporter;
import zipkin.reporter.kafka08.KafkaSender;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BraveKafkaTest {

    private static Brave brave = null;
    private static Brave brave2 = null;

    private static void braveInit(){
//        KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").messageMaxBytes(10000).build();
        KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").build();
        AsyncReporter<Span> report = AsyncReporter.builder(sender).build();

        brave = new Brave.Builder("appserver").reporter(report).build();
        brave2 = new Brave.Builder("datacenter").reporter(report).build();
    }

    static class Task {
        String name;
        SpanId spanId;
        public Task(String name, SpanId spanId) {
            super();
            this.name = name;
            this.spanId = spanId;
        }
    }

    public static void main(String[] args) throws Exception {
        braveInit();

        final BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);
        Thread thread = new Thread(){
            public void run() {
                while (true) {
                    try {
                        Task task = queue.take();
                        dcHandle(task.name, task.spanId);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();

        for (int i = 0; i < 10; i++) {
            ServerRequestInterceptor serverRequestInterceptor = brave.serverRequestInterceptor();
            ServerResponseInterceptor serverResponseInterceptor = brave.serverResponseInterceptor();
            ClientRequestInterceptor clientRequestInterceptor = brave.clientRequestInterceptor();
            ClientResponseInterceptor clientResponseInterceptor = brave.clientResponseInterceptor();

            serverRequestInterceptor.handle(new ServerRequestAdapterImpl("group_data"));

            ClientRequestAdapterImpl clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_radio_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_radio_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_user_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_user_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_program_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_program_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
            Thread.sleep(10);
        }
    }

    public static void dcHandle(String spanName, SpanId spanId){
        ServerRequestInterceptor serverRequestInterceptor = brave2.serverRequestInterceptor();
        ServerResponseInterceptor serverResponseInterceptor = brave2.serverResponseInterceptor();
        serverRequestInterceptor.handle(new ServerRequestAdapterImpl(spanName, spanId));
        serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
    }


    static class ServerRequestAdapterImpl implements ServerRequestAdapter {

        Random randomGenerator = new Random();
        SpanId spanId;
        String spanName;

        ServerRequestAdapterImpl(String spanName){
            this.spanName = spanName;
            long startId = randomGenerator.nextLong();
            SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
            this.spanId = spanId;
        }

        ServerRequestAdapterImpl(String spanName, SpanId spanId){
            this.spanName = spanName;
            this.spanId = spanId;
        }

        public TraceData getTraceData() {
            if (this.spanId != null) {
                return TraceData.builder().spanId(this.spanId).build();
            }
            long startId = randomGenerator.nextLong();
            SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
            return TraceData.builder().spanId(spanId).build();
        }

        public String getSpanName() {
            return spanName;
        }

        public Collection<KeyValueAnnotation> requestAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

    }

    static class ServerResponseAdapterImpl implements ServerResponseAdapter {

        public Collection<KeyValueAnnotation> responseAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

    }

    static class ClientRequestAdapterImpl implements ClientRequestAdapter {

        String spanName;
        SpanId spanId;

        ClientRequestAdapterImpl(String spanName){
            this.spanName = spanName;
        }

        public SpanId getSpanId() {
            return spanId;
        }

        public String getSpanName() {
            return this.spanName;
        }

        public void addSpanIdToRequest(SpanId spanId) {
            //记录传输到远程服务
            System.out.println(spanId);
            if (spanId != null) {
                this.spanId = spanId;
                System.out.println(String.format("trace_id=%s, parent_id=%s, span_id=%s", spanId.traceId, spanId.parentId, spanId.spanId));
            }
        }

        public Collection<KeyValueAnnotation> requestAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

        public Endpoint serverAddress() {
            return null;
        }

    }

    static class ClientResponseAdapterImpl implements ClientResponseAdapter {

        public Collection<KeyValueAnnotation> responseAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioname", "火星人1");
            collection.add(kv);
            return collection;
        }

    }
}

maven依赖:

<dependencies>
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-core</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-spancollector-http</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.auto.value</groupId>
            <artifactId>auto-value</artifactId>
            <version>1.3</version>
        </dependency>
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-spancollector-kafka</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter</groupId>
            <artifactId>zipkin-sender-kafka08</artifactId>
            <version>0.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.3</version>
        </dependency>

    </dependencies>

运行完成后,插入了10条跟踪信息导kafka,打开 http://localhost:9411/ 即可看到效果。

154606_vkkV_223522.png

接下来,采用cassandra存储从kafka采集过来的数据。

下载apache-cassandra-2.2.8-bin.tar.gz,解压后

启动cassandra:bin\cassandra.bat

重新运行zipkin(加入-DSTORAGE_TYPE=cassandra参数):

java -DKAFKA_ZOOKEEPER=localhost:2181 -DSTORAGE_TYPE=cassandra -jar zipkin-server-1.19.2-exec.jar

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,018评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,046评论 2 372
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,215评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,303评论 1 266
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,181评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,171评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,577评论 3 384
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,260评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,546评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,630评论 2 311
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,390评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,260评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,633评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,920评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,210评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,569评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,767评论 2 335

推荐阅读更多精彩内容