在 通用流量录制回放工具 jvm-sandbox-repeater 尝鲜 (二)——repeater-console 使用 中,可以了解到,repeater 的核心还是在 plugin 中,因此有必要去学习下。
repeater-plugin 的底层涉及到 jvm-sandbox 里面的一些原理。需要先阅读下相关的文档:
repeater 本身是一种 jvm-sandbox 的 module ,因此重点关注使用者、模块研发者 2 个章节。由于这部分非本文重点,仅摘要记录和本文关系较大的部分。
${HOME}/.sandbox-module/
下 _data
字段的接口)强烈建议自己动手完成 wiki 文档中的 模块编写初级,大概 15 分钟左右即可完成。代码不要复制粘贴,而是自己仿照文档敲出来,这样记忆比较深刻。
同时可以参考 怎么调试啊? 这个 issue ,了解下如何调试 jvm-sandbox 的模块。后续 repeater-plugin 的调试也用得上哦。
特别说明,考虑到调研的目标是使用,用到一定程度再考虑更深入的了解。因此暂时先跳过对 repeater-module 及其相关依赖的解析。后续补回。
官方文档未有正式介绍,故根据个人理解,整理一下。
目前官方已经提供的插件列表如下(截止 20190717):
插件类型 | 录制 | 回放 | Mock | 支持时间 | 贡献者 |
---|---|---|---|---|---|
http-plugin | √ | √ | × | 201906 | zhaoyb1990 |
dubbo-plugin | √ | × | √ | 201906 | zhaoyb1990 |
ibatis-plugin | √ | × | √ | 201906 | zhaoyb1990 |
mybatis-plugin | √ | × | √ | 201906 | ztbsuper |
java-plugin | √ | √ | √ | 201906 | zhaoyb1990 |
redis-plugin | × | × | × | 预期 7 月底 | NA/NA |
同样源码阅读三步骤:明确阅读目的、了解整体架构、细读目标功能
学会 plugin 开发的步骤,并照样画葫芦完成一个 rabbitmq plugin 的设计与开发。
先看下各个 plugin 的结构,是否有一些共通特征:
$ tree -L 12 repeater-plugins | grep -v iml | grep -v target
repeater-plugins
├── dubbo-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── plugin
│ └── dubbo
│ ├── DubboConsumerPlugin.java
│ ├── DubboProcessor.java
│ ├── DubboProviderPlugin.java
│ └── DubboRepeater.java
├── http-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repater
│ └── plugin
│ └── http
│ ├── HttpPlugin.java
│ ├── HttpRepeater.java
│ ├── HttpStandaloneListener.java
│ ├── InvokeAdvice.java
│ └── wrapper
├── ibatis-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── plugin
│ └── ibatis
│ ├── IBatisPlugin.java
│ └── IBatisProcessor.java
├── java-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── plugin
│ └── java
│ ├── JavaEntrancePlugin.java
│ ├── JavaInvocationProcessor.java
│ ├── JavaPluginUtils.java
│ ├── JavaRepeater.java
│ └── JavaSubInvokePlugin.java
├── mybatis-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── plugin
│ └── mybatis
│ ├── MybatisPlugin.java
│ └── MybatisProcessor.java
├── pom.xml
├── redis-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── plugin
│ └── redis
│ ├── RedisPlugin.java
│ └── RedisProcessor.java
从上面可以看出,基本结构有 2 个大类。
为了便于理解,先从简单的开始。先看看 mybatis-plugin 。
├── mybatis-plugin
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── plugin
│ └── mybatis
│ ├── MybatisPlugin.java // 实现 InvokePlugin SPI 的类,主要标识了需监听的 java 类,以及插件的一些基础信息(名称、数据类型等)
│ └── MybatisProcessor.java // 实现 InvocationProcessor 接口处理调用的类,主要提供了 Identity 和 request 的组装实现。
好了,我们再看看复杂点的 http-plugin
$ tree -L 12 | grep -v iml | grep -v target
.
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repater
└── plugin
└── http
├── HttpPlugin.java // 实现 InvokePlugin SPI 的类,可以理解为整体的入口,类似于 Spring 的 Application
├── HttpRepeater.java // 实现 Repeater ,支持回放的核心类
├── HttpStandaloneListener.java // 针对 standalone 模式的特别实现,主要是支持 header 透传 traceId
├── InvokeAdvice.java // http 请求感知 interface ,包含同步调用和异步调用
└── wrapper
├── WrapperAsyncListener.java // AsyncListener 的一个实现,主要用于应对异步请求?
├── WrapperOutputStreamCopier.java // 一个输出流复制的类,没什么逻辑,感觉是个工具类
├── WrapperRequest.java // HttpServletRequestWrapper 的一种实现类,把 request 变为一个自定义的 servlet ,便于定制实现
├── WrapperResponseCopier.java // HttpServletResponseWrapper 的实现类,把 response 变为自定义的 servlet ,便于定制实现
└── WrapperTransModel.java // 一个实体类,包含 request、response、url 等,并提供了入参为 WrapperRequest 对象的构造函数。作用未知。
小结一下:
在上一步可以看到,plugin 和 processor 相对来说是更为普遍的实现方式。因此重点细读这个。
这里以 mybatis-plugin 为代表进行解析。
MybatisPlugin
@MetaInfServices(InvokePlugin.class) // 标明它是一个插件 SPI
public class MybatisPlugin extends AbstractInvokePluginAdapter {
@Override
protected List<EnhanceModel> getEnhanceModels() { // 定义一个 EnhanceModel ,标记需要监听哪些类的哪些事件
EnhanceModel em = EnhanceModel.builder()
.classPattern("org.apache.ibatis.binding.MapperMethod") // 需监听的类名为 org.apache.ibatis.binding.MapperMethod
.methodPatterns(EnhanceModel.MethodPattern.transform("execute")) // 需监听的方法名为 execute
.watchTypes(Type.BEFORE, Type.RETURN, Type.THROWS) // 监听的事件。此处监听 BEFORE(刚进入方法,调用实际逻辑前)、RETURN(调用逻辑结束,返回值已就绪,准备向上返回时)、THROWS(发现异常,异常已就绪,准备向上抛出时)
.build();
return Lists.newArrayList(em);
}
@Override
protected InvocationProcessor getInvocationProcessor() { // 实现返回 InvocationProcessor 的方法。
return new MybatisProcessor(getType()); // 这个插件本身自带 Processor ,因此返回插件自带的 Processor
}
@Override
public InvokeType getType() { // 设定 InvokeType 为 MYBATIS 。这个用于标识录制出来的是什么类型的调用。repeater 会根据录制消息的类型选择对应的插件进行回放或 mock
return InvokeType.MYBATIS;
}
@Override
public String identity() { // 设定唯一识别名称。启动加载插件时会有一个日志打印加载的插件名称,名称即来自于此处。因此需要唯一。
return "mybatis";
}
@Override
public boolean isEntrance() { // 是否入口流量插件。
return false;
}
}
MybatisProcessor
class MybatisProcessor extends DefaultInvocationProcessor {
MybatisProcessor(InvokeType type) {
super(type);
}
/**
* 组装标识
* @param event 从 sandbox 获取到的 BeforeEvent 对象,记录了这个事件的相关信息
* @return 一个 Identity 对象,作为流量的标识
*/
@Override
public Identity assembleIdentity(BeforeEvent event) {
// 获取触发调用事件的对象。简单的说就是当前被拦截到方法所属于的对象
Object mapperMethod = event.target;
// SqlCommand = MapperMethod.command
// 获取这个对象对应的类中, command 这个 field
Field field = FieldUtils.getDeclaredField(mapperMethod.getClass(), "command", true);
// 如果获取到的值为 null ,把 location(第二个参数)、endpoint(第三个参数)设为 “Unknown” ,组装 Identity 并返回。
if (field == null) {
return new Identity(InvokeType.MYBATIS.name(), "Unknown", "Unknown", new HashMap<String, String>(1));
}
try {
// 获取触发调用事件对象中,“command” 这个 field 对应的对象,并存到变量 command
Object command = field.get(mapperMethod);
// 分别调用变量 command 的 getName 、getType 方法
Object name = MethodUtils.invokeMethod(command, "getName");
Object type = MethodUtils.invokeMethod(command, "getType");
// 用 type.toString() 作为 location,name.toString() 作为 endpoint ,组装 Identity 并返回
return new Identity(InvokeType.MYBATIS.name(), type.toString(), name.toString(), new HashMap<String, String>(1));
} catch (Exception e) {
// 出现任何异常,把 location(第二个参数)、endpoint(第三个参数)设为 Unknown ,组装 Identity 并返回。
return new Identity(InvokeType.MYBATIS.name(), "Unknown", "Unknown", new HashMap<String, String>(1));
}
}
@Override
public Object[] assembleRequest(BeforeEvent event) {
// MapperMethod#execute(SqlSession sqlSession, Object[] args)
// args可能存在不可序序列化异常(例如使用tk.mybatis)
// 默认父类提供的实现是返回整个 event.argumentArray ,这里的实现把它改为只返回下标为1的元素,去掉其它元素。从注释上看是为了避免后续 args 会存在不可序列化异常所以想避开它,但从实现上看取的是第2个元素而非第一个参数。原因未知。
return new Object[]{event.argumentArray[1]};
}
}
简单小结下:
org.apache.ibatis.binding.MapperMethod
这个类的 execute
方法,会监听 BEFORE、AFTER、THROW 三种事件。MapperMethod
这个类的对象的 command 属性值,把里面的 type、name 属性加入到流量标识中。否则就用 unknown 填充。那为何会做上面的操作呢?我们来看看 mybatis 中 MapperMethod
类 execute
方法的代码片段吧。
public class MapperMethod {
// 这个就是我们加标识要获取的 command 对象了
private final SqlCommand command;
private final MethodSignature method;
public MapperMethod(Class<?> mapperInterface, Method method, Configuration config) {
this.command = new SqlCommand(config, mapperInterface, method);
this.method = new MethodSignature(config, mapperInterface, method);
}
// 这个就是我们要捕获 execute 方法
public Object execute(SqlSession sqlSession, Object[] args) {
Object result;
// command 的 type 代表的是数据库操作类型,对应增删改查,以及 flush 共5种类型
switch (command.getType()) {
case INSERT: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.insert(command.getName(), param));
break;
}
case UPDATE: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.update(command.getName(), param));
break;
}
case DELETE: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.delete(command.getName(), param));
break;
}
case SELECT:
if (method.returnsVoid() && method.hasResultHandler()) {
executeWithResultHandler(sqlSession, args);
result = null;
} else if (method.returnsMany()) {
result = executeForMany(sqlSession, args);
} else if (method.returnsMap()) {
result = executeForMap(sqlSession, args);
} else if (method.returnsCursor()) {
result = executeForCursor(sqlSession, args);
} else {
Object param = method.convertArgsToSqlCommandParam(args);
result = sqlSession.selectOne(command.getName(), param);
}
break;
case FLUSH:
result = sqlSession.flushStatements();
break;
default:
throw new BindingException("Unknown execution method for: " + command.getName());
}
if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) {
throw new BindingException("Mapper method '" + command.getName()
+ " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ").");
}
return result;
}
...
从代码中可以看出,这个 execute
起到了承上启下的左右,基本上所有数据库操作都会经过这里,而且也有足够的信息做单次调用的唯一的标识。
正如官方文档所说,只要找对了需要捕获的类,剩下的就顺利了。
但还有 1 个未解之谜:
1、为何 assembleRequest 要调整返回,改为只返回第二个入参?
官方同学给的答复:
execute 第一个参数是 SqlSession,不需要也不能序列化,对于录制和回放也没有意义。assembleRequest 本身也是作为 request 加工使用,有些参数是不一定需要使用的。
经过上面的解读,开发的方案就比较清晰了。
1、rabbitmq 主要会有 2 种被调用的情况。一种是生产者,作为子调用生成 mq 信息发到队列中。另一种是消费者,作为入口调用触发后续逻辑。在录制回放中,消费者的场景更为重要,需要优先满足。这种场景下,需要实现回放。
2、需要找到 rabbitmq 作为消费者的承上启下类和方法,并对应 RabbitMqPlugin 。
3、实现对应的 processor 类以及 repeater 类,实现回放。
更详细的,后续完成后再补充。