通用技术 初学 RocketMQ 之消费乱序

Hugo · 2022年05月29日 · 最后由 Edmond 回复于 2022年05月30日 · 5610 次阅读

最近在测试 MQ 相关的业务,刚好也在学习中间件相关知识。所以今天分享一些关于 MQ 消费乱序和解决乱序的基本思路(RocketMQ)。可能不对,欢迎指正。

消费乱序

要解决消费乱序,我觉得首先要了解 MQ 基本投递消费方式和对应的业务场景。

所有场景都要解决乱序吗?
我认为并不是所有场景都需要管顺序的。比如发送一封邮件,我同时发 10 个邮件,我会关心它哪个先收到,哪个后收到吗?至少我觉得我不会。
什么场景需要顺序
比如我下单买一个商品,操作步骤可以为:下单 - 付款 - 发货。那这个时候可以是 “下单 - 发货 - 付款 “的顺序吗?如果可以,请告诉我在哪里,我马上去下单。

为什么会乱序

Topic
Topic 的作用就是为了区分消息种类,把业务属性相同的消息投递到同一个主题中,消费者订阅这个主题,就可以得到对应的消息,通常 Topic 的默认 Queue 的读写队列为 4,也就是说生产者会往这 4 个 Queue 不断的投递消息。
Producer -> Queue
Queue 也就是放消息的管道。Producer 把数据丢到 Queue 中,会有选择算法。

  • 轮询算法:默认选择算法,这个算法保证了每个 Queue 的消息相对平均。
  • 最小投递延迟算法:该算法会根据每次投递的延迟时间,选择延迟最小的 Queue 进行投递,如果延迟相同,则采用轮询算法投递。

乱序的产生和解决
上面我们知道了投递的策略,那其实很容易理解了。MQ 才不会管你的消息顺序是怎样的,它只负责消息按照算法投递到对应的 Queue 中,那结果就是,消费者在消息时,有可能先读到后面的消息,导致业务逻辑乱序。既然我们知道了是因为消息投递到不同 Queue 导致的,那么我只要按顺序把它放到一个 Queue 中,那不就行了吗?下面上伪代码。

生产者
此时,先按默认选择的方式投递消息

public class SyncSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 普通消息发送
    public void ordinarySend(){

        String num = String.valueOf(abs);
        String msg = "下单";
        String msg1 = "付款";
        String msg2 = "发货";
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                return mqs.get(Integer.parseInt(String.valueOf(arg)));
            }
        });

        org.springframework.messaging.Message<String> keys = MessageBuilder.withPayload(msg + "-" +key).setHeader("KEYS", key).build();
        org.springframework.messaging.Message<String> keys1 = MessageBuilder.withPayload(msg1 + "-" + key).setHeader("KEYS", key).build();
        org.springframework.messaging.Message<String> keys2 = MessageBuilder.withPayload(msg2 + "-" + key).setHeader("KEYS", key).build();

        SendResult sendResult = rocketMQTemplate.syncSend(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys);
        SendResult sendResult1 = rocketMQTemplate.syncSend(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys1);
        SendResult sendResult2 = rocketMQTemplate.syncSend(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys2);

        System.out.println("queue:"+sendResult.getMessageQueue() + ":" + "key" + key);
        System.out.println("queue:"+sendResult1.getMessageQueue() + ":" +  "key" + key);
        System.out.println("queue:"+sendResult2.getMessageQueue() + ":" +  "key" + key);
    }
};

我们看看最终的结果
发货- 下单 -付款?乱七八糟

换一种做法,我们用订单 ID 进行 HASH 简单的取模,把相同的订单 ID 都丢到同一个 Queue 中。

  • order 订单之间 id 不是连续,说明在 Queue 中,并不是连续的
  • 虽然不是连续,但是在同一个队列中,所以只需要保证最终有序性即可
  • 这种方式校验,不严谨。在真实的业务中,需要加上其他逻辑校验是否真的有序。比如每个订单放入到独立的缓存空间排序,校验最终的一致性
public class SyncSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 普通消息发送
    public void ordinarySend(){
        Date date = new Date();
        UUID uuid = UUID.randomUUID();
        String uid = String.valueOf(uuid).replace("-","") + date;
        int code = uid.hashCode();
        int key = Math.abs(code);
        int abs = Math.abs(code % 4);
        if (abs ==0){
            abs++;
        }
        String num = String.valueOf(abs);
        String msg = "下单";
        String msg1 = "付款";
        String msg2 = "发货";
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                return mqs.get(Integer.parseInt(String.valueOf(arg)));
            }
        });

        org.springframework.messaging.Message<String> keys = MessageBuilder.withPayload(msg + "-" +key).setHeader("KEYS", key).build();
        org.springframework.messaging.Message<String> keys1 = MessageBuilder.withPayload(msg1 + "-" + key).setHeader("KEYS", key).build();
        org.springframework.messaging.Message<String> keys2 = MessageBuilder.withPayload(msg2 + "-" + key).setHeader("KEYS", key).build();
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys, num);
        SendResult sendResult1 = rocketMQTemplate.syncSendOrderly(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys1, num);
        SendResult sendResult2 = rocketMQTemplate.syncSendOrderly(MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, keys2, num);

        System.out.println("queue:"+sendResult.getMessageQueue() + ":" + "key" + key);
        System.out.println("queue:"+sendResult1.getMessageQueue() + ":" +  "key" + key);
        System.out.println("queue:"+sendResult2.getMessageQueue() + ":" +  "key" + key);
    }
};




共收到 1 条回复 时间 点赞

介绍得很详细啊,期待楼主持续更新

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册