RocketMq-Producer-初始化

初始化

默认设置

每个topic4条队列,3000ms超时,body超过4kb需要压缩,2次失败重试,body最大不超过128kb

启动过程

  • DefaultMQProducer#start将请求转发给DefaultMQProducerImpl#start
  • 如果producerGroup不是CLIENT_INNER_PRODUCER,并且instanceNamedefault,则将instanceName设置为pid
  • 构造MQClientInstanceMQClientManager是单例,管理clientIdmQClientInstance的一对一映射关系,其中clientIdip@instanceName
  • 调用MQClientInstance#registerProducer注册producer
  • 调用MQClientInstance#start 启动
    • 如果没有设置namesrv,调用mQClientAPIImpl#fetchNameServerAddr获取命名服务器地址
    • 调用mQClientAPIImpl#start启用channel通道
    • 启动定时任务
    • 调用pullMessageService#start()启动拉消息服务(consumer逻辑,以后分析)
    • 调用rebalanceService#start()启动负载均衡服务(consumer逻辑)
    • 调用this.defaultMQProducer#getDefaultMQProducerImpl()#start启动发送消息服务(consumer逻辑)

定时任务

  • 如果namesrv没有配置,定时从某一固定地址获取namesrv配置
  • 定时从namesrv更新topic路由信息
  • 定时去掉离线的broker,并向所有master broker发送心跳
  • 定时持久化消费进度(consumer逻辑)
  • 定时调整线程池(consumer逻辑)

clientId可以看出,一个jvm实例的所有producerconsumer共用一个MQClientInstance实例。所以MQClientInstance#start方法既包含生产者又包含消费者逻辑。同一台机器上可以通过启动多个jvm来模拟集群,提高吞吐。但是需要设置不同的instanceName,方法如下有两种:

  • 指定参数-Drocketmq.client.name
  • 调用start之前,setInstanceName

如果不设置,那么会被视为同一个client,无法提高吞吐。

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。