转转QA MQ 消息构造 -- 学会分解问题

笑哼 for 转转QA · 2019年05月28日 · 最后由 笑哼 回复于 2019年06月16日 · 2613 次阅读

作者 | 赵力新

各位亲爱的朋友,本文小编将抛砖引玉,谈一谈在工作中遇到问题的处理思路,希望给遇到迷惑的朋友一点指引。

RocketMQ 简介 -- 技术背景

RocketMQ 是阿里向 Apache 贡献的消息中间件,是一个开源的分布式消息传递和流式数据平台。
随着公司的体量、业务呈现指数式增长,技术层面开始面临着数据采集、数据异构、系统整合等诸多问题。由于 RPC 采用同步处理技术,在性能、健壮性、可扩展性上都存在着诸多缺点,消息队列以异步处理,非阻塞调用的技术模型悄然登场,并迅速成为分布式架构的宠儿。在消息队列的诸多模型中,RocketMQ 以其消息不丢失、架构优势、高可用、高吞吐量等诸多优势被广泛应用。
其架构图如下:

主要分以下几个模块:
1.nameserv
是基于服务注册发现功能的无状态组件,支持独立部署。在 RocketMQ 架构体系中,属于重中之重。生产者从 namesrv 中获取可用的 broker 地址,将消息发送至 broker;消费者从 namesrv 中获取可用的 broker 地址,从 broker 中拉取消息;broker 定时向 naemsrv 发送心跳信息,维护可用 broker 地址。
2.broker
是基于高性能和低延迟的文件存储的无状态组件,支持独立部署。在 RocketMQ 架构体系中,属于核心级组件,负责存储所有的消息相关文件(包括消息文件,日志文件等等)。
3.生产者及消费者
是我们在实际编码过程中直接接触到的,也是 RocketMQ 直接暴露给编码人员的 API 接口,生产者和消费者支持分布式部署。生产者,产生消息的实例;消费者,接收消息进行消费的实例。

应用背景

在开发自测或者测试工作中,经常会遇到这种情况,待测方处于消费者下游,需要收到一个 RocketMQ 消息(订单状态变更/商品状态变更),才能触发要测的流程(例子:订单状态变更为已收货->给买卖双方发送验机评价消息,商品状态变更为风控下架->给卖家发送退回商品的信息等)。为了更好的模拟待测业务场景,又避免构造太多和本次测试无关的数据,我们需要一个模拟生产 MQ 消息的工具。
OK,来搞一个模拟生产 MQ 消息的工具吧。
接到任务的时候先不要瑟瑟发抖,来跟小编做几个思考题~
思考:1.MQ 消息是什么样子的?
从日志中抓取一个被包装后的 MQ 消息如下:

MQ Message 由一些关键的信息组成:
1.Topic:消息的业务逻辑分类,区分订单类的消息 or 库存类的消息;
2.Tag:对 Topic 的进一步细化,标记同一业务模块中不同用途的消息;
3.Body:承载消息的消息体;
4.Keys:用于消息的存储和查询,此处可以用默认值;

思考:2.如何生产一条 MQ 消息?
业务代码依赖 alibaba 的 RocketMQ-client jar 包,主要过程很清晰,简要代码如下:

思考:3.模拟生产 MQ 消息的工具设计成什么样才便于大家使用呢?
如果是一个人用,写一段代码 、一个单元测试运行一下即可。但是为了降低多人使用时安装运行环境的成本,我们最好提供一个接口,一个页面,让使用者输入 Topic/Tag/Body,点击发送按钮就能发送一条 MQ 消息了。

小编主要是提供后端接口的,刚刚好小编同时用 Java 和 Python 两种语言。如果用 Java 来提供接口,实现过程类似第二步生产 MQ 消息的代码截图,把参数提取出来即可。
(到这里,其实你的工具已经能够做好了。下边是一道附加题。)
如果你的后端是 Python 语言来写的,如何用 Python 来生产一条 RocketMQ 消息呢?

思考:4.Python 能否生产 RocketMQ 消息?怎么生产?
通过查阅资料可知,RocketMQ 提供了 jar 包,也有提供 python 第三方包,那么 Python 能否调用 jar 包,使用 Java 的 API 呢?
答案是肯定的,Python 中有个 JPype,可以启动 JVM,方便地调用 Jar 包。使用过程如下:
1.通过 startJVM() 方法启动 JVM,把依赖的 Jar 包路径通过"-Djava.ext.dirs="参数设置进去;
2.通过 JPackage 引入所需的 Java package,可以直接调用 Java 类的接口;

DefaultMQProducer = JPackage('com.alibaba.rocketmq.client.producer').DefaultMQProducer
MQClientException = JPackage('com.alibaba.rocketmq.client.exception').MQClientException
SendResult = JPackage('com.alibaba.rocketmq.client.producer').SendResult
Message = JPackage('com.alibaba.rocketmq.common.message').Message
SENDSTATUS = JPackage('com.alibaba.rocketmq.client.producer').SendStatus

3.实例化 producer,并启动它;
4.实例化 Message;
5.通过 producer 的 send() 方法发送消息;
6.通过 producer.shutdown() 方法来关闭 producer;
7.通过 Jpype 的 shutdownJVM() 方法来关闭 JVM;
通过上述步骤可以看出,Python 和 Java 中生产一个 MQ 消息是一样的,这个思想可以用于其他在 Python 中调用 Jar 包的场景。
好啦,通过小编的讲解,“弄一个发送 MQ 消息的工具” 是不是变得很简单了呢?
最后,小编要说,遇到问题多 Google,其实最主要的问题,是知道自己下一步要做什么。牛人千千万,大多数问题都已经有人解决了,我们需要多查资料,站在牛人的肩膀上,多多思考。

共收到 8 条回复 时间 点赞

已点赞! 不过实际的应用有点个人疑问啊,就是说变更订单状态是通过发送消息完成的,这个和手动去点击页面确认订单而发送消息,有哪些区别呢?我目前想到的,只有模拟生产的消息可以自己可以任意变更内容,而手动点击页面的消息是测试环境程序设定好的这点区别

是这样的,咱们这个应用场景主要不是这种自己业务流程中消息的串联,而是兄弟业务之间需要依赖的消息,在消费时触发的代码逻辑。举个例子,业务 A 需要监听支付完成的交易 MQ 消息,来触发逻辑 B。那么我可以重复构造交易 MQ 消息来测试逻辑 B,而不用重复构造支付完成的订单。
当然了,如果需要自动化测试订单状态的变更,也可以通过模拟发送一条消息来进行场景构造,而非手动点击页面。这两者目的是一样的。

笑哼 回复

理解了。业务 A~消息~业务 B,这种如果业务 b 已经之前被处理过了,对应的数据库里的数据,甚至缓存之类已经有过改变。如果再次通过发送消息去处理,会不会存在风险,毕竟业务 B 对应的数据和真正的未被处理过的还是有差异的。当然,一般而言只要代码逻辑正确是不会有问题,但就怕万一

黑山老妖 回复

有道理

不错不错。有个小问题问下,这边用的是 RocketMQ,我记得 RabbitMQ 里有个管理页面,登录进去也可以给指定的 Queue 发送消息,跟楼主自己开发的是有区别吗?

Keyens 回复

应该有区别吧

Python 有 Celery😀

Karaser 回复

嗯,celly+rabbitmq

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册