转转QA MQ 消息构造--学会分解问题

笑哼 for 转转QA · May 28, 2019 · Last by 笑哼 replied at June 16, 2019 · 878 hits

作者|赵力新

各位亲爱的朋友,本文小编将抛砖引玉,谈一谈在工作中遇到问题的处理思路,希望给遇到迷惑的朋友一点指引。

RocketMQ简介--技术背景

RocketMQ是阿里向Apache贡献的消息中间件,是一个开源的分布式消息传递和流式数据平台。
随着公司的体量、业务呈现指数式增长,技术层面开始面临着数据采集、数据异构、系统整合等诸多问题。由于RPC采用同步处理技术,在性能、健壮性、可扩展性上都存在着诸多缺点,消息队列以异步处理,非阻塞调用的技术模型悄然登场,并迅速成为分布式架构的宠儿。在消息队列的诸多模型中,RocketMQ以其消息不丢失、架构优势、高可用、高吞吐量等诸多优势被广泛应用。
其架构图如下:

主要分以下几个模块:
1.nameserv
是基于服务注册发现功能的无状态组件,支持独立部署。在RocketMQ架构体系中,属于重中之重。生产者从namesrv中获取可用的broker地址,将消息发送至broker;消费者从namesrv中获取可用的broker地址,从broker中拉取消息;broker定时向naemsrv发送心跳信息,维护可用broker地址。
2.broker
是基于高性能和低延迟的文件存储的无状态组件,支持独立部署。在RocketMQ架构体系中,属于核心级组件,负责存储所有的消息相关文件(包括消息文件,日志文件等等)。
3.生产者及消费者
是我们在实际编码过程中直接接触到的,也是RocketMQ直接暴露给编码人员的API接口,生产者和消费者支持分布式部署。生产者,产生消息的实例;消费者,接收消息进行消费的实例。

应用背景

在开发自测或者测试工作中,经常会遇到这种情况,待测方处于消费者下游,需要收到一个RocketMQ消息(订单状态变更/商品状态变更),才能触发要测的流程(例子:订单状态变更为已收货->给买卖双方发送验机评价消息,商品状态变更为风控下架->给卖家发送退回商品的信息等)。为了更好的模拟待测业务场景,又避免构造太多和本次测试无关的数据,我们需要一个模拟生产MQ消息的工具。
OK,来搞一个模拟生产MQ消息的工具吧。
接到任务的时候先不要瑟瑟发抖,来跟小编做几个思考题~
思考:1.MQ消息是什么样子的?
从日志中抓取一个被包装后的MQ消息如下:

MQ Message由一些关键的信息组成:
1.Topic:消息的业务逻辑分类,区分订单类的消息or库存类的消息;
2.Tag:对Topic的进一步细化,标记同一业务模块中不同用途的消息;
3.Body:承载消息的消息体;
4.Keys:用于消息的存储和查询,此处可以用默认值;

思考:2.如何生产一条MQ消息?
业务代码依赖alibaba的RocketMQ-client jar包,主要过程很清晰,简要代码如下:

思考:3.模拟生产MQ消息的工具设计成什么样才便于大家使用呢?
如果是一个人用,写一段代码 、一个单元测试运行一下即可。但是为了降低多人使用时安装运行环境的成本,我们最好提供一个接口,一个页面,让使用者输入Topic/Tag/Body,点击发送按钮就能发送一条MQ消息了。

小编主要是提供后端接口的,刚刚好小编同时用Java和Python两种语言。如果用Java来提供接口,实现过程类似第二步生产MQ消息的代码截图,把参数提取出来即可。
(到这里,其实你的工具已经能够做好了。下边是一道附加题。)
如果你的后端是Python语言来写的,如何用Python来生产一条RocketMQ消息呢?

思考:4.Python能否生产RocketMQ消息?怎么生产?
通过查阅资料可知,RocketMQ提供了jar包,也有提供python第三方包,那么Python能否调用jar包,使用Java的API呢?
答案是肯定的,Python中有个JPype,可以启动JVM,方便地调用Jar包。使用过程如下:
1.通过startJVM()方法启动JVM,把依赖的Jar包路径通过"-Djava.ext.dirs="参数设置进去;
2.通过JPackage引入所需的Java package,可以直接调用Java类的接口;

DefaultMQProducer = JPackage('com.alibaba.rocketmq.client.producer').DefaultMQProducer
MQClientException = JPackage('com.alibaba.rocketmq.client.exception').MQClientException
SendResult = JPackage('com.alibaba.rocketmq.client.producer').SendResult
Message = JPackage('com.alibaba.rocketmq.common.message').Message
SENDSTATUS = JPackage('com.alibaba.rocketmq.client.producer').SendStatus

3.实例化producer,并启动它;
4.实例化Message;
5.通过producer的send()方法发送消息;
6.通过producer.shutdown()方法来关闭producer;
7.通过Jpype的shutdownJVM()方法来关闭JVM;
通过上述步骤可以看出,Python和Java中生产一个MQ消息是一样的,这个思想可以用于其他在Python中调用Jar包的场景。
好啦,通过小编的讲解,“弄一个发送MQ消息的工具” 是不是变得很简单了呢?
最后,小编要说,遇到问题多Google,其实最主要的问题,是知道自己下一步要做什么。牛人千千万,大多数问题都已经有人解决了,我们需要多查资料,站在牛人的肩膀上,多多思考。

共收到 8 条回复 时间 点赞

已点赞!不过实际的应用有点个人疑问啊,就是说变更订单状态是通过发送消息完成的,这个和手动去点击页面确认订单而发送消息,有哪些区别呢?我目前想到的,只有模拟生产的消息可以自己可以任意变更内容,而手动点击页面的消息是测试环境程序设定好的这点区别

笑哼 #2 · May 29, 2019 作者

是这样的,咱们这个应用场景主要不是这种自己业务流程中消息的串联,而是兄弟业务之间需要依赖的消息,在消费时触发的代码逻辑。举个例子,业务A需要监听支付完成的交易MQ消息,来触发逻辑B。那么我可以重复构造交易MQ消息来测试逻辑B,而不用重复构造支付完成的订单。
当然了,如果需要自动化测试订单状态的变更,也可以通过模拟发送一条消息来进行场景构造,而非手动点击页面。这两者目的是一样的。

笑哼 回复

理解了。业务A~消息~业务B,这种如果业务b已经之前被处理过了,对应的数据库里的数据,甚至缓存之类已经有过改变。如果再次通过发送消息去处理,会不会存在风险,毕竟业务B对应的数据和真正的未被处理过的还是有差异的。当然,一般而言只要代码逻辑正确是不会有问题,但就怕万一

笑哼 #4 · May 29, 2019 作者
黑山老妖 回复

有道理

不错不错。有个小问题问下,这边用的是RocketMQ,我记得RabbitMQ里有个管理页面,登录进去也可以给指定的Queue发送消息,跟楼主自己开发的是有区别吗?

笑哼 #6 · June 12, 2019 作者
Joeylu 回复

应该有区别吧

Python有Celery😀

笑哼 #8 · June 16, 2019 作者
SheldonBean 回复

嗯,celly+rabbitmq

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up