RocketMq-Producer-发送消息

初始化完成后就可以发送消息了。消息的通信方式分为

  • 同步
  • 异步
  • oneway

支持的特性包括

  • 普通消息
  • 事务消息
  • 顺序消息
  • 延时消息

关键数据结构

数据结构

普通消息

发送过程

  • DefaultMQProducer#send将请求转发给defaultMQProducerImpl.send
  • 校验producer已处于running状态
  • 校验消息:topic不能为空,不能存在特殊字符,不能超过255个字符;body不能为空,不能超过128kb等
  • 调用tryToFindTopicPublishInfo,首次发送没有TopicPublishInfo,从命名服务器获取。此后由定时任务触发更新
  • 发送过程中,时间不超过4s,次数不超过3次
  • 选择一个messageQueue。取余从List<messageQueue>中选择一条;跳过broker是失败的messageQueue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName != null) {
    int index = this.sendWhichQueue.getAndIncrement();
    for (int i = 0; i < this.messageQueueList.size(); i++) {
    int pos = Math.abs(index++) % this.messageQueueList.size();
    MessageQueue mq = this.messageQueueList.get(pos);
    if (!mq.getBrokerName().equals(lastBrokerName)) {
    return mq;
    }
    }
    return null;
    }
    else {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    return this.messageQueueList.get(pos);
    }
    }
  • 根据messageQueuebrokername字段,从brokerAddrTable中获取集群中master机器地址

  • 构造SendMessageRequestHeader,通过channelmaster broker发送请求

事务消息

发送过程

  • TransactionMQProducer#sendMessageInTransaction将请求转发给DefaultMQProducerImpl#sendMessageInTransaction
  • Broker发送一条Prepared消息,过程同普通消息的发送
  • 回调本地事务
  • 提交或者回滚Broker端消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//关键代码
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
// 有效性检查
...
// 第一步,向Broker发送一条Prepared消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
sendResult = this.send(msg)
// 第二步,回调本地事务
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// 第三步,提交或者回滚Broker端消息
this.endTransaction(sendResult, localTransactionState, localException);
...
return transactionSendResult;
}
private void endTransaction(//
final SendResult sendResult, //
final LocalTransactionState localTransactionState, //
final Throwable localException) {
...
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionCommitType);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionRollbackType);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionNotType);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
...
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(addr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

MessageSysFlag

5 4 3 2 1
事物rollback消息 事物Commit消息(事物rollback消息) 事物Prepared消息 单条消息多tag 压缩

当第四位和第五位都是1,表示是rollback消息。消息类型 = flag & TransactionRollbackType(011000)

顺序消息

顺序消息指的是局部顺序,发送者将某一类型的消息发送到同一条队列,使消息在队列中有序。有这种需求的消息,在发送的时候可以指定MessageQueueSelector的实现。一般来说将key值取余是一种方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (int i = 0; i < 100; i++) {
// 订单ID相同的消息要有序
int orderId = i % 10;
Message msg =
new Message("TopicTestjjj", ("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
}

如何保证全局顺序?

如果严格要求保证全局有序,必须保证一个topic仅有一个messageQueue。否则在broker扩容后,原本发送到A队列的消息可能会发送到B队列,当B队列中的消息先被消费,就会出现顺序错乱。

延时消息

如果有15分钟关单之类的需求,可以使用延时消息特性。生产者只能指定延时级别,级别对应的具体时间由broker定义,默认是messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",可以修改但不建议修改。

1
2
3
4
5
Message msg = new Message("TopicTestx2",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes());
msg.setDelayTimeLevel(2);//5s
SendResult sendResult = producer.send(msg);

tryToFindTopicPublishInfo

该方法获取指定topicMessageQueue列表。当初始化的时候或者找不到路由信息,需要从nameserver拉取路由信息。topic的消息队列保存在Map中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable =
new ConcurrentHashMap<String, TopicRouteData>();
public class TopicRouteData {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable =
new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable =
new ConcurrentHashMap<String, MQConsumerInner>();
  • 调用mQClientFactory#updateTopicRouteInfoFromNameServer
    • 调用MQClientAPIImpl#getTopicRouteInfoFromNameServer获取TopicRouteData
    • 如果本地TopicRouteData跟命名服务器上不同,设置更新标记
    • 遍历生产者producerTable和消费者consumerTable,校验每一个生产者的topicPublishInfoTable中,该topic的消息队列列表是否存在。校验每一个消费者的topicSubscribeInfoTable中,订阅该topic的消息队列是否存在。
    • 如果需要更新,更新本地的topicRouteTable,topicPublishInfoTable,topicSubscribeInfoTable。更新topicPublishInfoTable时用master brokerwrite queue number。更新topicSubscribeInfoTable时用所有的brokerread queue number

消息的构造和发送

client端关注的是Message实体的构造,参照上图数据结构,topic,flag,body都有字段直接保存,其余字段都是通过Hash表保存,也就是properties字段,例如tags等。producer可使用的key值枚举如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用)
*/
public static final String PROPERTY_KEYS = "KEYS";
/**
* 消息标签,只支持设置一个Tag(服务端消息过滤使用)
*/
public static final String PROPERTY_TAGS = "TAGS";
/**
* 是否等待服务器将消息存储完毕再返回(可能是等待刷盘完成或者等待同步复制到其他服务器)
*/
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
/**
* 消息延时投递时间级别,0表示不延时,大于0表示特定延时级别(具体级别在服务器端定义)
*/
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

在发送的时候,将信息存储在SendMessageRequestHeader请求头中

1
2
3
4
5
6
7
8
9
10
11
12
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());

为了能进一步压缩数据包,可以将SendMessageRequestHeader转化成SendMessageRequestHeaderV2,通过将字段名称设置为简单的字母降低网络传输。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNotNull
private String a;// producerGroup;
@CFNotNull
private String b;// topic;
@CFNotNull
private String c;// defaultTopic;
@CFNotNull
private Integer d;// defaultTopicQueueNums;
@CFNotNull
private Integer e;// queueId;
@CFNotNull
private Integer f;// sysFlag;
@CFNotNull
private Long g;// bornTimestamp;
@CFNotNull
private Integer h;// flag;
@CFNullable
private String i;// properties;
@CFNullable
private Integer j;// reconsumeTimes;
@CFNullable
private boolean k;// unitMode = false;
}

最终封装成RemotingCommand通过Netty发送。

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。