RocketMq技术内幕笔记(一)

1 大纲

1.1 RocketMQ 原代码的目录结构

RocketMQ 核心目 录说明如下 。

  1. broker: broker模块(broker启动进程) 。
  1. client:消息客户端,包含消息生产者、消息消费者相关类。
  2. common:公共包。
  3. dev:开发者信息(非源代码)。
  4. distribution:部署实例文件夹(非源代码)。
  5. example: RocketMQ 示例代码 。
  6. filter:消息过滤相关基础类。
  7. filtersrv: 消息过滤服务器实现相关类(Filter启动进程)。
  8. logappender:日志实现相关类。
  9. namesrv : NameServer 实现相关类(Names巳rver启动进程) 。
  10. openmessaging: 消息开放标准,正在制定中 。
  11. remoting: 远程通信模块,基于 Netty。
  12. srvutil:服务器工具类。
  13. store:消息存储实现相关类 。
  14. style: checkstyle相关实现。
  15. test: 测试相关类。
  16. tools: 工具类,监控命令相关实现类。

1.2 RocketMQ 的设计理念和目标

1.2.1 设计理念

RocketMQ 设计基于主题的发布与 订阅 模式 , (Broker)、消息消费。

NameServer:实现元数据的管理(Topic路由信息等),因为 Topic 路由信息无须在集群之 间保持强一致,追求最终一致性,并且能容 忍分钟级的 不一致 。

高效的IO存储机制:RocketMQ追求消息发送的高吞吐量, RocketMQ 的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引人内存 l映射机制,所 有主 题的消息存储基于顺序写,极大地提高了消息写性能, 同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。

1.2.2 设计能力

  1. 架构模式
    RocketMQ 与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括 :
    消息发送者、消息服务器(消息存储)、消息消费、路由发现 。

  2. 顺序消息
    所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费 。 RocketMQ 可以严格保证消息有序 。

  3. 消息过滤
    RocketMQ 消息过滤支持在服务端与消费端的消息过滤机制 。
    1 )消息在 Broker 端过滤。Broker只将消息消费者感兴趣的消息发送给消息消费者 。
    2 )消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从 Broker传输到消费端。

  4. 消息存储
    RocketMQ 追求消息存储的高性能,引人内存映射机制,所有主题的消息顺序存储在同一个文件中 。 同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。

  5. 消息高可用性
    通常影响消息可靠性的有以下几种情况 。

  1. Broker正常关机。
  2. Broker异常 Crash。
  3. OS Crash。
  4. 机器断电,但 是 能立即恢复供电情况 。
  5. 机器无法开机(可能是 CPU、主板、 内存等关键设备损 坏)。
  6. 磁盘设备损坏。
    情况 1~4 的 RocketMQ 在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息 。 情况 5-6 属于单点故障,一旦发生,该节点上的消息全 部丢失,如果开启了异步复制机制, RoketMQ 能保证只丢失少量消息。
  1. 消息到达 (消费)低延迟
    RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。

  2. 确保消息必须被消费一次
    RocketMQ 通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能。

  3. 回溯消息
    回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。 RocketMQ 支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。

  4. 消息堆积
    RocketMQ 消息存储使用磁盘文件 (内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。 RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天。

  5. 定时消息
    定 时消息 是指消息发送到 Broker 后, 不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。 如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意精度的定时消息,而只支持特定延迟级别。

  6. 消息重试机制
    消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。

2 RocketMQ路由中心NameServer

2.1 NameServer 架构设计

RocketMQ 物理部署图

Broker消息服务器在启动时向所有 NameServer注册,消息生产者(Producer)在发送消 息之前先从 NameServer获取Broker 服务器地址列表,然后根据负载算法从列表中选择一 台消息服务器进行消息发送。NameServer与每台 Broker 服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到 Broker右机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者。

NameServer本身的高可用可通过部 署多台 NameServer服务器来实现,但彼此之间互不通信,也就是 NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消 息发送不会造成任何影响。

2.2 NameServer 启动流程

NameServer启动类 : org.apache.rocketmq.namesrv.NamesrvStartup。

  1. Step 1: 首先来解析配置文件,需要填充 NameServerConfig、NettyServerConfig属性值。
  public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return;
        }

        namesrvConfig = new NamesrvConfig();
        nettyServerConfig = new NettyServerConfig();
        nettyClientConfig = new NettyClientConfig();
        nettyServerConfig.setListenPort(9876);
        controllerConfig = new ControllerConfig();
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, controllerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            MixAll.printObjectProperties(null, namesrvConfig);
            MixAll.printObjectProperties(null, nettyServerConfig);
            MixAll.printObjectProperties(null, nettyClientConfig);
            MixAll.printObjectProperties(null, controllerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
    }

创建 NameServerConfig ( NameServer业务参数)、NettyServer-Config ( NameServer网络参数),然后在解析启动时把指定的配置文件或启动命令中的选项 值,填充到 nameServerConfig,nettyServerConfig对象。

public class NamesrvConfig {
    //rocketmq 主目录,可以通过 -Drocketmq.home.dir=path或通过设置环境变量 ROCKETMQ_HOME 来配置 RocketMQ 的主目录 。
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));

    //NameServer存储 KV 配置属性 的持久化路径 
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

    //NameServer 默认配置文件路径
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;

    //是否支持顺序消息,默认是不支持
    private boolean orderMessageEnable = false;
    private boolean returnOrderTopicConfigToBroker = true;
}
public class NettyServerConfig implements Cloneable {

    //NameServer监昕端口,该值默认会被初始化为 9876
    private int listenPort = 0;

    //Netty业务线程池线程个数。
    private int serverWorkerThreads = 8;

    //Netty public任务线程池线程个数,Netty网络设计,
    //根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等 。
    //如果该业务类型(RequestCode)未注册线程池, 则由 public线程池执行。
    private int serverCallbackExecutorThreads = 0;

    //IO线程池线程个数,主要是 NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的解析请求包,然后转发到
    //各个业务线程池完成具体的业务操作,然后将结果再返回调用方 。
    private int serverSelectorThreads = 3;

    //send oneway 消息请求井发度
    private int serverOnewaySemaphoreValue = 256;

    //异步消息发送最大并发度
    private int serverAsyncSemaphoreValue = 64;

    //网络连接最大空闲时间,默认120s。 如果连接空闲时间超过该参数设置的值,连接将被关闭。
    private int serverChannelMaxIdleTimeSeconds = 120;

    //网络 socket发送缓存区大小, 默认 64k
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    //网络 socket接收缓存区大小 ,默认 64k
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
    private int serverSocketBacklog = NettySystemConfig.socketBacklog;
    //ByteBuffer是否开启缓存 , 建议开启
    private boolean serverPooledByteBufAllocatorEnable = true;

    /**
     * make install
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    //是否启用EpollIO模型, Linux环境建议开启。
    private boolean useEpollNativeSelector = false;
}

️启动 NameServer时,可以先使用/mqnameserver-c configFile -p 打印当前加载的配置属性

  1. Step2:根据启动属性创建 NamesrvController实例,并初始化该实例,实例为NameServer核心控制器。
  public boolean initialize() {
        //加载kvConfigPath下kvConfig.json配置文件里的KV配置,然后将这些配置放到KVConfigManager#configTable属性中
        loadConfig();
        //根据nettyServerConfig初始化一个netty服务器。
        initiateNetworkComponents();
        //初始化负责处理Netty网络交互数据的线程池,默认线程数是16个
        initiateThreadExecutors();
        ////注册Netty服务端业务处理逻辑,如果开启了clusterTest,那么注册的请求处理类是ClusterTestRequestProcessor,否则请求处理类是DefaultRequestProcessor
        registerProcessor();
        //注册心跳机制线程池,延迟5毫秒启动,每隔5秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker
        //注册打印KV配置线程池,延迟1分钟启动、每10分钟打印出kvConfig配置
        startScheduleService();
        initiateSslContext();
        initiateRpcHooks();
        return true;
    }
  1. Step3 :注册JVM钩子函数并启动服务器,以便监昕 Broker、消息生产者的网络请求 。
//在 JVM 进程关闭之前,先将线程池关闭,及时释放资源 。
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
            controller.shutdown();
            return null;
        }));

        controller.start();

2.3 NameServer 路由注册、故障剔除

NameServer主要作用是为消息生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要能够管理Broker节点,包括路由注册、 路由删除等功能。

2.3.1 路由元信息

NameServer路由实现类: org.apache.rocketmq.namesrv.routeinfo.RoutelnfoManager

RoutelnfoManage 路由元数据

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    //Topic 消息队列路由信息,消息发送时根据路由表进行负 载均衡 。
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;

    //Broker 基础信息, 包含 brokerName、 所属集群名称 、 主备 Broker地址。
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;

    //Broker 集群信息,存储集群中所有 Broker 名称 。
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //Broker 状态信息 。 NameServer 每次 收到心跳包时会 替换该信 息
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Broker上的 FilterServer列表,用于类模式消息过滤
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

    private final BatchUnregistrationService unRegisterService;

    private final NamesrvController namesrvController;
    private final NamesrvConfig namesrvConfig;
}

RocketMQ基于订阅发布机制,一个Topic拥有多个消息队列 ,一个Broker为每一主题默认创建4个读队列4个写队列。多个Broker组成一个集群,BrokerName由相同的多台Broker组成Master-Slave架构 , brokerId为0代表 Master,大于0表示Slave。 BrokerLivelnfo中的lastUpdateTimestamp 存储上次收到Broker心跳包的时间。

2.3.2 路由注册

RocketMQ路由注册是通过 Broker与NameServer的心跳功能实现的。Broker启动时 向 集群中 所有的 NameServer发送心跳语句,每隔30s向 集群 中所 有 NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLivelnfo的lastUpdateTimestamp,然后NameServer每隔10s扫描 brokerLiveTable,如果连续120s没有收到心跳包, NameServer将移除该 Broker的路由信息同时关闭Socket连接。

  1. Broker发送心跳包

Broker端心跳包发送

  scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run2() {
                try {
                    if (System.currentTimeMillis() < shouldStartTime) {
                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                        return;
                    }
                    if (isIsolated) {
                        BrokerController.LOG.info("Skip register for broker is isolated");
                        return;
                    }
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    BrokerController.LOG.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

registerBrokerAll

   final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();

final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
            @Override
            public void run2() {
                try {
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }

                    LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                } catch (Exception e) {
                    LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }

        try {
        if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
        }
    } catch (InterruptedException ignore) {
    }

该方法主要是遍历 NameServer列表,Broker消息服务器依次向 NameServer发送心跳包。

  private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final boolean oneway,
            final int timeoutMills,
            final RegisterBrokerRequestHeader requestHeader,
            final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        //....
    }

发送心跳包具体逻辑,首先封装请求包头( Header)。

brokerAddr: broker 地址 。
brokerId: brokerld,0:Master;大 0: Slave。
brokerName:broker名称。
clusterName: 集群名称。
haServerAddr: master 地址,初次请求时该值为空,slave 向Nameserver注册后返回。
requestBody:
filterServerList:消息过滤服务器列表。
topicConfigWrapper:主题配置。

    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setEnableActingMaster(enableActingMaster);
    requestHeader.setCompressed(false);

    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
    requestBody.setFilterServerList(filterServerList);

RocketMQ网络传输基于 Netty,每一个请求,RocketMQ都会定义一个RequestCode,然后在服务端会对应相应的网络处理器 (processor包中)。

  1. NameServer处理心跳包

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 网络处理器解析请求类型, 如果请求类型为RequestCode.REGISTER_BROKER,则请求最终转发到RoutelnfoMan ager#registerBroker。

this.lock.writeLock().lockInterruptibly();

//init or update the cluster info
Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
  • Step1:路由注册需要加写锁,防止并发修改RoutelnfoManager中的路由表。
// 是否第一个注册
boolean registerFirst = false; 
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
    this.brokerAddrTable.put(brokerName, brokerData);
}

boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
brokerData.setZoneName(zoneName);

//省略

String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

Step2 :维护BrokerData信息,首先从brokerAddrTable根据 BrokerName尝试获取 Broker信息,如果不存在,则新建BrokerData并放入到brokerAddrTable, registerFirst设置为 true;如果存在,直接替换原先的,registerFirst设置为false,表示非第一次注册。

boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
        && brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

        ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                        topicConfigWrapper.getDataVersion(), brokerName,
                        entry.getValue().getTopicName())) {
                    final TopicConfig topicConfig = entry.getValue();
                    if (isPrimeSlave) {
                        // Wipe write perm for prime slave
                        topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                    }
                    this.createAndUpdateQueueData(brokerName, topicConfig);
                }
            }
        }
}
  • Step3 :如果Broker为Master,并且BrokerTopic配置信息发生变化或者是初次注册,则需要创建或更新 Topic路由元数据,填充topicQueueTable,其实就是为默认主题自动注 册路由信息其中包含 MixAII.DEFAULT_TOPIC的路由信息。当消息生产者发送主题时,如果该主题未创建并且BrokerConfig的autoCreateTopicEnable为true时,将返回MixAII. DEFAULT_TOPIC的路由信息。
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataMap) {
        queueDataMap = new HashMap<>();
        queueDataMap.put(brokerName, queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        final QueueData existedQD = queueDataMap.get(brokerName);
        if (existedQD == null) {
            queueDataMap.put(brokerName, queueData);
        } else if (!existedQD.equals(queueData)) {
            log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), existedQD,
                queueData);
            queueDataMap.put(brokerName, queueData);
        }
    }
}

根据 TopicConfig创建 QueueData数据结构 ,然后更新 topicQueueTable。

BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
        new BrokerLiveInfo(
                System.currentTimeMillis(),
                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
if (null == prevBrokerLiveInfo) {
    log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
  • Step4: 更新BrokerLivelnfo,存活Broker信息表, BrokeLivelnfo是执行路由删除的重要依据。
if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddrInfo);
    } else {
        this.filterServerTable.put(brokerAddrInfo, filterServerList);
    }
}

if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
        BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
        if (masterLiveInfo != null) {
            result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}
  • Step5 : 注册Broker的过滤器Server地址列表,一个Broker上会关联多个FilterServer消息过滤服务器;如果此Broker为从节点,则需要查找该Broker的Master 的节点信息,并更新对应的masterAddr属性 。

NameServe与Broker保持长连接, Broker状态存储在 brokerLiveTable中,NameServer每收到一个心跳包,将更新 brokerLiveTable中关于Broker的状态信息以及路 由表( topicQueueTable、 brokerAddrTable、brokerLiveTable、filterServerTable)。 更新上述 路由表( HashTable)使用了锁粒度较少的读写锁,允许多个消息发送者( Producer)并发读,保证消息发送时的高并发。但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。

2.3.3 路由删除

NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过 120s,则认为Broker失效,移除该 Broker, 关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocktMQ有两个触发点来触发路由删除。

  1. NameServer定时扫描 brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果时间戳大于 120s则需要移除该 Broker信息 。
    2 ) Broker在正常被关闭的情况下会执行unregisterBroker指令。
public void scanNotActiveBroker() {
    try {
        log.info("start scanNotActiveBroker");
        for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
            long last = next.getValue().getLastUpdateTimestamp();
            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
            if ((last + timeoutMillis) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
                this.onChannelDestroy(next.getKey());
            }
        }
    } catch (Exception e) {
        log.error("scanNotActiveBroker exception", e);
    }
}
public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {
    UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();
    boolean needUnRegister = false;
    if (brokerAddrInfo != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (needUnRegister) {
        boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);
        log.info("the broker's channel destroyed, submit the unregister request at once, " +
            "broker info: {}, submit result: {}", unRegisterRequest, result);
    }
}

Step1 :申请写锁,把需要移除的broker添加到阻塞队列。

public void unRegisterBroker(Set<UnRegisterBrokerRequestHeader> unRegisterRequests) {
        try {
            try {
                Set<String> removedBroker = new HashSet<>();
                Set<String> reducedBroker = new HashSet<>();
                Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap = new HashMap<>();

                this.lock.writeLock().lockInterruptibly();
                for (final UnRegisterBrokerRequestHeader unRegisterRequest : unRegisterRequests) {
                    final String brokerName = unRegisterRequest.getBrokerName();
                    final String clusterName = unRegisterRequest.getClusterName();

                    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, unRegisterRequest.getBrokerAddr());

                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
                    log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                        brokerLiveInfo != null ? "OK" : "Failed",
                        brokerAddrInfo
                    );

                    this.filterServerTable.remove(brokerAddrInfo);

                    boolean removeBrokerName = false;
                    boolean isMinBrokerIdChanged = false;
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        if (!brokerData.getBrokerAddrs().isEmpty() &&
                            unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
                            isMinBrokerIdChanged = true;
                        }
                        String addr = brokerData.getBrokerAddrs().remove(unRegisterRequest.getBrokerId());
                        log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                            addr != null ? "OK" : "Failed",
                            brokerAddrInfo
                        );
                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            this.brokerAddrTable.remove(brokerName);
                            log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                                brokerName
                            );

                            removeBrokerName = true;
                        } else if (isMinBrokerIdChanged) {
                            needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
                                brokerData.getBrokerAddrs(), addr, null));
                        }
                    }

                    if (removeBrokerName) {
                        Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                        if (nameSet != null) {
                            boolean removed = nameSet.remove(brokerName);
                            log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                                removed ? "OK" : "Failed",
                                brokerName);

                            if (nameSet.isEmpty()) {
                                this.clusterAddrTable.remove(clusterName);
                                log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                    clusterName
                                );
                            }
                        }
                        removedBroker.add(brokerName);
                    } else {
                        reducedBroker.add(brokerName);
                    }
                }

                cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);

                if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                    notifyMinBrokerIdChanged(needNotifyBrokerMap);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
    }

Step2 :统一删除每个map中元数据。

private void cleanTopicByUnRegisterRequests(Set<String> removedBroker, Set<String> reducedBroker) {
    Iterator<Entry<String, Map<String, QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
    while (itMap.hasNext()) {
        Entry<String, Map<String, QueueData>> entry = itMap.next();

        String topic = entry.getKey();
        Map<String, QueueData> queueDataMap = entry.getValue();

        for (final String brokerName : removedBroker) {
            final QueueData removedQD = queueDataMap.remove(brokerName);
            if (removedQD != null) {
                log.debug("removeTopicByBrokerName, remove one broker's topic {} {}", topic, removedQD);
            }
        }

        if (queueDataMap.isEmpty()) {
            log.debug("removeTopicByBrokerName, remove the topic all queue {}", topic);
            itMap.remove();
        }

        for (final String brokerName : reducedBroker) {
            final QueueData queueData = queueDataMap.get(brokerName);

            if (queueData != null) {
                if (this.brokerAddrTable.get(brokerName).isEnableActingMaster()) {
                    // Master has been unregistered, wipe the write perm
                    if (isNoMasterExists(brokerName)) {
                        queueData.setPerm(queueData.getPerm() & (~PermName.PERM_WRITE));
                    }
                }
            }
        }
    }
}

Step3 :删除Topic队列中元数据。

2.3.4 路由发现

RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。根据主题名称拉取路由信息的命令编码为: GET_ROUTEINTO_BY_TOPIC。

public class TopicRouteData extends RemotingSerializable {

    //顺序消息配置内容,来自于 kvConfig。
    private String orderTopicConf;
    //topic 队列元数据 
    private List<QueueData> queueDatas;
    //topic分布的 broker元数据
    private List<BrokerData> brokerDatas;
    //broker上过滤服务器地址列表。
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    //It could be null or empty
    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content;
            Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();
            if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || (null != standardJsonOnly && standardJsonOnly)) {
                content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
                    SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                    SerializerFeature.MapSortField);
            } else {
                content = topicRouteData.encode();
            }

            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

Step1:调用 RouterlnfoManager 的方法,从路由 表 topicQueueTable、 brokerAddrTable、 filterServerTable中分别填充TopicRouteData中的List、List和 filterServer 地址表 。
Step2 : 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer KVconfig 中获取关于顺序消息相关 的配置填充路由信息 。

如果找不到路由信息CODE则使用 TOPIC_NOT_EXISTS ,表示没有找到对应的路由 。

NameServer 路由 注册、删除机制

一条小咸鱼