Skywalking收集与发送链路数据部分源码解析

链路收集大体逻辑

这里先不分析skywalking是如何自动收集数据的,而是说一下agent在收集后如何存储与发送给collector,这部分的架构关系到性能开销与对服务的影响

大体逻辑如下:

agent内部缓存维护了一个生产消费者,收集数据时将生产的数据按分区放到缓存中,消费者用多线程消费数据,将缓存的数据封装成grpc对象发送给collector

链路数据接收与发送

数据的接收与发送主要在类TraceSegmentServiceClient中处理
其中的一个重要属性是DataCarrier,它来实现的生产消费模式

private volatile DataCarrier<TraceSegment> carrier;

大致结构如下


DataCarrier.png

DataCarrier

属性如下:

    //一个buffer的大小
    private final int bufferSize;
    //channel的大小
    private final int channelSize;
    private Channels<T> channels;
    //消费者线程池封装
    private ConsumerPool<T> consumerPool;
    private String name;
    

方法#produce生产数据

    public boolean produce(T data) {
        if (consumerPool != null) {
            if (!consumerPool.isRunning()) {
                return false;
            }
        }

        return this.channels.save(data);
    }

channel的save方法

public boolean save(T data) {
        //计算放在channel哪个位置
        int index = dataPartitioner.partition(bufferChannels.length, data);
        //重试次数
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }
        for (; retryCountDown > 0; retryCountDown--) {
            //保存成功返回true
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }

进入到Buffer的save方法,TraceSegmentServiceClient用的策略是IF_POSSIBLE,缓存位置还有值直接返回,所以消费不过来会丢失部分数据

    boolean save(T data) {
        //数组位置自增
        int i = index.getAndIncrement();
        //不为空的处理
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    boolean isFirstTimeBlocking = true;
                    while (buffer[i] != null) {
                        if (isFirstTimeBlocking) {
                            isFirstTimeBlocking = false;
                            for (QueueBlockingCallback<T> callback : callbacks) {
                                callback.notify(data);
                            }
                        }
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false;
                case OVERRIDE:
                default:
            }
        }
        //写入缓存
        buffer[i] = data;
        return true;
    }

DataCarrier的consume方法初始化消费者线程池

    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
        if (consumerPool != null) {
            consumerPool.close();
        }
        consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
        consumerPool.begin();
        return this;
    }

参数consumerClass就是TraceSegmentServiceClient类自己,实现具体的消费方法,初始化以后线程池就启动了

consumerPool方法begin

    public void begin() {
        if (running) {
            return;
        }
        try {
            lock.lock();
            //把channel分给不同的thread
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.start();
            }
            running = true;
        } finally {
            lock.unlock();
        }
    }

cusumerThread的run方法

    @Override
    public void run() {
        running = true;

        while (running) {
            boolean hasData = consume();

            if (!hasData) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                }
            }
        }

        consume();

        consumer.onExit();
    }

最终会调到的TraceSegmentServiceClient这个消费者的consume方法,将TraceSegment转换成grpc对象发送给collector

@Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
                @Override
                public void onNext(Downstream downstream) {

                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            for (TraceSegment segment : data) {
                try {
                    UpstreamSegment upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                } catch (Throwable t) {
                    logger.error(t, "Transform and send UpstreamSegment to collector fail.");
                }
            }
            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
    }

channel

channel中包含一个Buffer数组:

    //Buffer数组
    private final Buffer<T>[] bufferChannels;
    //数据分区策略
    private IDataPartitioner<T> dataPartitioner;
    //Buffer策略
    private BufferStrategy strategy;

Buffer对象

    public class Buffer<T> {
    //对象数组
    private final Object[] buffer;
    private BufferStrategy strategy;
    //位置标记
    private AtomicRangeInteger index;
    private List<QueueBlockingCallback<T>> callbacks;
    ... 
    

抽样服务SamplingService

作用是对TraceSegment进行抽样,链路跟踪必须要考虑的功能,服务压力大时全量收集会占用cpu、内存、网络等资源

agent通过agent.config配置档中的agent.sample_n_per_3_secs设置每三秒收集的TraceSegment的个数,大于0为开启状态,默认全量收集

初始化一个3秒的定时任务

    @Override
    public void boot() throws Throwable {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        //大于0开启抽样
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
            //定时任务
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                @Override
                public void run() {
                    resetSamplingFactor();
                }
            }, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, 3, TimeUnit.SECONDS);
            logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
        }
    }

抽样逻辑

    public boolean trySampling() {
        if (on) {
            int factor = samplingFactorHolder.get();
            if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
                boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
                return success;
            } else {
                return false;
            }
        }
        return true;
    }

skywalking的agent不支持按比例抽样,比较遗憾

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容