RocketMq消息及其应用
一、延迟消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
配置自定义messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 10m 15m 20m 30m 1h 3h 6h 12h 24h
注意,messageDelayLevel是broker的属性,不属于某个topic。
发消息时,设置delayLevel等级即可。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟24h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
/**
* 重试次数--延迟级别对应关系
*/
static ConcurrentHashMap<Integer,Integer> retryTimeDelayLevelMap = new ConcurrentHashMap<>();
static{
retryTimeDelayLevelMap.put(1,5);//1min
retryTimeDelayLevelMap.put(4,14);//1h
retryTimeDelayLevelMap.put(2,7);//3min
retryTimeDelayLevelMap.put(3,10);//10min
}
//发送延迟消息
public void sendDelayNotify(VideoStateChangeEvent event) {
try{
if(event.getRetryTimes()>4 || event.getRetryTimes()<1){
LOG.warn("仅仅支持重发4次 retryTimes:{}",event.getRetryTimes());
return;
}
org.springframework.messaging.Message mmm = MessageBuilder.withPayload(event).build();
ResultDTO r = rocketMQTemplate.syncSendDelay(mediaCallbackTopic + ":" + event.getEventType(), mmm, retryTimeDelayLevelMap.get(event.getRetryTimes()));
}catch(Exception e){
//dosomething
}
}
二、重试消息
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
/**
* 消费者
*/
@Service
@RocketMQMessageListener(
consumerGroup = "${consumer}",
topic = "${topic}",
selectorExpression = "*")
public class CallbackTopicConsumer implements RocketMQListener<MessageExt>{(CallbackTopicConsumer.class);
@Override
public void onMessage(MessageExt msg) {
try {
byte[] bytes = msg.getBody();
//dosomething
}catch (MediaStorePlatformException e1){
//对于特定异常类型,如果异常被catch后,没有往外抛,client认为消息已经被消费。
//此时,消息会丢弃
//dosomething
}catch (Exception e) {
//dosomething
}
}
}
一条小咸鱼