RocketMq技术内幕笔记(二)
3 消息发送
RocketMQ 发送普通消息有 三 种实现方式:可靠同步发送 、 可靠异步发送 、 单向 (Oneway)发送。
3.1 RocketMQ 消息发送
RocketMQ 支持 3 种消息发送方式 :同步(sync)、 异步(async)、单向(oneway)。
同步 : 发送者向 MQ 执行发送消息 API 时,同步等待, 直到消息服务器返回发送结果 。
异步 : 发送者向 MQ 执行发送消息 API 时,调用消息发送Api后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
单向:消息发送者向 MQ 执行发送消息 API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上 。
3.2 认识RocketMQ消息
RocketMQ 消息封装类是 org.apache.rocketmq.common.message.Message。
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}
Message 扩展属性主要包含下面几个 。
tag:消息TAG,用于消息过滤 。
keys: Message索引键,多个用空格隔开,RocketMQ可以根据这些 key快速检索到消息 。 waitStoreMsgOK:消息发送时是否等消息存储完成后再返回 。
delayTimeLevel: 消息延迟级别,用于定时消息或消息重试 。
3.3 生产者启动流程
3.3.1 初识 DefaultMQProducer 消息发送者
DefaultMQProducer是默认的消息生产者实现类,它实现 MQAdmin 的接口。其主要接口有:
/**
* @param key accesskey
* @param newTopic 主题名称
* @param queueNum 队列数量
* @param topicSysFlag 主题系统标签,默认为0
* @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
* 根据 时间 戳从队列中查找其偏移量
*/
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQProducerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
/**
* 查找该消息 队列中 最大的物理偏移量
*/
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.maxOffset(queueWithNamespace(mq));
}
/**
* 查找该消息队列中最小物理偏移量
*/
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.minOffset(queueWithNamespace(mq));
}
/**
* 根据消息偏移量查找消息
*/
@Deprecated
@Override
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
}
/**
* 根据条件查询消息
* @param topic message topic
* @param key message key index word 消息索引字段
* @param maxNum max message number 本次最多取出消息条数。
* @param begin from when 开始时间
* @param end to when 结束时间
*/
@Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQProducerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
}
/**
* 根据主题与消息 ID 查找消息
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
return this.viewMessage(msgId);
} catch (Exception ignored) {
}
return this.defaultMQProducerImpl.queryMessageByUniqKey(withNamespace(topic), msgId);
}
/**
* 查找该主题下所有的消息队列
*/
@Override
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic));
}
/**
* 同步发送消息,具体发送到主题中的哪个消息队列由负载算法决定
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
/**
* 同步发送消息,如果发送超过 timeout 则抛出超时异常
*/
@Override
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, timeout);
}
/**
* 异步发送消息, sendCallback参数是消息发送成功后的回调方法
*/
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback);
}
/**
* 异步发送消息 ,如果发送超过 timeout指定的值,则抛出超时异常
*/
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
/**
* 单向消息发送,就是不在乎发送结果,消息发送出去后该方法立即返回
*/
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg);
}
/**
* 同步方式发送消息,发送到指定消息队列
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
}
/**
* 同步方式发送消息,发送到指定消息队列 超时异常
*/
@Override
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), timeout);
}
/**
* 异步方式发送消息,发送到指定消息 队列
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
}
/**
* 异步方式发送消息,发送到指定消息队列 超时异常
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback, timeout);
}
/**
* 单向方式发送消息,发送到指定的消息队列
*/
@Override
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg, queueWithNamespace(mq));
}
/**
* 消息发送,指定消息选择算法,覆盖消息生产者默认的消息队列负载
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
/**
* 同步批量消息发送
*/
@Override
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}
DefaultMQProducer核心属性
/**
* 生产者所属组,消息服务器在回查事务状态时会随机选择该组中任何一个生产者发起事务回查请求
*/
private String producerGroup;
/**
* 默认 topicKey。
*/
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
/**
* 默认主题在每一个 Broker 队列数量。
*/
private volatile int defaultTopicQueueNums = 4;
/**
* 发送消息默认超时时间, 默认 3s。
*/
private int sendMsgTimeout = 3000;
/**
* 消息体超过该值则启用压缩,默认 4K。
*/
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
*
* 同 步方式发送消息重试次数,默认为 2,总共执行 3 次 。
*/
private int retryTimesWhenSendFailed = 2;
/**
* 异步方式发送消息重试次数,默认为 2。
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* 消息重试时选择另外一个 Broker时是否不等 待存储结果就返回 , 默认为 false。
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;
/**
* 允许发送的最大消息长度,默认为 4M,眩值最大值为 2"32-1
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M
3.3.2 消息生产者启动流程
消息生产者启动主要从DefaultMQProducerlmpl的start方法开始。
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//Step1:检查productGroup是否符合要求;并改变生产者的 instanceName为进程 ID。
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// /Step2 :创建 MQClientInstance实例。 整个JVM 实例中只存在一个 MQClientManager实例,维护一个MQClientInstance缓存表 ConcurrentMap<String/*clientId灯,MQClientInstance> factoryTable =new ConcurrentHashMap<String, MQClientInstance>(),也就是 同一个 clientId只 会创建一个 MQClientInstance。
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//Step3 :向 MQClientlnstance注册,将当前生产者加入到 MQClientlnstance管理中,方 便后续调用网络请求、进行心跳检测等。
//Step4 : 启动 MQClientlnstance,如果 MQC!ientlnstance 已经启动 ,则本次启 动不会真 正执行。
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
RequestFutureHolder.getInstance().startScheduledTask(this);
}
MQClientManager
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
3.4 消息发送基本流程
消息发送流程主要的步骤:验证消息、查找路由 、 消息发送 (包含异常处理机制) 。
同步消息发送入口
//DefaultMQProducer
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
//DefaultMQProducerImpl
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
3.4.1 消息长度验证
消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于 0且默认不能超过允许 发送消息的最大长度 4M (maxMessageSize=1024 *1024 *4)。
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
3.4.2 查找主题路由信息
消息发送之前,首先需要获取主题的路由信息,确认发送到具体的 Broker节点。。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
tryToFindTopicPublishlnfo是查找主题的路由信息的方法。如果生产者中缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列, 则向 NameServer查询该topic的路由信息。如果最终未找到路由信息,则抛出异常 : 无法找到主题相关路由信息异常。
TopicPublishInfo
public class TopicPublishInfo {
private boolean orderTopic = false; //是否是顺序消息
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //该主题队列的消息队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 每选择一次消息 队列, 该值会自增 l,如果 Integer.MAX_VALUE,则重置为 0,用于选择消息队列。
private TopicRouteData topicRouteData;
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas; //topic 队列元数据 。
private List<BrokerData> brokerDatas; //topic 分布的 broker元数据
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; //broker 上过滤服务器地址列表。
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}
第一次发送消息时,本地没有缓存 topic 的路由信息,查询NameServer尝试获取,如果路由信息未找到,再次尝试用默认主题 DefaultMQProducerlmpl#createTopicKey去查询,如果 BrokerConfig#autoCreateTopicEnable为true时,NameServer将返回路由信息,如果 autoCreateTopicEnable为false将抛出无法找到topic路由异常。
updateTopicRouteInfoFromNameServer
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
//获取锁
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
//Step 1 :如果isDefault为true,则使用默认主题去查询,如果查询到路由信息,
// 则替换路由信息中读写队列个数为消息生产者默认的队列个数(defaultTopicQueueNums);如果
// isDefault为false,则使用参数 topic去查询;如果未查询到路由信息,则返回false,表示路由信息未变化
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
//Step2:如果路由信息找到,与本地缓存中的路由信息进行对比,判断路由信息是否发生了改变,如果未发生变化,则直接返回false。
boolean changed = topicRouteData.topicRouteDataChanged(old);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
//Step3:更新 MQClientInstanceBroker地址缓存表。
if (changed) {
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update endpoint map
{
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
if (!mqEndPoints.isEmpty()) {
topicEndPointsTable.put(topic, mqEndPoints);
}
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// step 4:根据 topicRouteData中的 List<QueueData>转换成topicPublishInfo的 List<MessageQueue>
//列表。其具体实现在topicRouteData2TopicPublishInfo, 然后会更新该 MQClientInstance所管辖的所有消息发送关于topic的路由信息
if (!consumerTable.isEmpty()) {
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
循环遍历路由信息的QueueData 信息,如果队列没有写权限,则继续遍历下一个QueueData ,根据 topic+序号创建 MessageQueue,填充 topicPublishlnfo的List
topicRouteData2TopicSubscribeInfo
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<>();
if (route.getTopicQueueMappingByBroker() != null
&& !route.getTopicQueueMappingByBroker().isEmpty()) {
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
return mqEndPoints.keySet();
}
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
3.4.3 选择消息队列
首先消息发送端采用重试机制 ,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。由retryTimesWhenSend-AsyncFailed指定,接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。选择消息队列有两种方式。
1 ) sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。
2 ) sendLatencyFaultEnable=true,启用Broker故障延迟机制。
1. 默认机制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//判断启用不启用Broker故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//验证该消息队列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
//不启用Broker故障延迟机制
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
TopicPublishInfo 启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时,lastBrokerName为null,此时直接用 sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue(selectOneMessageQueue()方法),如果消息发送再失败的话,下次进行消息队列选择时规避上次 MesageQueue所在的Broker,否则还是很有可能再次失败。
该算法在一次消息发送过程中能成功规避故障的Broker,但如果 Broker若机,由于路 由算法中的消息队列是按 Broker排序的,如果上一次根据路由算法选择的是若机的Broker的第一个队列,那么随后的下次选择的是若机Broker的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗。
Broker不可用后,路由信息中为什么还会包含该 Broker的路由信息呢?其实这不难解释:首先, NameServer 检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s); 其次,NameServer不会 检测到 Broker岩机后马上推送消息给消息生产者,而是消息生产者每隔 30s更新一次路由信息,所以消息生产者最快感知Broker最新的路由信息也需要30s。如果能引人一种机制,在 Broker若机期间,如果一次消息发送失败后,可以将该 Broker暂时排除在消息队列的选择范围中。
2. Broker故障延迟机制
代码如上
1 )根据对消息队列进行轮询获取一个消息队列 。
2)验证该消息队列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName())
3)如果返回的 MessageQueue可用, 移除latencyFaultTolerance关于该topic条目, 表
明该Broker故障已经恢复。
Broker故障延迟机制核心类-LatencyFaultTolerance
public interface LatencyFaultTolerance<T> {
/**
* 更新失败条目
* @param name brokerName
* @param currentLatency 消息发送故障延迟时间
* @param notAvailableDuration 不可用持续时辰,在这个时间内Broker将被规避
*/
void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
/**
* 判断 Broker是否可用
* @param name
* @return
*/
boolean isAvailable(final T name);
/**
* 移除Fault条目,意味着Broker重新参与路由计算
* @param name
*/
void remove(final T name);
/**
* 尝试从规避的Broker中选择一个可用的Broker,如果没有找到,将返回null
* @return
*/
T pickOneAtLeast();
}
Faultltem: 失败条目(规避规则条目)
class FaultItem implements Comparable<FaultItem> {
//条目唯一键,这里为brokerName
private final String name;
//本次消息发送延迟
private volatile long currentLatency;
//故障规避开始时间
private volatile long startTimestamp;
}
MQFaultStrategy:消息失败策略,延迟实现的门面类
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax根据currentLatency本次消息发送延迟,从latencyMax尾部向前找到
第一个比currentLatency小的索引index,如果没有找到,返回0。然后根据这个索引从 notAvailableDuration数组中取出对应的时间,在这个时长内,Broker将设置为不可用。
MQFaultStrategy#updateFaultltem
/**
* 更新失败条目
* @param brokerName
* @param currentLatency 本次消息发送延迟时间
* @param isolation isolation,是否隔离,该参数的含义如果为 true,则使用默认时长30s来
* 计算Broker故障规避时长,如果为false,则使用本次消息发送延迟时间来计算Broker故障规避时长。
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
computeNotAvailableDuration的作用是计算因本次消息发送故障需要将 Broker 规避的时长,也就是接下来多久的时间内该 Broker将不参与消息发送队列负载。具体算法:从 latencyMax数组尾部开始寻找,找到第一个比currentLatency小的下标, 然后从notAvailableDuration数组中获取需要规避的时长,该方法最终调用LatencyFaultTolerance的updateFaultltem。
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
根据 broker名称从缓存表中获取Faultitem,如果找到则更新Faultltem,否则创建Faultltem。这里有两个关键点。
1)currentLatency、startTimeStamp被volatile修饰。
2)startTimeStamp为当前系统时间加上需要规避的时长。startTimeStamp是判断broker当前是否可用的直接一句,请看 Faultltem#isAvailable方法。
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
3.4.4 消息发送
/**
* 发送消息
* @param msg 待发送消息
* @param mq 消息将发送到该消息队列上
* @param communicationMode 消息发送模式
* @param sendCallback 异步消息回调函数
* @param topicPublishInfo 主题路由信息
* @param timeout 消息发送超时时间
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
Step 1:根据MessageQueue获取Broker的网络地址。如果MQClientlnstance的brokerAddrTable禾缓存该Broker的信息,则从NameServer主动更新一下topic的路由信息。如果路由更新后还是找不到 Broker信息,则抛出MQClientException,提示Broker不存在。
DefaultMQProducelmpl#sendKernellmpl
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
Step2:为消息分配全局唯一ID ,如果消息体默认超过 4K(compressMsgBodyOverHowmuch), 会对消息体采用 zip压缩,并设置消息的系统标记为 MessageSysFlag.COMPRESSED_FLAG。 如果是事务 Prepared消息,则设消息的系统标记为MessageSysFlag.TRANSACTION_ PREPARED_TYPE。
DefaultMQProducelmpl#sendKernellmpl
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {//压缩消息
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
Step3 :如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerlmpl#registerSendMessageHook注册钩子处理类,并且可以注册多个。
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
Step4 :构建消息发送请求包。 主要包含如下重要信息:生产者组、主题名称、默认创建主题 Key、该主题在单个Broker默认队列数 、队列ID (队列序号)、消息系统标记( MessageSysFlag)、消息发送时间、消息标记(RocketMQ对消息中的 flag不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
Step5:根据消息发送方式,同步、异步、单向方式进行网络传输。
MQClientAPIImpl#sendMessage
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
Step6:如果注册了消息发送钩子函数,执行 after逻辑。 注意,就算消息发送过程中发
生 RemotingException、 MQBrokerException、 InterruptedException时该方法也会执行。
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
1. 同步发送
MQ客户端发送消息的入口是MQClientAPIImpl#sendMessage。请求命令是Request
Code.SEND_MESSAGE,我们可以找到该命令的处理类: org.apache.rocketmq.broker.processor. SendMessageProcessor。入口方法在 SendMessageProcessor#sendMessage。
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader,
final TopicQueueMappingContext mappingContext,
final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
//发送消息并且进行消息审查
final RemotingCommand response = preSend(ctx, request, requestHeader);
if (response.getCode() != -1) {
return response;
}
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
// Step2:如果消息重试次数超过允许的最大重试次数,消息将进入到 DLD 延迟队列
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqKey == null || uniqKey.length() <= 0) {
uniqKey = MessageClientIDSetter.createUniqID();
oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
}
MessageAccessor.setProperties(msgInner, oriProps);
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
long beginTimeMillis = this.brokerController.getMessageStore().now();
if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {
asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
final int finalQueueIdInt = queueIdInt;
final MessageExtBrokerInner finalMsgInner = msgInner;
asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
RemotingCommand responseFuture =
handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}
sendMessageCallback.onComplete(sendMessageContext, response);
}, this.brokerController.getPutMessageFutureExecutor());
// Returns null to release the send message thread
return null;
} else {
PutMessageResult putMessageResult = null;
if (sendTransactionPrepareMessage) {
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
//调用 DefaultMessageStore#putMessage 进行消息 存储
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
}
msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand request,
final RemotingCommand response) {
//1 )检查该Broker是否有写权限
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
//检查该Topic是否可以进行消息发送。主要针对默认主题,默认主题不能发送消息,仅仅供路由查找
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
return response;
}
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
LOGGER.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
//在 NameServer端存储主题的配置信息,
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
//4)检查队列,如果队列不合法,返回错误码 。
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
LOGGER.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
handleRetryAndDLQ
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && requestHeader.getMaxReconsumeTimes() != null) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
// Using '>' instead of '>=' to compatible with the case that reconsumeTimes here are increased by client.
// Step2:如果消息重试次数超过允许的最大重试次数,消息将进入到 DLD 延迟队列。延迟队列主题: %DLQ%+消费组名
if (reconsumeTimes > maxReconsumeTimes) {
properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0
);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
msg.setDelayTimeLevel(0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true;
}
2.异步发送
消息异步发送是指消息生产者调用发送的 API后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。 异步方 式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力, RocketMQ 对消息 发送的异步消息进行了井发控制,通过参数clientAsyncSemaphoreValue来控制,默认为65535。异步消息发送虽然也可以通过 DefaultMQProducer#retryTimesWhenSendAsyncFailed 属性来控制消息重试次数,但是重试的调用人 口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。
3. 单向发送
单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。
3.5 批量消息发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率 。
当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过 DefaultMQProducer#maxMessageSize。
RemotingCommand
//请求命令编码,请求命令类型
private int code;
private LanguageCode language = LanguageCode.JAVA;
//版本号
private int version = 0;
//客户端请求序号
private int opaque = requestId.getAndIncrement();
//标记。倒数第一位表示请求类型,O:请求; 1:返回。倒数第二位,1:表示oneway
private int flag = 0;
//描述
private String remark;
//扩展属性
private HashMap<String, String> extFields;
//每个请求对应 的请求头信息
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
//请求体
private transient byte[] body;
单条消息发送时,消息体的内容将保存在body中。 批量消息发送 ,需要将多条消息体的内容存储在body中。
RocketMQ采取的方式是,对单条消息内容使用固定格式进行存储。

DefaultMQProducer#send 消息批量发迭
@Override
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
首先在消息发送端,调用batch方法,将一批消息封装成MessageBatch对象。MessageBatch继承自Message对象,MessageBatch内部持有List
在创建RemotingCommand对象时将调用messageBatch#encode()方法填充到Remoting-Command的body域中。
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
}
public static byte[] encodeMessage(Message message) {
//only need flag, body, properties
byte[] body = message.getBody();
int bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
//note properties length must not more than Short.MAX
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = message.getFlag();
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(0);
// 3 BODYCRC
byteBuffer.putInt(0);
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
}
一条小咸鱼