通用流量录制回放工具 jvm-sandbox-repeater 尝鲜 (二)——repeater-console 使用 中,可以了解到,repeater 的核心还是在 plugin 中,因此有必要去学习下。

熟悉 jvm-sandbox

repeater-plugin 的底层涉及到 jvm-sandbox 里面的一些原理。需要先阅读下相关的文档:

repeater 本身是一种 jvm-sandbox 的 module ,因此重点关注使用者、模块研发者 2 个章节。由于这部分非本文重点,仅摘要记录和本文关系较大的部分。

强烈建议自己动手完成 wiki 文档中的 模块编写初级,大概 15 分钟左右即可完成。代码不要复制粘贴,而是自己仿照文档敲出来,这样记忆比较深刻。

同时可以参考 怎么调试啊? 这个 issue ,了解下如何调试 jvm-sandbox 的模块。后续 repeater-plugin 的调试也用得上哦。

repeater-plugin 简介

特别说明,考虑到调研的目标是使用,用到一定程度再考虑更深入的了解。因此暂时先跳过对 repeater-module 及其相关依赖的解析。后续补回。

官方文档未有正式介绍,故根据个人理解,整理一下。

目前官方已经提供的插件列表如下(截止 20190717):

插件类型 录制 回放 Mock 支持时间 贡献者
http-plugin × 201906 zhaoyb1990
dubbo-plugin × 201906 zhaoyb1990
ibatis-plugin × 201906 zhaoyb1990
mybatis-plugin × 201906 ztbsuper
java-plugin 201906 zhaoyb1990
redis-plugin × × × 预期 7 月底 NA/NA

阅读 plugin 源码

同样源码阅读三步骤:明确阅读目的、了解整体架构、细读目标功能

step 0 明确阅读目的

学会 plugin 开发的步骤,并照样画葫芦完成一个 rabbitmq plugin 的设计与开发。

step 1 了解整体架构

先看下各个 plugin 的结构,是否有一些共通特征:

$ tree -L 12 repeater-plugins | grep -v iml | grep -v target
repeater-plugins
├── dubbo-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repeater
│                                   └── plugin
│                                       └── dubbo
│                                           ├── DubboConsumerPlugin.java
│                                           ├── DubboProcessor.java 
│                                           ├── DubboProviderPlugin.java
│                                           └── DubboRepeater.java
├── http-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repater
│                                   └── plugin
│                                       └── http
│                                           ├── HttpPlugin.java
│                                           ├── HttpRepeater.java
│                                           ├── HttpStandaloneListener.java
│                                           ├── InvokeAdvice.java
│                                           └── wrapper
├── ibatis-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repeater
│                                   └── plugin
│                                       └── ibatis
│                                           ├── IBatisPlugin.java
│                                           └── IBatisProcessor.java
├── java-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repeater
│                                   └── plugin
│                                       └── java
│                                           ├── JavaEntrancePlugin.java
│                                           ├── JavaInvocationProcessor.java
│                                           ├── JavaPluginUtils.java
│                                           ├── JavaRepeater.java
│                                           └── JavaSubInvokePlugin.java
├── mybatis-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repeater
│                                   └── plugin
│                                       └── mybatis
│                                           ├── MybatisPlugin.java
│                                           └── MybatisProcessor.java
├── pom.xml
├── redis-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repeater
│                                   └── plugin
│                                       └── redis
│                                           ├── RedisPlugin.java
│                                           └── RedisProcessor.java

从上面可以看出,基本结构有 2 个大类。

为了便于理解,先从简单的开始。先看看 mybatis-plugin 。

├── mybatis-plugin
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
│               └── com
│                   └── alibaba
│                       └── jvm
│                           └── sandbox
│                               └── repeater
│                                   └── plugin
│                                       └── mybatis
│                                           ├── MybatisPlugin.java    // 实现 InvokePlugin SPI 的类,主要标识了需监听的 java 类,以及插件的一些基础信息(名称、数据类型等)
│                                           └── MybatisProcessor.java // 实现 InvocationProcessor 接口处理调用的类,主要提供了 Identity 和 request 的组装实现。

好了,我们再看看复杂点的 http-plugin

$ tree -L 12 | grep -v iml | grep -v target
.
├── pom.xml
└── src
    └── main
        └── java
            └── com
                └── alibaba
                    └── jvm
                        └── sandbox
                            └── repater
                                └── plugin
                                    └── http
                                        ├── HttpPlugin.java      // 实现 InvokePlugin SPI 的类,可以理解为整体的入口,类似于 Spring 的 Application
                                        ├── HttpRepeater.java    // 实现 Repeater ,支持回放的核心类
                                        ├── HttpStandaloneListener.java // 针对 standalone 模式的特别实现,主要是支持 header 透传 traceId 
                                        ├── InvokeAdvice.java    // http 请求感知 interface ,包含同步调用和异步调用
                                        └── wrapper
                                            ├── WrapperAsyncListener.java // AsyncListener 的一个实现,主要用于应对异步请求?
                                            ├── WrapperOutputStreamCopier.java // 一个输出流复制的类,没什么逻辑,感觉是个工具类
                                            ├── WrapperRequest.java // HttpServletRequestWrapper 的一种实现类,把 request 变为一个自定义的 servlet ,便于定制实现
                                            ├── WrapperResponseCopier.java // HttpServletResponseWrapper 的实现类,把 response 变为自定义的 servlet ,便于定制实现
                                            └── WrapperTransModel.java // 一个实体类,包含 request、response、url 等,并提供了入参为 WrapperRequest 对象的构造函数。作用未知。

小结一下:

step 2 细读目标功能

在上一步可以看到,plugin 和 processor 相对来说是更为普遍的实现方式。因此重点细读这个。

这里以 mybatis-plugin 为代表进行解析。

@MetaInfServices(InvokePlugin.class) // 标明它是一个插件 SPI 
public class MybatisPlugin extends AbstractInvokePluginAdapter {

    @Override
    protected List<EnhanceModel> getEnhanceModels() { // 定义一个 EnhanceModel ,标记需要监听哪些类的哪些事件
        EnhanceModel em = EnhanceModel.builder()
                .classPattern("org.apache.ibatis.binding.MapperMethod") // 需监听的类名为 org.apache.ibatis.binding.MapperMethod
                .methodPatterns(EnhanceModel.MethodPattern.transform("execute")) // 需监听的方法名为 execute
                .watchTypes(Type.BEFORE, Type.RETURN, Type.THROWS) // 监听的事件。此处监听 BEFORE(刚进入方法,调用实际逻辑前)、RETURN(调用逻辑结束,返回值已就绪,准备向上返回时)、THROWS(发现异常,异常已就绪,准备向上抛出时)
                .build();
        return Lists.newArrayList(em);
    }

    @Override
    protected InvocationProcessor getInvocationProcessor() { // 实现返回 InvocationProcessor 的方法。
        return new MybatisProcessor(getType()); // 这个插件本身自带 Processor ,因此返回插件自带的 Processor
    }

    @Override
    public InvokeType getType() {  // 设定 InvokeType 为 MYBATIS 。这个用于标识录制出来的是什么类型的调用。repeater 会根据录制消息的类型选择对应的插件进行回放或 mock 
        return InvokeType.MYBATIS; 
    }

    @Override
    public String identity() {  // 设定唯一识别名称。启动加载插件时会有一个日志打印加载的插件名称,名称即来自于此处。因此需要唯一。
        return "mybatis";         
    }

    @Override
    public boolean isEntrance() { // 是否入口流量插件。
        return false;
    }

}
class MybatisProcessor extends DefaultInvocationProcessor {

    MybatisProcessor(InvokeType type) {
        super(type);
    }

    /**
     * 组装标识
     * @param event 从 sandbox 获取到的 BeforeEvent 对象,记录了这个事件的相关信息
     * @return 一个 Identity 对象,作为流量的标识
     */
    @Override
    public Identity assembleIdentity(BeforeEvent event) { 

        // 获取触发调用事件的对象。简单的说就是当前被拦截到方法所属于的对象
        Object mapperMethod = event.target;               
        // SqlCommand = MapperMethod.command

        // 获取这个对象对应的类中, command 这个 field 
        Field field = FieldUtils.getDeclaredField(mapperMethod.getClass(), "command", true); 

        // 如果获取到的值为 null ,把 location(第二个参数)、endpoint(第三个参数)设为 “Unknown” ,组装 Identity 并返回。
        if (field == null) { 
            return new Identity(InvokeType.MYBATIS.name(), "Unknown", "Unknown", new HashMap<String, String>(1));  
        }

        try {
            // 获取触发调用事件对象中,“command” 这个 field 对应的对象,并存到变量 command 
            Object command = field.get(mapperMethod); 

            // 分别调用变量 command 的 getName 、getType 方法
            Object name = MethodUtils.invokeMethod(command, "getName");
            Object type = MethodUtils.invokeMethod(command, "getType");

            // 用 type.toString() 作为 location,name.toString() 作为 endpoint ,组装 Identity 并返回
            return new Identity(InvokeType.MYBATIS.name(), type.toString(), name.toString(), new HashMap<String, String>(1));

        } catch (Exception e) {

            // 出现任何异常,把 location(第二个参数)、endpoint(第三个参数)设为 Unknown ,组装 Identity 并返回。
            return new Identity(InvokeType.MYBATIS.name(), "Unknown", "Unknown", new HashMap<String, String>(1));
        }
    }

    @Override
    public Object[] assembleRequest(BeforeEvent event) {
        // MapperMethod#execute(SqlSession sqlSession, Object[] args)
        // args可能存在不可序序列化异常(例如使用tk.mybatis)

        // 默认父类提供的实现是返回整个 event.argumentArray ,这里的实现把它改为只返回下标为1的元素,去掉其它元素。从注释上看是为了避免后续 args 会存在不可序列化异常所以想避开它,但从实现上看取的是第2个元素而非第一个参数。原因未知。
        return new Object[]{event.argumentArray[1]};
    }
}

简单小结下:

那为何会做上面的操作呢?我们来看看 mybatis 中 MapperMethodexecute 方法的代码片段吧。

public class MapperMethod {

  // 这个就是我们加标识要获取的 command 对象了
  private final SqlCommand command;
  private final MethodSignature method;

  public MapperMethod(Class<?> mapperInterface, Method method, Configuration config) {
    this.command = new SqlCommand(config, mapperInterface, method);
    this.method = new MethodSignature(config, mapperInterface, method);
  }

  // 这个就是我们要捕获 execute 方法
  public Object execute(SqlSession sqlSession, Object[] args) {
    Object result;

    // command 的 type 代表的是数据库操作类型,对应增删改查,以及 flush 共5种类型
    switch (command.getType()) { 
      case INSERT: {
        Object param = method.convertArgsToSqlCommandParam(args);
        result = rowCountResult(sqlSession.insert(command.getName(), param));
        break;
      }
      case UPDATE: {
        Object param = method.convertArgsToSqlCommandParam(args);
        result = rowCountResult(sqlSession.update(command.getName(), param));
        break;
      }
      case DELETE: {
        Object param = method.convertArgsToSqlCommandParam(args);
        result = rowCountResult(sqlSession.delete(command.getName(), param));
        break;
      }
      case SELECT:
        if (method.returnsVoid() && method.hasResultHandler()) {
          executeWithResultHandler(sqlSession, args);
          result = null;
        } else if (method.returnsMany()) {
          result = executeForMany(sqlSession, args);
        } else if (method.returnsMap()) {
          result = executeForMap(sqlSession, args);
        } else if (method.returnsCursor()) {
          result = executeForCursor(sqlSession, args);
        } else {
          Object param = method.convertArgsToSqlCommandParam(args);
          result = sqlSession.selectOne(command.getName(), param);
        }
        break;
      case FLUSH:
        result = sqlSession.flushStatements();
        break;
      default:
        throw new BindingException("Unknown execution method for: " + command.getName());
    }
    if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) {
      throw new BindingException("Mapper method '" + command.getName() 
          + " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ").");
    }
    return result;
  }
  ...

从代码中可以看出,这个 execute 起到了承上启下的左右,基本上所有数据库操作都会经过这里,而且也有足够的信息做单次调用的唯一的标识。

正如官方文档所说,只要找对了需要捕获的类,剩下的就顺利了。

但还有 1 个未解之谜:

1、为何 assembleRequest 要调整返回,改为只返回第二个入参?

官方同学给的答复:

execute 第一个参数是 SqlSession,不需要也不能序列化,对于录制和回放也没有意义。assembleRequest 本身也是作为 request 加工使用,有些参数是不一定需要使用的。

开始开发 rabbitmq 的插件

经过上面的解读,开发的方案就比较清晰了。

1、rabbitmq 主要会有 2 种被调用的情况。一种是生产者,作为子调用生成 mq 信息发到队列中。另一种是消费者,作为入口调用触发后续逻辑。在录制回放中,消费者的场景更为重要,需要优先满足。这种场景下,需要实现回放。
2、需要找到 rabbitmq 作为消费者的承上启下类和方法,并对应 RabbitMqPlugin 。
3、实现对应的 processor 类以及 repeater 类,实现回放。

更详细的,后续完成后再补充。


↙↙↙阅读原文可查看相关链接,并与作者交流