通用流量录制回放工具 jvm-sandbox-repeater 尝鲜 (二)——repeater-console 使用 中,可以了解到,repeater 的核心还是在 plugin 中,因此有必要去学习下。

熟悉 jvm-sandbox

repeater-plugin 的底层涉及到 jvm-sandbox 里面的一些原理。需要先阅读下相关的文档:

repeater 本身是一种 jvm-sandbox 的 module ,因此重点关注使用者、模块研发者2个章节。由于这部分非本文重点,仅摘要记录和本文关系较大的部分。

强烈建议自己动手完成 wiki 文档中的 模块编写初级,大概15分钟左右即可完成。代码不要复制粘贴,而是自己仿照文档敲出来,这样记忆比较深刻。

同时可以参考 怎么调试啊? 这个 issue ,了解下如何调试 jvm-sandbox 的模块。后续 repeater-plugin 的调试也用得上哦。

repeater-plugin 简介

特别说明,考虑到调研的目标是使用,用到一定程度再考虑更深入的了解。因此暂时先跳过对 repeater-module 及其相关依赖的解析。后续补回。

官方文档未有正式介绍,故根据个人理解,整理一下。

目前官方已经提供的插件列表如下(截止 20190717):

插件类型 录制 回放 Mock 支持时间 贡献者
http-plugin × 201906 zhaoyb1990
dubbo-plugin × 201906 zhaoyb1990
ibatis-plugin × 201906 zhaoyb1990
mybatis-plugin × 201906 ztbsuper
java-plugin 201906 zhaoyb1990
redis-plugin × × × 预期7月底 NA/NA

阅读 plugin 源码

同样源码阅读三步骤:明确阅读目的、了解整体架构、细读目标功能

step 0 明确阅读目的

学会 plugin 开发的步骤,并照样画葫芦完成一个 rabbitmq plugin 的设计与开发。

step 1 了解整体架构

先看下各个 plugin 的结构,是否有一些共通特征:

$ tree -L 12 repeater-plugins | grep -v iml | grep -v target
repeater-plugins
├── dubbo-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repeater
└── plugin
└── dubbo
├── DubboConsumerPlugin.java
├── DubboProcessor.java
├── DubboProviderPlugin.java
└── DubboRepeater.java
├── http-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repater
└── plugin
└── http
├── HttpPlugin.java
├── HttpRepeater.java
├── HttpStandaloneListener.java
├── InvokeAdvice.java
└── wrapper
├── ibatis-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repeater
└── plugin
└── ibatis
├── IBatisPlugin.java
└── IBatisProcessor.java
├── java-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repeater
└── plugin
└── java
├── JavaEntrancePlugin.java
├── JavaInvocationProcessor.java
├── JavaPluginUtils.java
├── JavaRepeater.java
└── JavaSubInvokePlugin.java
├── mybatis-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repeater
└── plugin
└── mybatis
├── MybatisPlugin.java
└── MybatisProcessor.java
├── pom.xml
├── redis-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repeater
└── plugin
└── redis
├── RedisPlugin.java
└── RedisProcessor.java

从上面可以看出,基本结构有2个大类。

为了便于理解,先从简单的开始。先看看 mybatis-plugin 。

├── mybatis-plugin
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repeater
└── plugin
└── mybatis
├── MybatisPlugin.java // 实现 InvokePlugin SPI 的类,主要标识了需监听的 java 类,以及插件的一些基础信息(名称、数据类型等)
└── MybatisProcessor.java // 实现 InvocationProcessor 接口处理调用的类,主要提供了 Identity 和 request 的组装实现。

好了,我们再看看复杂点的 http-plugin

$ tree -L 12 | grep -v iml | grep -v target
.
├── pom.xml
└── src
└── main
└── java
└── com
└── alibaba
└── jvm
└── sandbox
└── repater
└── plugin
└── http
├── HttpPlugin.java // 实现 InvokePlugin SPI 的类,可以理解为整体的入口,类似于 Spring 的 Application
├── HttpRepeater.java // 实现 Repeater ,支持回放的核心类
├── HttpStandaloneListener.java // 针对 standalone 模式的特别实现,主要是支持 header 透传 traceId
├── InvokeAdvice.java // http 请求感知 interface ,包含同步调用和异步调用
└── wrapper
├── WrapperAsyncListener.java // AsyncListener 的一个实现,主要用于应对异步请求?
├── WrapperOutputStreamCopier.java // 一个输出流复制的类,没什么逻辑,感觉是个工具类
├── WrapperRequest.java // HttpServletRequestWrapper 的一种实现类,把 request 变为一个自定义的 servlet ,便于定制实现
├── WrapperResponseCopier.java // HttpServletResponseWrapper 的实现类,把 response 变为自定义的 servlet ,便于定制实现
└── WrapperTransModel.java // 一个实体类,包含 request、response、url 等,并提供了入参为 WrapperRequest 对象的构造函数。作用未知。

小结一下:

step 2 细读目标功能

在上一步可以看到,plugin 和 processor 相对来说是更为普遍的实现方式。因此重点细读这个。

这里以 mybatis-plugin 为代表进行解析。

@MetaInfServices(InvokePlugin.class) // 标明它是一个插件 SPI 
public class MybatisPlugin extends AbstractInvokePluginAdapter {

@Override
protected List<EnhanceModel> getEnhanceModels() { // 定义一个 EnhanceModel ,标记需要监听哪些类的哪些事件
EnhanceModel em = EnhanceModel.builder()
.classPattern("org.apache.ibatis.binding.MapperMethod") // 需监听的类名为 org.apache.ibatis.binding.MapperMethod
.methodPatterns(EnhanceModel.MethodPattern.transform("execute")) // 需监听的方法名为 execute
.watchTypes(Type.BEFORE, Type.RETURN, Type.THROWS) // 监听的事件。此处监听 BEFORE(刚进入方法,调用实际逻辑前)、RETURN(调用逻辑结束,返回值已就绪,准备向上返回时)、THROWS(发现异常,异常已就绪,准备向上抛出时)
.build();
return Lists.newArrayList(em);
}

@Override
protected InvocationProcessor getInvocationProcessor() { // 实现返回 InvocationProcessor 的方法。
return new MybatisProcessor(getType()); // 这个插件本身自带 Processor ,因此返回插件自带的 Processor
}

@Override
public InvokeType getType() { // 设定 InvokeType 为 MYBATIS 。这个用于标识录制出来的是什么类型的调用。repeater 会根据录制消息的类型选择对应的插件进行回放或 mock
return InvokeType.MYBATIS;
}

@Override
public String identity() { // 设定唯一识别名称。启动加载插件时会有一个日志打印加载的插件名称,名称即来自于此处。因此需要唯一。
return "mybatis";
}

@Override
public boolean isEntrance() { // 是否入口流量插件。
return false;
}

}
class MybatisProcessor extends DefaultInvocationProcessor {

MybatisProcessor(InvokeType type) {
super(type);
}

/**
* 组装标识
* @param event 从 sandbox 获取到的 BeforeEvent 对象,记录了这个事件的相关信息
* @return 一个 Identity 对象,作为流量的标识
*/

@Override
public Identity assembleIdentity(BeforeEvent event) {

// 获取触发调用事件的对象。简单的说就是当前被拦截到方法所属于的对象
Object mapperMethod = event.target;
// SqlCommand = MapperMethod.command

// 获取这个对象对应的类中, command 这个 field
Field field = FieldUtils.getDeclaredField(mapperMethod.getClass(), "command", true);

// 如果获取到的值为 null ,把 location(第二个参数)、endpoint(第三个参数)设为 “Unknown” ,组装 Identity 并返回。
if (field == null) {
return new Identity(InvokeType.MYBATIS.name(), "Unknown", "Unknown", new HashMap<String, String>(1));
}

try {
// 获取触发调用事件对象中,“command” 这个 field 对应的对象,并存到变量 command
Object command = field.get(mapperMethod);

// 分别调用变量 command 的 getName 、getType 方法
Object name = MethodUtils.invokeMethod(command, "getName");
Object type = MethodUtils.invokeMethod(command, "getType");

// 用 type.toString() 作为 location,name.toString() 作为 endpoint ,组装 Identity 并返回
return new Identity(InvokeType.MYBATIS.name(), type.toString(), name.toString(), new HashMap<String, String>(1));

} catch (Exception e) {

// 出现任何异常,把 location(第二个参数)、endpoint(第三个参数)设为 Unknown ,组装 Identity 并返回。
return new Identity(InvokeType.MYBATIS.name(), "Unknown", "Unknown", new HashMap<String, String>(1));
}
}

@Override
public Object[] assembleRequest(BeforeEvent event) {
// MapperMethod#execute(SqlSession sqlSession, Object[] args)
// args可能存在不可序序列化异常(例如使用tk.mybatis)

// 默认父类提供的实现是返回整个 event.argumentArray ,这里的实现把它改为只返回下标为1的元素,去掉其它元素。从注释上看是为了避免后续 args 会存在不可序列化异常所以想避开它,但从实现上看取的是第2个元素而非第一个参数。原因未知。
return new Object[]{event.argumentArray[1]};
}
}

简单小结下:

那为何会做上面的操作呢?我们来看看 mybatis 中 MapperMethodexecute 方法的代码片段吧。

public class MapperMethod {

// 这个就是我们加标识要获取的 command 对象了
private final SqlCommand command;
private final MethodSignature method;

public MapperMethod(Class<?> mapperInterface, Method method, Configuration config) {
this.command = new SqlCommand(config, mapperInterface, method);
this.method = new MethodSignature(config, mapperInterface, method);
}

// 这个就是我们要捕获 execute 方法
public Object execute(SqlSession sqlSession, Object[] args) {
Object result;

// command 的 type 代表的是数据库操作类型,对应增删改查,以及 flush 共5种类型
switch (command.getType()) {
case INSERT: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.insert(command.getName(), param));
break;
}
case UPDATE: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.update(command.getName(), param));
break;
}
case DELETE: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.delete(command.getName(), param));
break;
}
case SELECT:
if (method.returnsVoid() && method.hasResultHandler()) {
executeWithResultHandler(sqlSession, args);
result = null;
} else if (method.returnsMany()) {
result = executeForMany(sqlSession, args);
} else if (method.returnsMap()) {
result = executeForMap(sqlSession, args);
} else if (method.returnsCursor()) {
result = executeForCursor(sqlSession, args);
} else {
Object param = method.convertArgsToSqlCommandParam(args);
result = sqlSession.selectOne(command.getName(), param);
}
break;
case FLUSH:
result = sqlSession.flushStatements();
break;
default:
throw new BindingException("Unknown execution method for: " + command.getName());
}
if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) {
throw new BindingException("Mapper method '" + command.getName()
+ " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ").");
}
return result;
}
...

从代码中可以看出,这个 execute 起到了承上启下的左右,基本上所有数据库操作都会经过这里,而且也有足够的信息做单次调用的唯一的标识。

正如官方文档所说,只要找对了需要捕获的类,剩下的就顺利了。

但还有1个未解之谜:

1、为何 assembleRequest 要调整返回,改为只返回第二个入参?

官方同学给的答复:

execute 第一个参数是 SqlSession,不需要也不能序列化,对于录制和回放也没有意义。assembleRequest本身也是作为 request 加工使用,有些参数是不一定需要使用的。

开始开发 rabbitmq 的插件

经过上面的解读,开发的方案就比较清晰了。

1、rabbitmq 主要会有2种被调用的情况。一种是生产者,作为子调用生成 mq 信息发到队列中。另一种是消费者,作为入口调用触发后续逻辑。在录制回放中,消费者的场景更为重要,需要优先满足。这种场景下,需要实现回放。
2、需要找到 rabbitmq 作为消费者的承上启下类和方法,并对应 RabbitMqPlugin 。
3、实现对应的 processor 类以及 repeater 类,实现回放。

更详细的,后续完成后再补充。


↙↙↙阅读原文可查看相关链接,并与作者交流