rocketmq源码7-客户端-push消费者

二 DefaultMQPushConsumer`

  • 消费线程数最大最小值配置consumeThreadMin,consumeThreadMax
  • 快照缓存消息数量pullThresholdForQueue = 1000;
  • 快照缓存消息大小MBpullThresholdSizeForQueue = 100;
  • topic级别缓存流控配置
int pullThresholdForTopic = -1;
int pullThresholdSizeForTopic = -1;
  • 一次拉取的消息数量
    int pullBatchSize = 32;

2.1 DefaultMQPushConsumerImpl

  • 配置信息
  • 钩子接口
  • 拉消息代理接口PullAPIWrapper pullAPIWrapper;
  • 消息消费服务 ConsumeMessageService consumeMessageService;

2.1.1 RebalancePushImpl

  • push使用的负载均衡处理服务
    public class RebalancePushImpl extends RebalanceImpl
  • 计算消费起始偏移量
@Override
    public long computePullFromWhere(MessageQueue mq) {
        long result = -1;
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {
//持久化偏移数据中获取末尾偏移量
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                }
                // First start,no offset
                else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        result = 0L;
                    } else {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    result = 0L;
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    } else {
                        try {
                            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                UtilAll.YYYYMMDDHHMMSS).getTime();
                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }

            default:
                break;
        }

        return result;
    }
  • 重新开始拉取消息,更新缓存快照中消息,负载均衡重新分配处理完后调用
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

2.1.2 RebalanceImpl

  • mq的消费快照信息
    ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable
  • 消费的topic在所有broker上的MessageQueue信息
    ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable
  • topic消费的tag信息
    ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner
  • 消费组String consumerGroup;
  • 消费模式,集群模式或广播模式MessageModel messageModel;
  • 负载均衡策略AllocateMessageQueueStrategy, 为平均分配策略

2.1.2.1 ProcessQueue

  • msgTreeMap并发访问读写锁
    ReadWriteLock lockTreeMap
  • 消息存储结构,key为消息偏移量
    TreeMap<Long, MessageExt> msgTreeMap
  • 消息数量AtomicLong msgCount = new AtomicLong();
  • 消息总长度AtomicLong msgSize = new AtomicLong();
  • 顺序消费的临时存储map
    TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap
  • 当前消息快照中缓存的消息的最大偏移量
    volatile long queueOffsetMax = 0L;
  • 放弃的快照标记volatile boolean dropped = false;
  • 上次pull消息时间,isPullExpired判断pull超时时间120s,push方式消费超时则删除快照。
volatile long lastPullTimestamp = System.currentTimeMillis();

public boolean isPullExpired() {
    return (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
}
  • 上次消费快照中消息时间
    volatile long lastConsumeTimestamp = System.currentTimeMillis();
  • 锁broker上mq标记和时间
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();

Set<MessageQueue> lockedMq =
    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
    if (processQueue != null) {
        processQueue.setLocked(true);
        processQueue.setLastLockTimestamp(System.currentTimeMillis());
    }
}
  • 消费中标记volatile boolean consuming = false;
  • 消息队列中堆积的消息数
volatile long msgAccCnt = 0;

if (!msgs.isEmpty()) {
    MessageExt messageExt = msgs.get(msgs.size() - 1);
    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
    if (property != null) {
        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
        if (accTotal > 0) {
            this.msgAccCnt = accTotal;
        }
    }
}
2.1.2.1.1 cleanExpiredMsg
  • 删除消费超时的缓存消息,push消费客户端使用
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
//顺序消费不清理超时消息
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }
//遍历本地缓存消息的treemap
    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
//获取消费超时的消息
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {

                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }

        try {
//发送给broker的retry重试队列,
            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
//优先删除偏移量在前的缓存消息
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            removeMessage(Collections.singletonList(msg));
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}
2.1.2.1.2 putMessage
  • 缓存消息
public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
//缓存新的消息,以消息偏移量为key
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);

            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
//统计消息堆积数量
            if (!msgs.isEmpty()) {
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                if (property != null) {
                    long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                    if (accTotal > 0) {
                        this.msgAccCnt = accTotal;
                    }
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putMessage exception", e);
    }

    return dispatchToConsume;
}
2.1.2.1.3 removeMessage
  • 普通缓存消息消费后处理
public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
//删除缓存的消息,变更统计数据
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);
//返回下一个待消费消息
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }

    return result;
}
2.1.2.1.4 顺序消息消费
  • 获取一次性顺序消息消费数量的消息
public List<MessageExt> takeMessags(final int batchSize) {
    List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!this.msgTreeMap.isEmpty()) {
                for (int i = 0; i < batchSize; i++) {
//遍历msgTreeMap消息缓存,放入consumingMsgOrderlyTreeMap顺序消费消息缓存中
                    Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                    if (entry != null) {
                        result.add(entry.getValue());
                        consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
                    } else {
                        break;
                    }
                }
            }

            if (result.isEmpty()) {
                consuming = false;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("take Messages exception", e);
    }

    return result;
}
  • 消费失败回滚,再放回消息缓存中msgTreeMap
public void rollback() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
            this.consumingMsgOrderlyTreeMap.clear();
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("rollback exception", e);
    }
}
  • 消费成功,则清空顺序消费消息缓存consumingMsgOrderlyTreeMap
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

2.1.2.2 doRebalance

  • 重新负载均衡分配
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
//遍历消费topic
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
//重新负载均衡分配消息队列
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
//删除不再消费的mq队列消息快照
    this.truncateMessageQueueNotMyTopic();
}
  • 按消费模式进行不同的负载均衡策略分配
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
//广播模式消费所有队列
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
//变更监听通知
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
//集群模式,按平均分配策略,给每个消费族实例分配消息队列
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
//更新消息快照缓存,变更监听通知
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

2.1.3 ConsumeMessageService

2.1.3.1 ConsumeMessageOrderlyService

  • 顺序消费
  • 集群模式,则周期任务锁broker上的mq
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
  • 业务消费回调接口
MessageListenerOrderly messageListener;

public interface MessageListenerOrderly extends MessageListener {
    /**
     * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
     * if consumption failure
     *
     * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
     * @return The consume status
     */
    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeOrderlyContext context);
}
  • 直接调用消费接口消费消息consumeMessageDirectly
  • 异步线程池线程调用消费接口消费消息
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
  • ConsumeRequest异步线程任务消费
    广播模式直接调用消费回调
    集群模式需要先加锁,并判断锁是否过期
    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);获取待消费的消息
    调用消费回调接口
    根据消费结果,成功则释放消息缓存或失败则回滚消息缓存继续消费
    持久化消费偏移量

2.1.3.2 ConsumeMessageConcurrentlyService

  • 并行消费
  • 初始化,定时线程任务,清理消息缓存快照中的过期消息。
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
  • 重试topic的消息,修改topic为原topic
public void resetRetryTopic(final List<MessageExt> msgs) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }
    }
}
  • 直接当前线程消费consumeMessageDirectly()
    调用消费回调函数this.messageListener.consumeMessage(msgs, context);
  • 异步线程池调度消费
2.1.3.2.1 异步任务发送
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
//一次任务处理的最大消息数
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
//分拆组装多个异步任务请求
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
//提交给异步调度线程池执行
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
2.1.3.2.2 异步任务执行ConsumeRequest
public void run() {
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }
//消费回调
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;

        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
//钩子函数处理
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
//重试topic的消息,修改topic为原topic            ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
//消费回调消费消息
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        if (!processQueue.isDropped()) {
//处理消费结果            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }
}
2.1.3.2.3 消费结果处理
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
//消费成功的消息索引
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;
//消费成功和失败统计
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
//广播模式丢弃消费失败消息
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
//集群模式,消费失败的先返回给broker重试队列,
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
//重试发送失败,则新建异步任务,重新消费之前消费失败的消息
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  //持久化消费偏移量      this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

2.1.4 初始化start

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
//配置校验
            this.checkConfig();
//消费信息拷贝到rebalanceImpl中,
//集群消费模式添加消费族的retry重试topic,配置消费tag为*
            this.copySubscription();
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
   //使用进程id             this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
//通信客户端
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//rebalanceImpl的消费组,消费模式,负载均衡策略等设置           this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());          this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());         this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//拉消息的代理接口初始化
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());          this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//消费偏移持久化服务初始化
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
//广播模式消费,则本地文件持久化
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
//集群模式消费,则broker持久化
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }             this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
//加载持久化的消费便宜数据
            this.offsetStore.load();

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
//push消费回调是顺序消费类型
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
//push消费回调是并发消费类型
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
//启动消息消费服务
            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
//启动通信客户端
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
//从namesrv更新topic的路由信息
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//发消息给broker进行校验
    this.mQClientFactory.checkClientInBroker();
//向所有broker发送心跳报文
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//执行负载均衡策略,分配消息队列
    this.mQClientFactory.rebalanceImmediately();
}

2.1.5 pullMessage拉取消息

  • 从pullRequest队列中获取pullRequest,执行消息拉取。
public void pullMessage(final PullRequest pullRequest) {
//已废弃的消息缓存快照,则不拉新的消息
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
//更新缓存快照的上次拉消息时间,pull处理超时判断使用
 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
//客户端状态检查,需要初始化完成,运行中
   try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
//状态错误,则把拉消息请求重新放入pullRequest队列中,延迟处理
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }
//暂停状态,把pullRequest放回队列中,延迟处理
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
//判断缓存消息数量和缓存消息大小是否超过阈值,超过则放回pullRequest队列,延迟处理
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
//并行消费类型,则一次性最大缓存的消息偏移量范围阈值检测,超过了则放回pullRequest队列,延迟处理
    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {//顺序消息消费
        if (processQueue.isLocked()) {
//broker加锁成功
            if (!pullRequest.isLockedFirst()) {
//首次消费,则从broker获取消费偏移量更新缓存快照中的消费起始偏移量
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {//未加锁成功,则放入pullRequest队列中延迟处理
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
//topic获取对应的消费配置信息,不存在则再次放入pullRequest队列中延迟处理
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    final long beginTimestamp = System.currentTimeMillis();
//拉取消息后的回调函数
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
//消息解码,tag过滤,钩子函数处理
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
//更新下次拉消息的偏移量
                        long prevRequestOffset = pullRequest.getNextOffset();                       pullRequest.setNextOffset(pullResult.getNextBeginOffset());
//计算一次拉消息的耗时,并统计纪录
                        long pullRT = System.currentTimeMillis() - beginTimestamp;                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
//无新消息,则放回pullRequest队列中,                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
//消息统计                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
//放入消息缓存快照中processQueue
                            boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//通知消费服务consumeMessageService调用业务消费回调函数消费消息                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispathToConsume);
//配置拉消息间隔,则使用线程延迟放入pullRequest队列中处理,否则立即放入pullRequest队列中处理
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {                              DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
//拉消息的偏移量有异常,拉取了重复的消息
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
//无新消息,更新消费偏移,延迟放入pullRequest队列处理                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
//无匹配的消息,更新消费偏移,延迟放入pullRequest队列处理                                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
//偏移量错误,更新消费偏移,删除消费快照                        
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());                        pullRequest.getProcessQueue().setDropped(true);                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {//异步任务持久化消费偏移量,删除消费快照
                            @Override
                            public void run() {
                                try {                                   DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
//拉消息异常,则异步线程延迟放入pullRequest队列处理            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };
//读取消费偏移值,有便宜值则拉消息时同时持久化便宜量
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }
//订阅类型提交
    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }
//创建标记位
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
//异步拉消息接口,拉消息成功后调用pullCallback回调函数
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容