RocketMq技术内幕笔记(四)

5 RocketMQ 消息消费

5.1 RocketMQ 消息消费概述

消息消费以组的模式开展, 一个消费组内可以包含多个消费者,每一个消费组可订阅 多个主题,消费组之间有集群模式与广播模式两种消费模式 。

集群模式,主题下的同一条 消息只允许被其中一个消费者消费 。
广播模式,主题下的同一条消息将被集群内的所有消 费者消费一次。

消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。 所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者。 RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用的思想 : 一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列

RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。 不 支持消息全局顺序消费, 如果要实现某一主题的全局顺序消息消费, 可以将该主题的队列数设置为1,牺牲高可用性。

RocketMQ 支持两种消息过滤模式:表达式(TAG、 SQL92)与类过滤模式。

消息拉模式,主要是由客户端手动调用消息拉取API,而消息推模式是消息服务器主 动将消息推送到消息消费端

5.2 消息消费者初探

下面分析推模式的消费者 MQPushConsume的主要API, 如下图所示。

MQConsume

/**
* 发送消息 ACK确认
* @param msg 消息
* @param delayLevel 消息延迟级别
* @param brokerName 消息服务器名称
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

/**
* 获取消费者对主题 topic分配了哪些消息队列
* @param topic 主题名称
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;

MQPushConsumer

/**
* 注册并发消息事件监昕器
* @param messageListener
*/
void registerMessageListener(final MessageListenerConcurrently messageListener);

/**
* 注册顺序消费事件监听器
* @param messageListener
*/
void registerMessageListener(final MessageListenerOrderly messageListener);

/**
* 基于主题订阅消息
* @param topic 消息主题
* @param subExpression 消息过滤表达式,TAG或SQL92表达式
*/
void subscribe(final String topic, final String subExpression) ;
/**

* 基于主题订阅消息,消息过滤方式使用类模式
* @param topic 消息主题
* @param fullClassName 过滤类全路径名
* @param filterClassSource 过滤类代码
*/
void subscribe(final String topic, final String fullClassName,final String filterClassSource);

/**
* 取消消息订阅 
* @param topic
*/
void unsubscribe(final String topic);

DefaultMQPushConsumer (推模式消息消费者)主要属性:

DefaultMQPushConsumer

//消费者所属组
private String consumerGroup;

//消息消费模式,分为集群模式、广播模式,默认为集群模式
private MessageModel messageModel = MessageModel.CLUSTERING;

//根据消息进度从消息服务器拉取不到消息时重新计算消费策略
//CONSUME_FROM_MIN_OFFSET,从队列当前最小偏移量开始消费
//CONSUME_FROM_MAX_OFFSET,从队列当前最大偏移量开始消费
//CONSUME_FROM_TIMESTAMP,从消费者启动时间戳开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下消息队列负载策略 
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
 //集群模式下消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

//订阅信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

/**
* 消息业务监听器
*/
private MessageListener messageListener;

/**
* 消息消费进度存储器
*/
private OffsetStore offsetStore;

/**
* 消息者最新线程数
*/
private int consumeThreadMin = 20;

/**
* 消费者最大线程数,由于消费者线程池使用无界队列,
* 故消费者线程个数其实最多只有 consumeThreadMin 个
*/
private int consumeThreadMax = 20;
/**
* 消费者最大线程数,由于消费者线程池使用无界队列,
* 故消费者线程个数其实最多只有 consumeThreadMin 个
*/
private int consumeThreadMax = 20;

/**
* Threshold for dynamic adjustment of the number of thread pool
*/
private long adjustThreadPoolNumsThreshold = 100000;

/**
* 并发消息消费时处理队列最大跨度,默认 2000,
* 表示如果消息处理队列中偏移量最大的消息与偏移量最小的消息的跨度超过 2000则延迟到毫秒后再拉取消息
*/
private int consumeConcurrentlyMaxSpan = 2000;
//默认值1000, 每1000次流控后打印流控日志
private int pullThresholdForQueue = 1000;

/**
* 推模式下拉取任务间隔时间,默认一次拉取任务完成继续拉取。
*/
private long pullInterval = 0;

/**
* 消息并发消费时一次消费消息条数,通俗点说 就是每次传入MessageListtener#consumeMessage中的消息条数
*/
private int consumeMessageBatchMaxSize = 1;

/**
* 每次消息拉取所拉取的条数,默认32条
*/
private int pullBatchSize = 32;
//是否每次拉取消息都更新订阅信息,默认为 false
private boolean postSubscriptionWhenPull = false;
//最大消费重试次数。如果消息消费次数超过 maxReconsumeTimes还未成功,则将该消息转移到一个失败队列,等待被删除
private int maxReconsumeTimes = -1;
/**
* 延迟将该队列的消息提交到消费者线程的等待时间,默认延迟ls
*/
private long suspendCurrentQueueTimeMillis = 1000;
/**
* 消息消费超时时间,默认为15,单位为分钟 
*/
private long consumeTimeout = 15;

5.3 消费者启动流程

消息消费者是如何启动的,分析 DefaultMQPushConsumerlmpl 的start方法,具体代码如下。

DefaultMQPushConsumelmpl#copySubscription

private void copySubscription() throws MQClientException {
    try {
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }

        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

Step1 :构建主题订阅信息 SubscriptionData 并加入到 Rebalancelmpl 的订阅消息中。 订阅关系来源主要有两个。
1)通过调用 DefaultMQPushConsumerlmpl#subscrib巳( String topic, String subExpression) 方法。
2)订阅重试主题消息。从这里可以看出,RocketMQ消息重试是以消费组为单位,而不是主题,消息重试主题名为 %RETRY%+消费组名。消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。

DefaultMQPushConsumelmpl#start

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

if (this.pullAPIWrapper == null) {
    this.pullAPIWrapper = new PullAPIWrapper(
        mQClientFactory,
        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

Step2:初始化 MQClientlnstance、 Rebalancelmple (消息重新负载实现类)等。

DefaultMQPushConsumerlmpl#start

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

Step3 : 初始化消息进度。如果消息消费是集群模式,那么消息进度保存在 Broker上; 如果是广播模式,那么消息消费进度存储在消费端。

DefaultMQPushConsumerlmpl#start

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService =
    new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

this.consumeMessageService.start();

Step4 :根据是否是顺序消费,创建消费端消费线程服务。 ConsumeMessageService 主要负责消息消费,内部维护一个线程池。

DefaultMQPushConsumerlmpl#start

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

mQClientFactory.start();

Step5 :向 MQClientlnstance注册消费者,并启动MQClientlnstance,在一个JVM中的所有消费者、生产者持有同一个 MQClientlnstance, MQClientlnstance 只会启动一次。

5.4 消息拉取

一条小咸鱼