RocketMq-Broker-初始化

在使用rocketmq的时候,需要先启动NameServer和Broker,这里先分析Broker的初始化。

默认设置

Socket发送缓冲区128KB,Socket接收缓冲区128KB

启动过程

  • 命令行调用BrokerStartup#main
  • 构造BrokerController
    • 构造BrokerConfig,NettyServerConfig,NettyClientConfig,messageStoreConfig配置类。
    • 如果命令行中使用-c指定了配置文件,则用配置文件的值覆盖上述配置类。
    • 用配置类构造BrokerController
    • 执行BrokerController#initialize方法
    • 执行BrokerController#start方法

BrokerController#initialize

  • 加载topicConfigManagertopic信息,路径store/config/topics.json
  • 加载consumerOffsetManager消费进度,路径store/config/consumerOffset.json
  • 加载subscriptionGroupManager消费者订阅关系,路径store/config/subscriptionGroup.json
  • 加载messageStore本地消息。
    • 检查是否异常退出。正常启动后会创建临时文件store/abort,正常关闭会删除这个临时文件。应用启动会检查是否存在abort文件,如果有则说明上次是非正常关闭。
    • 加载scheduleMessageService定时消息信息,路径store/config/delayOffset.json。与延时消息有关,以后再说。
    • 加载commitlog文件,路径store/commitlog #1
    • 加载consumequeue文件,路径store/cosumequeue #2
    • 加载storeCheckpoint存储检查点,路径store/checkpoint
    • 加载indexService消息索引服务,路径sotre/index
    • 尝试恢复消息数据(consumequeue和commitlog实例) #3
  • 初始化通信层
  • 加载线程池
  • 注册请求处理器,定义了与producer,consumer,nameserver的交互命令码及其处理器。
  • 加载定时任务
    • 每天凌晨打印昨天put/get的消息数
    • 每5秒持久化消费进度
    • 每小时扫描数据被删除了的topic,offset记录也对应删除
    • master每分钟打印推送到slave的消息和现有消息的差额,slave每分钟执行全量同步master

BrokerController#start

  • 启动flushConsumeQueueService逻辑消息队列刷盘服务,默认每秒执行一次 #4
  • 启动flushCommitLogService消息刷盘服务,根据配置选择同步刷盘和异步刷盘 #4
  • 启动storeStatsService运行时数据统计服务
  • Master机器启动scheduleMessageService定时消息服务 #4
  • 启动reputMessageService从物理队列解析消息重新发送到逻辑队列 #5
  • 启动HAService,负责同步双写,异步复制功能 #6
  • 创建临时文件abort
  • 定时删除过期文件 #7

1 如何加载commitlog文件

  1. 加载请求实际是mapedFileQueue#load完成的
  2. 遍历store/commitlog文件夹下文件
  3. 将每一个文件映射成MapedFile对象,wrotePostioncommitPostion字段都初始化为文件尾(1G)

2 如何加载consumequeue文件

  1. 遍历store/consumequeue,每个文件夹名称是topicName
  2. 遍历每个topic(store/consumequeue/${topicname}/),每个文件夹名称是queueId
  3. 构造ConsumeQueue,将每个文件映射成MapedFile对象,wrotePostioncommitPostion字段都初始化为文件尾(6000000Byte)

3 如何尝试恢复消息

  1. 正常流程恢复消息队列

    1. 执行每个队列的consumequeue#recover
    2. 获取队列下的所有文件,从倒数第三个文件开始恢复。
    3. 依次获取每20个字节为一个存储单元(8byte commitlogoffset + 4byte size + 8byte tags hashcode),判断是否有效(offset>0 && size>0)。
      1. 有效:获取下一个存储单元,processOffset+=20。当前文件读完,继续读取下一个文件。
      2. 无效:就在当前文件停止。
    4. 设置MapedFile对象,wrotePostion = processOffset % 6000000,commitPostion = processOffset % 6000000,删除无效的文件。
  2. 正常流程恢复commitlog

    1. 执行commitLog#recoverNormally
    2. 获取队列下的所有文件,从倒数第三个文件开始恢复。
    3. 依次获取每一个消息的为一个存储单元,消息大小为前4个字节。判断是否有效(size>=0)
      1. 有效:获取下一个存储单元,processOffset+=size。当前文件读完,继续读取下一个文件。
      2. 无效:就在当前文件停止。
    4. 设置MapedFile对象,wrotePostion = processOffset % 1073741824,commitPostion = processOffset % 1073741824,删除无效的文件。
  3. 异常流程恢复commitlog

    1. 当异常退出,例如断电,崩溃等,执行异常流程的恢复。
    2. 按文件从新往旧的顺序,开始校验文件第一条数据是否有效,判断逻辑是magiccode和存储时间
    3. 从3.2中找到第一个有效的文件开始,执行消息校验,同2.3,2.4
    4. 将1中正常恢复的消息队列,根据物理Offset删除无效逻辑文件(即修正队列的wrotePostioncommitPostion
  4. 恢复每个队列的消息个数,存储在Map中,key:topicName+”-“+queueid,value:postionOffset/20

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。