上次我们简单的模拟了消费乱序的问题,今天我们聊聊在 RokcetMQ 中,关于消息堆积的问题
接下来我们聊聊消息堆积
字面意思:堆积,就是把事物堆积成堆。这里指的就是消息堆积在一起,一直没有被消费或消费的很慢。
消息一般会存在 Broker 服务里面。这里拿我自己搭建的环境,我们来看看(因为我没做好文件映射关系,所以直接进去容器看),一般消息都会放在/${ROCKETMQ_HOME}$/store/里面,实体消息放在 commitLog 文件,consumequeue 是存放消息索引的。这个涉及到消息索引和持久化部分就不具体说明。
先说结论,首先消息的生命周期简单来说是 “生产 - 消费” 这样的过程。一般来说生产者不会是消息堆积的诱因(感觉不一定,可以试验一下)。产生消堆积的原因一般是消费速度赶不上生产速度所引起的,可能主要有以下两种类型的代码(希望有更多小伙伴补充一下真实的场景):
首先我们要有 “生产者” 和 “消费者” 的角色,这里用的是 spring-boot-starter 快速搭建环境和模拟的
生产者
@Component
public class ReTrySender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 普通消息发送
public void delayOrdinarySend() {
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
DefaultMQProducer producer = rocketMQTemplate.getProducer();
producer.setRetryTimesWhenSendFailed(3);
String uuid = UUID.randomUUID().toString().replace("-", "");
String msg = "order" + uuid;
String key = "test";
Message<String> buildMsg = MessageBuilder.withPayload(msg).setHeader("KEYS", key).build();
SendResult sendResult = rocketMQTemplate.syncSend(
MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, buildMsg);
System.out.println("生产消息:"+ msg + ":" + dateString);
}
}
消费者
@Component
@RocketMQMessageListener(topic = MqUntilSecond.tag_topic,
consumerGroup = MqUntilSecond.consumerGroup,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = MqUntilSecond.maxThread,
consumeMode = ConsumeMode.CONCURRENTLY)
public class ConsumerTag1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String message) {
int i = 1;
while (i < 10000){
try {
i ++;
Thread.sleep(1);
}catch (Exception e){
e.printStackTrace();
}
}
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
System.out.println("消费消息:" + message + ":" + dateString);
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeTimeout(10);
consumer.setMaxReconsumeTimes(2);
}
}
消费者配置
public class MqUntilSecond {
public static final String tag_topic = "tag_topic1"; // 主题
public static final String tag = "tag1"; // 标签
public static final String consumerGroup = "syncGroup"; // 消费集群
public static final int maxThread = 1; // 最大消费线程数
}
实验部分
总结