2020-02-20

RM的子服务列表

[TOC]

RMApplicationHistoryWriter
AsyncDispatcher
AdminService
RMActiveServices
–RMSecretManagerService
–ContainerAllocationExpirer
–AMLivelinessMonitor
–RMNodeLabelsManager
–RMStateStore
–RMApplicationHistoryWriter
–SystemMetricsPublisher
–NodesListManager
–ResourceScheduler
–SchedulerEventDispatcher
–NMLivelinessMonitor
–ResourceTrackerService
–ApplicationMasterService
–ClientRMService
–ApplicationMasterLauncher
–DelegationTokenRenewer
————————————————
版权声明:本文为CSDN博主「Jerry Shao」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_44630798/article/details/89005651

1 ClientRMService初始化

在ResourceManager#RMActiveServices#serviceInit()方法中进行ClientRMService的创建

ResourceManager.RMActiveServices.serviceInit(){
    clientRM = createClientRMService();
    addService(clientRM);
    rmContext.setClientRMService(clientRM);
}

再看ClientRMService的serviceInit()方法,很简单就是为设置了下客户端绑定地址(即RM的RPC ip+port)

@Override
protected void serviceInit(Configuration conf) throws Exception {
    clientBindAddress = getBindAddress(conf);
    super.serviceInit(conf);
}
InetSocketAddress getBindAddress(Configuration conf) {
    return conf.getSocketAddr(
        YarnConfiguration.RM_BIND_HOST,
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);
}

2 ResourceScheduler

在ResouceManager#RMActiveServices#serviceInit()方法中进行ResourceScheduler的创建;

进入createScheduler():根据配置文件中的参数反射创建scheduler实例,默认创建出CapacityScheduler对象。

ResourceManager.RMActiveServices.serviceInit(){
    // Initialize the scheduler
    scheduler = createScheduler();
    scheduler.setRMContext(rmContext);
    addIfService(scheduler);
    rmContext.setScheduler(scheduler);
}

protected ResourceScheduler createScheduler() {
    String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
                                         YarnConfiguration.DEFAULT_RM_SCHEDULER);
    LOG.info("Using Scheduler: " + schedulerClassName);
    try {
        Class<?> schedulerClazz = Class.forName(schedulerClassName);
        if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
            return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
                                                                   this.conf);
        } else {
            throw new YarnRuntimeException("Class: " + schedulerClassName
                                           + " not instance of " + ResourceScheduler.class.getCanonicalName());
        }
    } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Could not instantiate Scheduler: "
                                       + schedulerClassName, e);
    }
}

由于scheduler被addIfService(scheduler)了,所以进入CapacityScheduler的serviceInit()

3 CapacityScheduler.java

[图片上传失败...(image-aeae3a-1582179940006)]

//存储队列信息
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();

进入CapacityScheduler的serviceInit(),先看super.serviceInit(conf),再看initScheduler(configuration);

public void serviceInit(Configuration conf) throws Exception {
    Configuration configuration = new Configuration(conf);
    super.serviceInit(conf);
    initScheduler(configuration);
}

//AbstractYarnScheduler.java
@Override
 public void serviceInit(Configuration conf) throws Exception {
   //yarn.nm.liveness-monitor.expiry-interval-ms nm存活检测间隔,默认600s
   nmExpireInterval =
       conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
         YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
   //yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms 默认10s
   configuredMaximumAllocationWaitTime =
       conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
         YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
   //创建定期释放死亡NM上container的Timer,调度间隔为nmExpireInterval
   createReleaseCache();
   super.serviceInit(conf);
 }

private synchronized void initScheduler(Configuration configuration) throws
    IOException {
    this.conf = loadCapacitySchedulerConfiguration(configuration);
    //memory:minMem maxMem ; vcore:minVcores maxVcores
    validateConf(this.conf);
    //minimumMemory, minimumCores记录在Resource中返回
    this.minimumAllocation = this.conf.getMinimumAllocation();
    //初始化最大的资源给configuredMaxAllocation对象:会在ClusterNodeTracker.setConfiguredMaxAllocation(Resource)中使用
    //ReentrantReadWriteLock锁,然后clone resource对象,复制给configuredMaxAllocation对象
    initMaximumResourceCapability(this.conf.getMaximumAllocation());
    //创建出资源计算器对象DefaultResourceCalculator
    this.calculator = this.conf.getResourceCalculator();
    //yarn.scheduler.include-port-in-node-name 默认false
    this.usePortForNodeName = this.conf.getUsePortForNodeName();
    //调度的app列表
    this.applications = new ConcurrentHashMap<ApplicationId,SchedulerApplication<FiCaSchedulerApp>>();
    //获取节点标签管理器对象
    this.labelManager = rmContext.getNodeLabelManager();
    authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
    //初始化队列
    initializeQueues(this.conf);
    //是否开启异步资源调度 yarn.scheduler.capacity.schedule-asynchronously.enable,默认false
    scheduleAsynchronously = this.conf.getScheduleAynschronously();
    //异步调度间隔 yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms,默认5s
    asyncScheduleInterval =
        this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,DEFAULT_ASYNC_SCHEDULER_INTERVAL);
    //如果开启了异步调度,则创建异步调度线程
    if (scheduleAsynchronously) {
        asyncSchedulerThread = new AsyncScheduleThread(this);
    }

    LOG.info("Initialized CapacityScheduler with " +
             "calculator=" + getResourceCalculator().getClass() + ", " +
             "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
             "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
             "asynchronousScheduling=" + scheduleAsynchronously + ", " +
             "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
}

重点来分析下初始化队列initializeQueues(this.conf);

(1)解析配置项,构建队列树
(2)构建队列和用户/组的映射

@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
    throws IOException {
    //构建队列树,树根为root队列   
    root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,queues, queues, noop);
    labelManager.reinitializeQueueLabels(getQueueToLabels());
    LOG.info("Initialized root queue " + root);
    initializeQueueMappings();
    setQueueAcls(authorizer, queues);
}

(1) 解析配置项,构建队列树

进入parseQueue,解析队列

@Lock(CapacityScheduler.class)
static CSQueue parseQueue(
    CapacitySchedulerContext csContext,
    CapacitySchedulerConfiguration conf,
    CSQueue parent, String queueName, Map<String, CSQueue> queues,
    Map<String, CSQueue> oldQueues,
    QueueHook hook) throws IOException {
    CSQueue queue;
    //fullQueueName=root
    String fullQueueName = (parent == null) ? queueName
        : (parent.getQueuePath() + "." + queueName);
    //获取子队列列表,即读取配置项yarn.scheduler.capacity.root.queues
    String[] childQueueNames = conf.getQueues(fullQueueName);
    //队列否开启了资源预留,即读取配置项yarn.scheduler.capacity.root.reservable,默认为false
    boolean isReservableQueue = conf.isReservable(fullQueueName);
    //如果没有子队列则创建LeafQueue
    if (childQueueNames == null || childQueueNames.length == 0) {
        if (null == parent) {
            throw new IllegalStateException(
                "Queue configuration missing child queue names for " + queueName);
        }
        // Check if the queue will be dynamically managed by the Reservation
        // system
         // 如果队列开启了资源预留,则创建PlanQueue,该类型的queue会被ReservationSystem管理
        if (isReservableQueue) {
            queue =
                new PlanQueue(csContext, queueName, parent,
                              oldQueues.get(queueName));
        } else {
            queue = new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));

            // Used only for unit tests
            queue = hook.hook(queue);
        }
    } else {//有子队列则创建ParentQueue
        if (isReservableQueue) {
            throw new IllegalStateException(
                "Only Leaf Queues can be reservable for " + queueName);
        }
        //root队列构建
        ParentQueue parentQueue =
            new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));

        // Used only for unit tests
        queue = hook.hook(parentQueue);
        //root的子队列构建,递归调用parseQueue
        List<CSQueue> childQueues = new ArrayList<CSQueue>();
        for (String childQueueName : childQueueNames) {
            CSQueue childQueue =
                parseQueue(csContext, conf, queue, childQueueName,
                           queues, oldQueues, hook);
            childQueues.add(childQueue);
        }
        //为父队列设置子队列,即childQueues属性赋值,childQueues是个TreeSet按队列的capacity排序
        parentQueue.setChildQueues(childQueues);
    }

    if(queue instanceof LeafQueue == true && queues.containsKey(queueName)
       && queues.get(queueName) instanceof LeafQueue == true) {
        throw new IOException("Two leaf queues were named " + queueName
                              + ". Leaf queue names must be distinct");
    }
    queues.put(queueName, queue);

    LOG.info("Initialized queue: " + queue);
    return queue;
}

ParentQueue构建

public ParentQueue(CapacitySchedulerContext cs, 
                   String queueName, CSQueue parent, CSQueue old) throws IOException {
    super(cs, queueName, parent, old);
    //队列比较器 
    this.queueComparator = cs.getQueueComparator();
    //是否root队列
    this.rootQueue = (parent == null);
    //获取队列设置的capacity,参数:yarn.scheduler.capacity.root.rec.capacity,如果是root队列则直接是100
    float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());

    if (rootQueue &&
        (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
        throw new IllegalArgumentException("Illegal " +
                                           "capacity of " + rawCapacity + " for queue " + queueName +
                                           ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
    }

    float capacity = (float) rawCapacity / 100;
    float parentAbsoluteCapacity = 
        (rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
    float absoluteCapacity = parentAbsoluteCapacity * capacity; 

    float  maximumCapacity =
        (float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
    float absoluteMaxCapacity = 
        CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);

    QueueState state = cs.getConfiguration().getState(getQueuePath());

    Map<AccessType, AccessControlList> acls = 
        cs.getConfiguration().getAcls(getQueuePath());

    //设置conf参数
    setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
                      maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
                      defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels, 
                      cs.getConfiguration().getReservationContinueLook());
    //子队列按队列比较器来进行排序
    this.childQueues = new TreeSet<CSQueue>(queueComparator);

    LOG.debug("Initialized parent-queue " + queueName +
              " name=" + queueName + 
              ", fullname=" + getQueuePath()); 
}

LeafQueue构建

public LeafQueue(CapacitySchedulerContext cs, 
                 String queueName, CSQueue parent, CSQueue old) throws IOException {
    super(cs, queueName, parent, old);
    this.scheduler = cs;

    this.activeUsersManager = new ActiveUsersManager(metrics);
    this.minimumAllocationFactor = 
        Resources.ratio(resourceCalculator, 
                        Resources.subtract(maximumAllocation, minimumAllocation), 
                        maximumAllocation);

    float capacity = getCapacityFromConf();
    float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;

    float maximumCapacity = 
        (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
    float absoluteMaxCapacity = 
        CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);

    // Initially set to absoluteMax, will be updated to more accurate
    // max avail value during assignContainers
    absoluteMaxAvailCapacity = absoluteMaxCapacity;

    int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
    float userLimitFactor = 
        cs.getConfiguration().getUserLimitFactor(getQueuePath());

    int maxApplications =
        cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
    if (maxApplications < 0) {
        int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
        maxApplications = (int)(maxSystemApps * absoluteCapacity);
    }
    maxApplicationsPerUser = 
        (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);

    float maxAMResourcePerQueuePercent = cs.getConfiguration()
        .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
    int maxActiveApplications = 
        CSQueueUtils.computeMaxActiveApplications(
        resourceCalculator,
        cs.getClusterResource(), this.minimumAllocation,
        maxAMResourcePerQueuePercent, absoluteMaxCapacity);
    this.maxActiveAppsUsingAbsCap = 
        CSQueueUtils.computeMaxActiveApplications(
        resourceCalculator,
        cs.getClusterResource(), this.minimumAllocation,
        maxAMResourcePerQueuePercent, absoluteCapacity);
    int maxActiveApplicationsPerUser =
        CSQueueUtils.computeMaxActiveApplicationsPerUser(
        maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);

    QueueState state = cs.getConfiguration().getState(getQueuePath());

    Map<AccessType, AccessControlList> acls = 
        cs.getConfiguration().getAcls(getQueuePath());
    //set
    setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
                      maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
                      maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
                      maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
                      .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
                      defaultLabelExpression, this.capacitiyByNodeLabels,
                      this.maxCapacityByNodeLabels,
                      cs.getConfiguration().getReservationContinueLook());

    if(LOG.isDebugEnabled()) {
        LOG.debug("LeafQueue:" + " name=" + queueName
                  + ", fullname=" + getQueuePath());
    }
    //app比较器,按appID排序
    Comparator<FiCaSchedulerApp> applicationComparator = cs.getApplicationComparator();
    //pendingApp列表
    this.pendingApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
    //activeApp列表
    this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}

ParentQueue和LeafQueue的一点说明:队列通过ParentQueue和LeafQueue构成了一颗树,app只能在LeafQueue上申请资源,LeafQueue保存了当前运行的app列表,app列表分两种:pendingApp列表和activeApp列表,两个列表都是按appID来排序的,即capacityScheduler在队列内部是按FIFO调度的。

(2) 构建队列和用户/组的映射

队列映射 initializeQueueMappings():将用户/组和队列映射起来

private void initializeQueueMappings() throws IOException {
    //如果存在队列映射,它是否会覆盖用户指定的值? 管理员可以使用此项将作业放入与用户指定的队列不同的队列中.
    overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
    LOG.info("Initialized queue mappings, override: "
             + overrideWithQueueMappings);
    // Get new user/group mappings
    //指定用户/组到特定队列的映射 yarn.scheduler.capacity.queue-mappings
    List<QueueMapping> newMappings = conf.getQueueMappings();
    //check if mappings refer to valid queues
    for (QueueMapping mapping : newMappings) {
        if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
            !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
            CSQueue queue = queues.get(mapping.queue);
            if (queue == null || !(queue instanceof LeafQueue)) {
                throw new IOException(
                    "mapping contains invalid or non-leaf queue " + mapping.queue);
            }
        }
    }
    //apply the new mappings since they are valid
    mappings = newMappings;
    // initialize groups if mappings are present
    if (mappings.size() > 0) {
        groups = new Groups(conf);
    }
}

capacity相关重要类

CSQueue 接口

CSQueue代表CapacityScheduler中分层队列树中的一个节点,使用这个类来保存队列的信息

ClusterNodeTracker

帮助程序库:-跟踪所有群集SchedulerNodes的状态-提供方便的方法来过滤和排序节点

ResourceCalculator

进行资源比较和操作

ParentQueue

PlanQueue

这表示由ReservationSystem管理的动态队列。从用户的角度来看,这等效于保留的LeafQueue,但从功能上讲,它是ParentQueue的子类。

4 AMLivelinessMonitor

在ResouceManager#RMActiveServices#serviceInit()方法中进行createAMLivelinessMonitor的创建

AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);

创建了两个AMLivelinessMonitor实例:amLivelinessMonitor代表运行中的AM,amFinishingMonitor代表完成中的AM。

进入createAMLivelinessMonitor()

protected AMLivelinessMonitor createAMLivelinessMonitor() {
    return new AMLivelinessMonitor(this.rmDispatcher);
}




//进入new AMLivelinessMonitor

//进入new AMLivelinessMonitor
public AMLivelinessMonitor(Dispatcher d) {
    super("AMLivelinessMonitor", new SystemClock());
    this.dispatcher = d.getEventHandler();
}

public void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    //am过期时间,yarn.am.liveness-monitor.expiry-interval-ms,默认10分钟
    int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
                                  YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
    //设置am过期间隔为10分钟
    setExpireInterval(expireIntvl);
    //设置am监控间隔为3分钟
    setMonitorInterval(expireIntvl/3);
}


AbstractLivelinessMonitor.java

@Override
protected void serviceStart() throws Exception {
    assert !stopped : "starting when already stopped";
    //将存储的AM的时间重置为当前时间
    resetTimer();
    //创建一个监控AM的线程
    checkerThread = new Thread(new PingChecker());
    checkerThread.setName("Ping Checker");
    //启动监控线程,不停的监控AM是否过期了
    checkerThread.start();
    super.serviceStart();
}

来看看监控线程的工作

private class PingChecker implements Runnable {
    @Override
    public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            synchronized (AbstractLivelinessMonitor.this) {
                //running是个map,元素为<AM,reportTime>
                Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator();

                //avoid calculating current time everytime in loop
                long currentTime = clock.getTime();
                //所有AM进行当前时间和上次心跳汇报时间的间隔比较,如果超过设置的过期时间没有汇报心跳则认为AM过期了,然后会发起过期流程
                while (iterator.hasNext()) {
                    Map.Entry<O, Long> entry = iterator.next();
                    if (currentTime > entry.getValue() + expireInterval) {
                        iterator.remove();
                        //发起过期流程
                        expire(entry.getKey());
                        LOG.info("Expired:" + entry.getKey().toString() + 
                                 " Timed out after " + expireInterval/1000 + " secs");
                    }
                }
            }
            try {
                Thread.sleep(monitorInterval);
            } catch (InterruptedException e) {
                LOG.info(getName() + " thread interrupted");
                break;
            }
        }
    }
}

来看下过期流程:AMLivelinessMonitor.expire()

@Override
protected void expire(ApplicationAttemptId id) {
    dispatcher.handle(new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
}

即向RMAppAttemptImpl发送RMAppAttemptEventType.EXPIRE事件。

总结

capacity 使用Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>()来记录队列,

使用CSQueue保存队列的具体信息

解析队列 的时候,使用递归来解析child队列,同时设置每个队列父亲队列,队列通过ParentQueue和LeafQueue构成了一颗树,app只能在LeafQueue上申请资源,LeafQueue保存了当前运行的app列表,app列表分两种:pendingApp列表和activeApp列表,两个列表都是按appID来排序的,即capacityScheduler在队列内部是按FIFO调度的。

问题1:queue = hook.hook(parentQueue);

问题2:addService()

参考:https://blog.csdn.net/weixin_44630798/article/details/89005651

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,013评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,205评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,370评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,168评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,153评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,954评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,271评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,916评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,382评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,877评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,989评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,624评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,209评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,199评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,418评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,401评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,700评论 2 345

推荐阅读更多精彩内容

  • 睿教通APP用户服务协议 睿教通APP尊重并保护所有使用服务用户的个人隐私权。为了给您提供更准确、更有个性化的服务...
    larslee阅读 291评论 0 0
  • 牢记职责不负使命丨以最快速度进入状态 2020年是不平凡的一年,在春节前夕爆发的【新冠肺炎】病毒肆虐,国家宣布进入...
    森林走心篇章阅读 262评论 0 0
  • 考研成绩公布前后考生需要关注的问题 距离2020考研成绩公布还有一段时间,此时举国上下也正在齐心协力的应对新型冠状...
    woainicxj阅读 208评论 0 0
  • 最近胸口闷。 考试题目太难。 大姨妈来了腰疼。 上课老师不让睡觉, 晚上还要做ppt展示。
    每天充电的花猫阅读 229评论 0 0
  • 我们本无交集,虽然同班两年也都不熟,我只知你是女班长和谁谈论都要提起的人。 班里的座位一直是女班长安排的,而你总不...
    不被驯养的猫阅读 626评论 5 12