最近在测试 MQ 相关的业务,刚好也在学习中间件相关知识。所以今天分享一些关于 MQ 消费乱序和解决乱序的基本思路(RocketMQ)。可能不对,欢迎指正。
要解决消费乱序,我觉得首先要了解 MQ 基本投递消费方式和对应的业务场景。
所有场景都要解决乱序吗?
我认为并不是所有场景都需要管顺序的。比如发送一封邮件,我同时发 10 个邮件,我会关心它哪个先收到,哪个后收到吗?至少我觉得我不会。
什么场景需要顺序
比如我下单买一个商品,操作步骤可以为:下单 - 付款 - 发货。那这个时候可以是 “下单 - 发货 - 付款 “的顺序吗?如果可以,请告诉我在哪里,我马上去下单。
Topic
Topic 的作用就是为了区分消息种类,把业务属性相同的消息投递到同一个主题中,消费者订阅这个主题,就可以得到对应的消息,通常 Topic 的默认 Queue 的读写队列为 4,也就是说生产者会往这 4 个 Queue 不断的投递消息。
Producer -> Queue
Queue 也就是放消息的管道。Producer 把数据丢到 Queue 中,会有选择算法。
乱序的产生和解决
上面我们知道了投递的策略,那其实很容易理解了。MQ 才不会管你的消息顺序是怎样的,它只负责消息按照算法投递到对应的 Queue 中,那结果就是,消费者在消息时,有可能先读到后面的消息,导致业务逻辑乱序。既然我们知道了是因为消息投递到不同 Queue 导致的,那么我只要按顺序把它放到一个 Queue 中,那不就行了吗?下面上伪代码。
生产者
此时,先按默认选择的方式投递消息
public class SyncSender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 普通消息发送
public void ordinarySend(){
String num = String.valueOf(abs);
String msg = "下单";
String msg1 = "付款";
String msg2 = "发货";
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
return mqs.get(Integer.parseInt(String.valueOf(arg)));
}
});
org.springframework.messaging.Message<String> keys = MessageBuilder.withPayload(msg + "-" +key).setHeader("KEYS", key).build();
org.springframework.messaging.Message<String> keys1 = MessageBuilder.withPayload(msg1 + "-" + key).setHeader("KEYS", key).build();
org.springframework.messaging.Message<String> keys2 = MessageBuilder.withPayload(msg2 + "-" + key).setHeader("KEYS", key).build();
SendResult sendResult = rocketMQTemplate.syncSend(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys);
SendResult sendResult1 = rocketMQTemplate.syncSend(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys1);
SendResult sendResult2 = rocketMQTemplate.syncSend(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys2);
System.out.println("queue:"+sendResult.getMessageQueue() + ":" + "key" + key);
System.out.println("queue:"+sendResult1.getMessageQueue() + ":" + "key" + key);
System.out.println("queue:"+sendResult2.getMessageQueue() + ":" + "key" + key);
}
};
我们看看最终的结果
发货- 下单 -付款?乱七八糟
换一种做法,我们用订单 ID 进行 HASH 简单的取模,把相同的订单 ID 都丢到同一个 Queue 中。
public class SyncSender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 普通消息发送
public void ordinarySend(){
Date date = new Date();
UUID uuid = UUID.randomUUID();
String uid = String.valueOf(uuid).replace("-","") + date;
int code = uid.hashCode();
int key = Math.abs(code);
int abs = Math.abs(code % 4);
if (abs ==0){
abs++;
}
String num = String.valueOf(abs);
String msg = "下单";
String msg1 = "付款";
String msg2 = "发货";
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
return mqs.get(Integer.parseInt(String.valueOf(arg)));
}
});
org.springframework.messaging.Message<String> keys = MessageBuilder.withPayload(msg + "-" +key).setHeader("KEYS", key).build();
org.springframework.messaging.Message<String> keys1 = MessageBuilder.withPayload(msg1 + "-" + key).setHeader("KEYS", key).build();
org.springframework.messaging.Message<String> keys2 = MessageBuilder.withPayload(msg2 + "-" + key).setHeader("KEYS", key).build();
SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys, num);
SendResult sendResult1 = rocketMQTemplate.syncSendOrderly(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys1, num);
SendResult sendResult2 = rocketMQTemplate.syncSendOrderly(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys2, num);
System.out.println("queue:"+sendResult.getMessageQueue() + ":" + "key" + key);
System.out.println("queue:"+sendResult1.getMessageQueue() + ":" + "key" + key);
System.out.println("queue:"+sendResult2.getMessageQueue() + ":" + "key" + key);
}
};