RocketMq技术内幕笔记(一)
1 大纲
1.1 RocketMQ 原代码的目录结构
RocketMQ 核心目 录说明如下 。
- broker: broker模块(broker启动进程) 。
- client:消息客户端,包含消息生产者、消息消费者相关类。
- common:公共包。
- dev:开发者信息(非源代码)。
- distribution:部署实例文件夹(非源代码)。
- example: RocketMQ 示例代码 。
- filter:消息过滤相关基础类。
- filtersrv: 消息过滤服务器实现相关类(Filter启动进程)。
- logappender:日志实现相关类。
- namesrv : NameServer 实现相关类(Names巳rver启动进程) 。
- openmessaging: 消息开放标准,正在制定中 。
- remoting: 远程通信模块,基于 Netty。
- srvutil:服务器工具类。
- store:消息存储实现相关类 。
- style: checkstyle相关实现。
- test: 测试相关类。
- tools: 工具类,监控命令相关实现类。
1.2 RocketMQ 的设计理念和目标
1.2.1 设计理念
RocketMQ 设计基于主题的发布与 订阅 模式 , (Broker)、消息消费。
NameServer:实现元数据的管理(Topic路由信息等),因为 Topic 路由信息无须在集群之 间保持强一致,追求最终一致性,并且能容 忍分钟级的 不一致 。
高效的IO存储机制:RocketMQ追求消息发送的高吞吐量, RocketMQ 的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引人内存 l映射机制,所 有主 题的消息存储基于顺序写,极大地提高了消息写性能, 同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。
1.2.2 设计能力
-
架构模式
RocketMQ 与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括 :
消息发送者、消息服务器(消息存储)、消息消费、路由发现 。 -
顺序消息
所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费 。 RocketMQ 可以严格保证消息有序 。 -
消息过滤
RocketMQ 消息过滤支持在服务端与消费端的消息过滤机制 。
1 )消息在 Broker 端过滤。Broker只将消息消费者感兴趣的消息发送给消息消费者 。
2 )消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从 Broker传输到消费端。 -
消息存储
RocketMQ 追求消息存储的高性能,引人内存映射机制,所有主题的消息顺序存储在同一个文件中 。 同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。 -
消息高可用性
通常影响消息可靠性的有以下几种情况 。
- Broker正常关机。
- Broker异常 Crash。
- OS Crash。
- 机器断电,但 是 能立即恢复供电情况 。
- 机器无法开机(可能是 CPU、主板、 内存等关键设备损 坏)。
- 磁盘设备损坏。
情况 1~4 的 RocketMQ 在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息 。 情况 5-6 属于单点故障,一旦发生,该节点上的消息全 部丢失,如果开启了异步复制机制, RoketMQ 能保证只丢失少量消息。
-
消息到达 (消费)低延迟
RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。 -
确保消息必须被消费一次
RocketMQ 通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能。 -
回溯消息
回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。 RocketMQ 支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。 -
消息堆积
RocketMQ 消息存储使用磁盘文件 (内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。 RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天。 -
定时消息
定 时消息 是指消息发送到 Broker 后, 不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。 如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意精度的定时消息,而只支持特定延迟级别。 -
消息重试机制
消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。
2 RocketMQ路由中心NameServer
2.1 NameServer 架构设计

Broker消息服务器在启动时向所有 NameServer注册,消息生产者(Producer)在发送消 息之前先从 NameServer获取Broker 服务器地址列表,然后根据负载算法从列表中选择一 台消息服务器进行消息发送。NameServer与每台 Broker 服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到 Broker右机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者。
NameServer本身的高可用可通过部 署多台 NameServer服务器来实现,但彼此之间互不通信,也就是 NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消 息发送不会造成任何影响。
2.2 NameServer 启动流程
NameServer启动类 : org.apache.rocketmq.namesrv.NamesrvStartup。
- Step 1: 首先来解析配置文件,需要填充 NameServerConfig、NettyServerConfig属性值。
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return;
}
namesrvConfig = new NamesrvConfig();
nettyServerConfig = new NettyServerConfig();
nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(9876);
controllerConfig = new ControllerConfig();
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, controllerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
MixAll.printObjectProperties(null, nettyClientConfig);
MixAll.printObjectProperties(null, controllerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
}
创建 NameServerConfig ( NameServer业务参数)、NettyServer-Config ( NameServer网络参数),然后在解析启动时把指定的配置文件或启动命令中的选项 值,填充到 nameServerConfig,nettyServerConfig对象。
public class NamesrvConfig {
//rocketmq 主目录,可以通过 -Drocketmq.home.dir=path或通过设置环境变量 ROCKETMQ_HOME 来配置 RocketMQ 的主目录 。
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//NameServer存储 KV 配置属性 的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//NameServer 默认配置文件路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
//是否支持顺序消息,默认是不支持
private boolean orderMessageEnable = false;
private boolean returnOrderTopicConfigToBroker = true;
}
public class NettyServerConfig implements Cloneable {
//NameServer监昕端口,该值默认会被初始化为 9876
private int listenPort = 0;
//Netty业务线程池线程个数。
private int serverWorkerThreads = 8;
//Netty public任务线程池线程个数,Netty网络设计,
//根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等 。
//如果该业务类型(RequestCode)未注册线程池, 则由 public线程池执行。
private int serverCallbackExecutorThreads = 0;
//IO线程池线程个数,主要是 NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的解析请求包,然后转发到
//各个业务线程池完成具体的业务操作,然后将结果再返回调用方 。
private int serverSelectorThreads = 3;
//send oneway 消息请求井发度
private int serverOnewaySemaphoreValue = 256;
//异步消息发送最大并发度
private int serverAsyncSemaphoreValue = 64;
//网络连接最大空闲时间,默认120s。 如果连接空闲时间超过该参数设置的值,连接将被关闭。
private int serverChannelMaxIdleTimeSeconds = 120;
//网络 socket发送缓存区大小, 默认 64k
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//网络 socket接收缓存区大小 ,默认 64k
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
//ByteBuffer是否开启缓存 , 建议开启
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make install
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
//是否启用EpollIO模型, Linux环境建议开启。
private boolean useEpollNativeSelector = false;
}
️启动 NameServer时,可以先使用/mqnameserver-c configFile -p 打印当前加载的配置属性
- Step2:根据启动属性创建 NamesrvController实例,并初始化该实例,实例为NameServer核心控制器。
public boolean initialize() {
//加载kvConfigPath下kvConfig.json配置文件里的KV配置,然后将这些配置放到KVConfigManager#configTable属性中
loadConfig();
//根据nettyServerConfig初始化一个netty服务器。
initiateNetworkComponents();
//初始化负责处理Netty网络交互数据的线程池,默认线程数是16个
initiateThreadExecutors();
////注册Netty服务端业务处理逻辑,如果开启了clusterTest,那么注册的请求处理类是ClusterTestRequestProcessor,否则请求处理类是DefaultRequestProcessor
registerProcessor();
//注册心跳机制线程池,延迟5毫秒启动,每隔5秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker
//注册打印KV配置线程池,延迟1分钟启动、每10分钟打印出kvConfig配置
startScheduleService();
initiateSslContext();
initiateRpcHooks();
return true;
}
- Step3 :注册JVM钩子函数并启动服务器,以便监昕 Broker、消息生产者的网络请求 。
//在 JVM 进程关闭之前,先将线程池关闭,及时释放资源 。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
controller.start();
2.3 NameServer 路由注册、故障剔除
NameServer主要作用是为消息生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要能够管理Broker节点,包括路由注册、 路由删除等功能。
2.3.1 路由元信息
NameServer路由实现类: org.apache.rocketmq.namesrv.routeinfo.RoutelnfoManager
RoutelnfoManage 路由元数据
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//Topic 消息队列路由信息,消息发送时根据路由表进行负 载均衡 。
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
//Broker 基础信息, 包含 brokerName、 所属集群名称 、 主备 Broker地址。
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
//Broker 集群信息,存储集群中所有 Broker 名称 。
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//Broker 状态信息 。 NameServer 每次 收到心跳包时会 替换该信 息
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker上的 FilterServer列表,用于类模式消息过滤
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
private final BatchUnregistrationService unRegisterService;
private final NamesrvController namesrvController;
private final NamesrvConfig namesrvConfig;
}
RocketMQ基于订阅发布机制,一个Topic拥有多个消息队列 ,一个Broker为每一主题默认创建4个读队列4个写队列。多个Broker组成一个集群,BrokerName由相同的多台Broker组成Master-Slave架构 , brokerId为0代表 Master,大于0表示Slave。 BrokerLivelnfo中的lastUpdateTimestamp 存储上次收到Broker心跳包的时间。
2.3.2 路由注册
RocketMQ路由注册是通过 Broker与NameServer的心跳功能实现的。Broker启动时 向 集群中 所有的 NameServer发送心跳语句,每隔30s向 集群 中所 有 NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLivelnfo的lastUpdateTimestamp,然后NameServer每隔10s扫描 brokerLiveTable,如果连续120s没有收到心跳包, NameServer将移除该 Broker的路由信息同时关闭Socket连接。
- Broker发送心跳包
Broker端心跳包发送
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run2() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
registerBrokerAll
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
@Override
public void run2() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
} catch (Exception e) {
LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
}
} catch (InterruptedException ignore) {
}
该方法主要是遍历 NameServer列表,Broker消息服务器依次向 NameServer发送心跳包。
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//....
}
发送心跳包具体逻辑,首先封装请求包头( Header)。
brokerAddr: broker 地址 。
brokerId: brokerld,0:Master;大 0: Slave。
brokerName:broker名称。
clusterName: 集群名称。
haServerAddr: master 地址,初次请求时该值为空,slave 向Nameserver注册后返回。
requestBody:
filterServerList:消息过滤服务器列表。
topicConfigWrapper:主题配置。
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setEnableActingMaster(enableActingMaster);
requestHeader.setCompressed(false);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
requestBody.setFilterServerList(filterServerList);
RocketMQ网络传输基于 Netty,每一个请求,RocketMQ都会定义一个RequestCode,然后在服务端会对应相应的网络处理器 (processor包中)。
- NameServer处理心跳包
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 网络处理器解析请求类型, 如果请求类型为RequestCode.REGISTER_BROKER,则请求最终转发到RoutelnfoMan ager#registerBroker。
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
- Step1:路由注册需要加写锁,防止并发修改RoutelnfoManager中的路由表。
// 是否第一个注册
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
brokerData.setZoneName(zoneName);
//省略
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
Step2 :维护BrokerData信息,首先从brokerAddrTable根据 BrokerName尝试获取 Broker信息,如果不存在,则新建BrokerData并放入到brokerAddrTable, registerFirst设置为 true;如果存在,直接替换原先的,registerFirst设置为false,表示非第一次注册。
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
if (isPrimeSlave) {
// Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
}
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
}
}
- Step3 :如果Broker为Master,并且BrokerTopic配置信息发生变化或者是初次注册,则需要创建或更新 Topic路由元数据,填充topicQueueTable,其实就是为默认主题自动注 册路由信息其中包含 MixAII.DEFAULT_TOPIC的路由信息。当消息生产者发送主题时,如果该主题未创建并且BrokerConfig的autoCreateTopicEnable为true时,将返回MixAII. DEFAULT_TOPIC的路由信息。
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataMap) {
queueDataMap = new HashMap<>();
queueDataMap.put(brokerName, queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
final QueueData existedQD = queueDataMap.get(brokerName);
if (existedQD == null) {
queueDataMap.put(brokerName, queueData);
} else if (!existedQD.equals(queueData)) {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), existedQD,
queueData);
queueDataMap.put(brokerName, queueData);
}
}
}
根据 TopicConfig创建 QueueData数据结构 ,然后更新 topicQueueTable。
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
new BrokerLiveInfo(
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
- Step4: 更新BrokerLivelnfo,存活Broker信息表, BrokeLivelnfo是执行路由删除的重要依据。
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddrInfo);
} else {
this.filterServerTable.put(brokerAddrInfo, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
if (masterLiveInfo != null) {
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
- Step5 : 注册Broker的过滤器Server地址列表,一个Broker上会关联多个FilterServer消息过滤服务器;如果此Broker为从节点,则需要查找该Broker的Master 的节点信息,并更新对应的masterAddr属性 。
NameServe与Broker保持长连接, Broker状态存储在 brokerLiveTable中,NameServer每收到一个心跳包,将更新 brokerLiveTable中关于Broker的状态信息以及路 由表( topicQueueTable、 brokerAddrTable、brokerLiveTable、filterServerTable)。 更新上述 路由表( HashTable)使用了锁粒度较少的读写锁,允许多个消息发送者( Producer)并发读,保证消息发送时的高并发。但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。
2.3.3 路由删除
NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过 120s,则认为Broker失效,移除该 Broker, 关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocktMQ有两个触发点来触发路由删除。
- NameServer定时扫描 brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果时间戳大于 120s则需要移除该 Broker信息 。
2 ) Broker在正常被关闭的情况下会执行unregisterBroker指令。
public void scanNotActiveBroker() {
try {
log.info("start scanNotActiveBroker");
for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
long last = next.getValue().getLastUpdateTimestamp();
long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
if ((last + timeoutMillis) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
this.onChannelDestroy(next.getKey());
}
}
} catch (Exception e) {
log.error("scanNotActiveBroker exception", e);
}
}
public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {
UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();
boolean needUnRegister = false;
if (brokerAddrInfo != null) {
try {
try {
this.lock.readLock().lockInterruptibly();
needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (needUnRegister) {
boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);
log.info("the broker's channel destroyed, submit the unregister request at once, " +
"broker info: {}, submit result: {}", unRegisterRequest, result);
}
}
Step1 :申请写锁,把需要移除的broker添加到阻塞队列。
public void unRegisterBroker(Set<UnRegisterBrokerRequestHeader> unRegisterRequests) {
try {
try {
Set<String> removedBroker = new HashSet<>();
Set<String> reducedBroker = new HashSet<>();
Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap = new HashMap<>();
this.lock.writeLock().lockInterruptibly();
for (final UnRegisterBrokerRequestHeader unRegisterRequest : unRegisterRequests) {
final String brokerName = unRegisterRequest.getBrokerName();
final String clusterName = unRegisterRequest.getClusterName();
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, unRegisterRequest.getBrokerAddr());
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
brokerLiveInfo != null ? "OK" : "Failed",
brokerAddrInfo
);
this.filterServerTable.remove(brokerAddrInfo);
boolean removeBrokerName = false;
boolean isMinBrokerIdChanged = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
if (!brokerData.getBrokerAddrs().isEmpty() &&
unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
isMinBrokerIdChanged = true;
}
String addr = brokerData.getBrokerAddrs().remove(unRegisterRequest.getBrokerId());
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
addr != null ? "OK" : "Failed",
brokerAddrInfo
);
if (brokerData.getBrokerAddrs().isEmpty()) {
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
brokerName
);
removeBrokerName = true;
} else if (isMinBrokerIdChanged) {
needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
brokerData.getBrokerAddrs(), addr, null));
}
}
if (removeBrokerName) {
Set<String> nameSet = this.clusterAddrTable.get(clusterName);
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
removed ? "OK" : "Failed",
brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
clusterName
);
}
}
removedBroker.add(brokerName);
} else {
reducedBroker.add(brokerName);
}
}
cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(needNotifyBrokerMap);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("unregisterBroker Exception", e);
}
}
Step2 :统一删除每个map中元数据。
private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
while (itMap.hasNext()) {
Entry<String, Map<String, QueueData>> entry = itMap.next();
String topic = entry.getKey();
Map<String, QueueData> queueDataMap = entry.getValue();
for (final String brokerName : removedBroker) {
final QueueData removedQD = queueDataMap.remove(brokerName);
if (removedQD != null) {
log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
}
}
if (queueDataMap.isEmpty()) {
log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic);
itMap.remove();
}
for (final String brokerName : reducedBroker) {
final QueueData queueData = queueDataMap.get(brokerName);
if (queueData != null) {
if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) {
// Master has been unregistered, wipe the write perm
if (isNoMasterExists(brokerName)) {
queueData.setPerm(queueData.getPerm() & (~PermName.PERM_WRITE));
}
}
}
}
}
}
Step3 :删除Topic队列中元数据。
2.3.4 路由发现
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。根据主题名称拉取路由信息的命令编码为: GET_ROUTEINTO_BY_TOPIC。
public class TopicRouteData extends RemotingSerializable {
//顺序消息配置内容,来自于 kvConfig。
private String orderTopicConf;
//topic 队列元数据
private List<QueueData> queueDatas;
//topic分布的 broker元数据
private List<BrokerData> brokerDatas;
//broker上过滤服务器地址列表。
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//It could be null or empty
private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content;
Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();
if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || (null != standardJsonOnly && standardJsonOnly)) {
content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
SerializerFeature.MapSortField);
} else {
content = topicRouteData.encode();
}
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
Step1:调用 RouterlnfoManager 的方法,从路由 表 topicQueueTable、 brokerAddrTable、 filterServerTable中分别填充TopicRouteData中的List
Step2 : 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer KVconfig 中获取关于顺序消息相关 的配置填充路由信息 。
如果找不到路由信息CODE则使用 TOPIC_NOT_EXISTS ,表示没有找到对应的路由 。

一条小咸鱼