RocketMq DefaultLitePullConsumer流程分析


开篇

  • 这篇文章的核心逻辑分析DefaultLitePullConsumer的消息拉取的流程,核心思路在于通过ReBalanceImpl来驱动整个消息拉取任务启动。
  • DefaultLitePullConsumer和DefaultMQPullConsumer的差别在于前者是consumer负责拉取消息,后者是业务代码负责拉取消息。


DefaultLitePullConsumer拉取流程

DefaultLitePullConsumer消息拉取
  • DefaultLitePullConsumer的整个拉取流程如上所示,核心在于start流程。
  • 核心的逻辑通过RebalanceService来驱动消息的拉取过程,负载均衡的逻辑和消息推送方式一致。


消息拉取例子

public class LitePullConsumerSubscribe {

    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        // 1、创建DefaultLitePullConsumer对象
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
        // 2、设置namesrv地址
        litePullConsumer.setNamesrvAddr("localhost:9876");
        // 3、订阅消费主题
        litePullConsumer.subscribe("TopicTest", "*");
        // 4、启动消费对象
        litePullConsumer.start();
        try {
            // 5、循环开始消费消息
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}
  • 1、创建DefaultLitePullConsumer对象。
  • 2、设置namesrv地址。
  • 3、订阅消费主题。
  • 4、启动消费对象。
  • 5、循环开始消费消息。


创建消费者

DefaultLitePullConsumer

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
    // 消息拉取的实现DefaultLitePullConsumerImpl
    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
    private String consumerGroup;
    private long brokerSuspendMaxTimeMillis = 1000 * 20;
    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
    private long consumerPullTimeoutMillis = 1000 * 10;
    private MessageModel messageModel = MessageModel.CLUSTERING;
    // 消费队列回调监听函数MessageQueueListener
    private MessageQueueListener messageQueueListener;
    private OffsetStore offsetStore;
    // MessageQueue的分配选择器
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
    private boolean unitMode = false;
    private boolean autoCommit = true;
    private int pullThreadNums = 20;
    private long autoCommitIntervalMillis = 5 * 1000;
    private int pullBatchSize = 10;
    private long pullThresholdForAll = 10000;
    private int consumeMaxSpan = 2000;
    private int pullThresholdForQueue = 1000;
    private int pullThresholdSizeForQueue = 100;
    private long pollTimeoutMillis = 1000 * 5;
    private long topicMetadataCheckIntervalMillis = 30 * 1000;

    public DefaultLitePullConsumer(final String consumerGroup) {
        this(null, consumerGroup, null);
    }

    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
        // 创建DefaultLitePullConsumerImpl对象
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }
}
  • 1、创建DefaultLitePullConsumer对象。
  • 2、DefaultLitePullConsumer内包含DefaultLitePullConsumerImpl对象。


DefaultLitePullConsumerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private final InternalLogger log = ClientLogger.getLog();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final RPCHook rpcHook;
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    // 核心的消息消费的MQClientInstance对象
    protected MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    // 消费队列的负载均衡
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    private enum SubscriptionType {
        NONE, SUBSCRIBE, ASSIGN
    }

    private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
    private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
    private SubscriptionType subscriptionType = SubscriptionType.NONE;
    private long pullTimeDelayMillsWhenException = 1000;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
    private DefaultLitePullConsumer defaultLitePullConsumer;
    // 消费任务的数据结构
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
    // 分配的消息队列
    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
    // 消费消息的数据结构
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
    private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
    private long consumeRequestFlowControlTimes = 0L;
    private long queueFlowControlTimes = 0L;
    private long queueMaxSpanFlowControlTimes = 0L;
    private long nextAutoCommitDeadline = -1L;
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();

    public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
        this.defaultLitePullConsumer = defaultLitePullConsumer;
        this.rpcHook = rpcHook;

        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
            this.defaultLitePullConsumer.getPullThreadNums(),
            new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
        );

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MonitorMessageQueueChangeThread");
            }
        });
        this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
    }
}
  • DefaultLitePullConsumerImpl是消息拉取消费的实现类。
  • RebalanceImpl是消息拉取的消费队列的负载均衡器。
  • taskTable负责保存MessageQueue和对应的消息拉取任务。
  • consumeRequestCache保存的消息消费任务。


订阅消费主题

DefaultLitePullConsumerImpl

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {

    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }
}


public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // 消费队列的负载均衡
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
    // assignedMessageQueue保存负责的MessageQueue对象
    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();

    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            if (topic == null || topic.equals("")) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }
            // 1、创建订阅的数据结构对象
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                topic, subExpression);
            // 2、负责均衡保存订阅数据对象
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            // 3、注册消息队列回调处理函数
            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
            // 4、assignedMessageQueue保存rebalanceImpl
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);

            if (serviceState == ServiceState.RUNNING) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                updateTopicSubscribeInfoWhenSubscriptionChanged();
            }
        } catch (Exception e) {
            throw new MQClientException("subscribe exception", e);
        }
    }
}
  • 1、创建订阅的数据结构对象。
  • 2、MessageQueue消费负载均衡器负责保存订阅数据对象。
  • 3、注册MessageQueue重新分配的回调处理器。
  • 4、assignedMessageQueue保存rebalanceImpl。

启动消息拉取消费者

DefaultLitePullConsumerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    protected MQClientInstance mQClientFactory;

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }
                // 初始化Client端负责消息拉取
                initMQClientFactory();
                // 初始化MessageQueue的负载均衡器
                initRebalanceImpl();
                initPullAPIWrapper();
                // 初始化消费位移保存
                initOffsetStore();
                // 核心启动MQClientInstance
                mQClientFactory.start();
                startScheduleTask();
                this.serviceState = ServiceState.RUNNING;
                operateAfterRunning();

                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

    private void initMQClientFactory() throws MQClientException {
        // 创建MQClientInstance对象
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);

        boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
    }
}


public class MQClientInstance {

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

    public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
        if (null == group || null == consumer) {
            return false;
        }

        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
        if (prev != null) {
            log.warn("the consumer group[" + group + "] exist already.");
            return false;
        }

        return true;
    }
}
  • initMQClientFactory负责创建MQClientInstance对象。
  • MQClientInstance对象负责消息拉取的核心逻辑。
  • consumerTable保存消费分组和consumer实例的映射关系。


MQClientInstance

public class MQClientInstance {

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        }

        this.clientId = clientId;
        this.mQAdminImpl = new MQAdminImpl(this);
        this.pullMessageService = new PullMessageService(this);
        this.rebalanceService = new RebalanceService(this);
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    }

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
}
  • rebalanceService是消息拉取流程的整个驱动器。
  • pullMessageService在消息拉取流程中没有起到作用。


RebalanceService

public class RebalanceService extends ServiceThread {

    private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}


public class MQClientInstance {

    public void doRebalance() {
        // consumerTable保存key为LitePullConsumer的DefaultLitePullConsumerImpl对象
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                }
            }
        }
    }
}
  • RebalanceService的核心流程在于触发DefaultLitePullConsumerImpl的重新负载均衡。
  • 核心需要关注DefaultLitePullConsumerImpl的doRebalance方法。


RebalanceImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // RebalanceLitePullImpl extends RebalanceImpl 
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            this.rebalanceImpl.doRebalance(false);
        }
    }
}


public abstract class RebalanceImpl {

    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
        new ConcurrentHashMap<String, Set<MessageQueue>>();
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();

    public void doRebalance(final boolean isOrder) {
        // 获取consumer
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }


    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return subscriptionInner;
    }


    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {

                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}


public class RebalanceLitePullImpl extends RebalanceImpl {

    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }
}
  • DefaultLitePullConsumerImpl的doRebalance会执行RebalanceLitePullImpl的doRebalance方法。
  • 核心会遍历所有订阅的Topic依次执行rebalanceByTopic的操作。
  • rebalanceByTopic会执行MessageQueue的分配操作,按照MessageQueue和消费分组进行分配,分配策略和推送的消费分组是一致的。
  • 负载消费拉取的MessageQueue有变动的情况下会执行RebalanceLitePullImpl 的messageQueueChanged操作。
  • messageQueueListener对象为DefaultLitePullConsumerImpl的MessageQueueListenerImpl。


MessageQueueListenerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();

    class MessageQueueListenerImpl implements MessageQueueListener {
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
            switch (messageModel) {
                case BROADCASTING: // 广播模式
                    updateAssignedMessageQueue(topic, mqAll);
                    updatePullTask(topic, mqAll);
                    break;
                case CLUSTERING: // 集群模式
                    updateAssignedMessageQueue(topic, mqDivided);
                    updatePullTask(topic, mqDivided);
                    break;
                default:
                    break;
            }
        }
    }

    private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
        // assignedMessageQueue负责保存topic和对应分配的MessageQueue
        this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
    }

    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            // 移除不负责的消息拉取任务PullTask
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            if (next.getKey().getTopic().equals(topic)) {
                if (!mqNewSet.contains(next.getKey())) {
                    next.getValue().setCancelled(true);
                    it.remove();
                }
            }
        }
        // 启动MessageQueue对应的拉取任务
        startPullTask(mqNewSet);
    }

    private void startPullTask(Collection<MessageQueue> mqSet) {
        for (MessageQueue messageQueue : mqSet) {
            if (!this.taskTable.containsKey(messageQueue)) {
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                this.taskTable.put(messageQueue, pullTask);
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
}
  • updateAssignedMessageQueue负责赋值最新分配的MessageQueue。
  • startPullTask负责启动消息拉取任务。


DefaultLitePullConsumerImpl

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    public class PullTaskImpl implements Runnable {
        private final MessageQueue messageQueue;
        private volatile boolean cancelled = false;

        public PullTaskImpl(final MessageQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {

            if (!this.isCancelled()) {
                // 从assignedMessageQueue获取ProcessQueue对象
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);

                long offset = nextPullOffset(messageQueue);
                long pullDelayTimeMills = 0;
                try {
                    SubscriptionData subscriptionData;
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        String topic = this.messageQueue.getTopic();
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        String topic = this.messageQueue.getTopic();
                        subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                            topic, SubscriptionData.SUB_ALL);
                    }
                    // 执行消息的拉取动作
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) {
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    // 拉取的消息放置到processQueue当中
                                    processQueue.putMessage(pullResult.getMsgFoundList());
                                    // submitConsumeRequest负责保存待消费的任务
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    // 更新下一次拉取的位移
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                } catch (Throwable e) {
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                }

                // 重新投递消息拉取任务
                if (!this.isCancelled()) {
                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
                } else {
                }
            }
        }
    }

    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
            log.error("Submit consumeRequest error", e);
        }
    }


    public class ConsumeRequest {
        private final List<MessageExt> messageExts;
        private final MessageQueue messageQueue;
        private final ProcessQueue processQueue;

        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
            final ProcessQueue processQueue) {
            this.messageExts = messageExts;
            this.messageQueue = messageQueue;
            this.processQueue = processQueue;
        }
    }
}
  • PullTaskImpl是负责消息拉取的任务。
  • PullTaskImpl#run过程中会执行消息的拉取的pull操作,更新下次拉取的位移,通过scheduledThreadPoolExecutor.schedule()再次投递消息拉取任务。
  • consumeRequestCache负责保存拉取待消费的任务ConsumeRequest任务。


消息的消费

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    public synchronized List<MessageExt> poll(long timeout) {
        try {
            checkServiceState();
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (defaultLitePullConsumer.isAutoCommit()) {
                maybeAutoCommit();
            }
            long endTime = System.currentTimeMillis() + timeout;

            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

            if (endTime - System.currentTimeMillis() > 0) {
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0)
                        break;
                }
            }

            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                // 设置messages的topic为重试队列
                this.resetTopic(messages);
                return messages;
            }
        } catch (InterruptedException ignore) {

        }

        return Collections.emptyList();
    }
}
  • poll负责拉取待消费的任务进行处理。
  • resetTopic负责重新设置消息的Topic为重试队列,但是似乎没有什么用。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 197,966评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,170评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,909评论 0 327
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,959评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,851评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,583评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,956评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,590评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,878评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,892评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,719评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,501评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,957评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,124评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,440评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,003评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,211评论 2 339