导读

从整个模块阅读下来,会发现 repeater-module 主要负责的是模块对外的交互、插件装配以及一些 spring 的类信息捕捉,录制回放实现细节主要依赖插件模块实现。

源码结构

elesgongdeMac-mini:jvm-sandbox-repeater elesg$ tree -L 12 repeater-module 
repeater-module
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── alibaba
│                   └── jvm
│                       └── sandbox
│                           └── repeater
│                               └── module
│                                   ├── Constants.java   //模块ID、模块版本常量
│                                   ├── ModuleJarUnLoadCompleted.java   // 实现sandbox的ModuleJarUnLoadSpi,模块Jar文件卸载完所有模块后,正式卸载Jar文件之前调用的操作(这里只做了日志工具销毁操作)
│                                   ├── RepeaterModule.java  //模块类,实现沙箱模块生命周期各个阶段逻辑实现
│                                   ├── advice
│                                   │   └── SpringInstantiateAdvice.java   //  spring初始化拦截器,agent启动模式下拦截记录beanName和bean,用作JavaRepeater回放
│                                   ├── classloader
│                                   │   └── PluginClassLoader.java // 插件类加载器;父类加载器是sandbox's module classLoader,使用了sandbox的Stealth标签,这个类加载器加载了的类不会被sandbox感知,所以不会捕捉他的调用事件
│                                   ├── impl
│                                   │   └── JarFileLifeCycleManager.java    // 插件加载器 ,主要用来加载录制回放插件,另外也支持通过jar包路径加载插件的方法。
│                                   └── util
│                                       ├── LogbackUtils.java  // 日志打印工具
│                                       └── SPILoader.java   //  加载spi ,通过ServiceLoader加载,看了调用方主要是JarFileLifeCycleManager,主要是用来加载repeater插件的的spi实现
└── target

核心类细读

RepeaterModule

其中比较核心的逻辑主要集中在RepeaterModule.java,接下来我们主要对 RepeaterModule 类进行详细的阅读。

package com.alibaba.jvm.sandbox.repeater.module;

/**
 * <p>
 *
 * @author zhaoyb1990
 */
@MetaInfServices(Module.class)
@Information(id = com.alibaba.jvm.sandbox.repeater.module.Constants.MODULE_ID, author = "zhaoyb1990", version = com.alibaba.jvm.sandbox.repeater.module.Constants.VERSION)
public class RepeaterModule implements Module, ModuleLifecycle {

    private final static Logger log = LoggerFactory.getLogger(RepeaterModule.class);

    @Resource
    private ModuleEventWatcher eventWatcher;   // 事件观察者

    @Resource
    private ModuleController moduleController;  // 模块控制接口,控制模块的激活和冻结

    @Resource
    private ConfigInfo configInfo;  // sandbox启动配置,这里只用来判断启动模式

    @Resource
    private LoadedClassDataSource loadedClassDataSource;  //已加载类数据源,可以获取到所有已加载类的集合

    private Broadcaster broadcaster;   //消息广播服务;用于采集流量之后的消息分发(保存录制记录,保存回放结果、拉取录制记录)

    private InvocationListener invocationListener;  // 调用监听器

    private ConfigManager configManager;   // 配置管理器,实现拉取配置

    private LifecycleManager lifecycleManager; // 插件加载器

    private List<InvokePlugin> invokePlugins;  //插件列表

    private AtomicBoolean initialized = new AtomicBoolean(false);  // 是否完成初始化

    @Override
    public void onLoad() throws Throwable {
        //模块加载,模块开始加载之前
        // 初始化日志框架
        LogbackUtils.init(PathUtils.getConfigPath() + "/repeater-logback.xml");
        // 获取启动模式
        Mode mode = configInfo.getMode();
        log.info("module on loaded,id={},version={},mode={}", com.alibaba.jvm.sandbox.repeater.module.Constants.MODULE_ID, com.alibaba.jvm.sandbox.repeater.module.Constants.VERSION, mode);
        /* agent方式启动 */
        if (mode == Mode.AGENT) {
            log.info("agent launch mode,use Spring Instantiate Advice to register bean.");
            //  SpringContext内部容器的是否agent模式设置为真
            SpringContextInnerContainer.setAgentLaunch(true);
            // spring初始化拦截器,agent启动模式下拦截记录beanNamebean
            SpringInstantiateAdvice.watcher(this.eventWatcher).watch();
            // 模块激活
            moduleController.active();
        }
    }

    @Override
    public void onUnload() throws Throwable {
        // 模块卸载,模块开始卸载之前调用
        // 释放插件加载资源,尽可能关闭pluginClassLoader
        if (lifecycleManager != null) {
            lifecycleManager.release();
        }
    }

    @Override
    public void onActive() throws Throwable {
        // 模块激活 就打印一个日志
        log.info("onActive");
    }

    @Override
    public void onFrozen() throws Throwable {
        // 模块冻结 就打印一个日志
        log.info("onFrozen");
    }

    @Override
    public void loadCompleted() {
        // 模块加载完成,模块完成加载后调用!
        // 这里不是很懂,因为这个虽然是一个多线程,但是只执行一次,run中内容还是顺序进行的。
        ExecutorInner.execute(new Runnable() {
            @Override
            public void run() {
                // 根据使用模式是单机版还是服务端板来获取拉取配置的实现方法
                configManager = StandaloneSwitch.instance().getConfigManager();
                // 根据使用模式是单机版还是服务端板来获取消息广播服务
                broadcaster = StandaloneSwitch.instance().getBroadcaster();
                // 调用监听器实例化
                invocationListener = new DefaultInvocationListener(broadcaster);
                // 拉取配置
                RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();
                if (pr.isSuccess()) {
                    // 如果配置拉取成功
                    log.info("pull repeater config success,config={}", pr.getData());
                    // 根据已加载类数据源初始化类加载器连接桥
                    ClassloaderBridge.init(loadedClassDataSource);
                    // 根据配置进行插件初始化
                    initialize(pr.getData());
                }
            }
        });
    }

    /**
     * 初始化插件
     *
     * @param config 配置文件
     */
    private synchronized void initialize(RepeaterConfig config) {
        // 如果插件没有被初始化,才开始初始化
        if (initialized.compareAndSet(false, true)) {
            try {
                // http需要特殊路由操作,使用到容器里面的servlet-api
                PluginClassLoader.Routing[] routingArray = null;
                if (config.getPluginIdentities().contains(InvokeType.HTTP.name())) {
                    int retryTime = 60;
                    // Agent启动方式下类可能未加载完
                    // 所以这里不断重试60次进行查询查询
                    while (configInfo.getMode() == Mode.AGENT && --retryTime > 0
                            && ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME).size() == 0) {
                        try {
                            log.info("http plugin required servlet-api class router,waiting for class loading");
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // ignore
                        }
                    }
                    // 重试结束后获取httpServlet的类实例列表
                    // 如列表中超过一个httpServlet或者没有httpServlet都会导致http插件使用失败
                    List<Class<?>> instances = ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME);
                    if (instances.size() > 1) {
                        throw new RuntimeException("found multiple servlet-api loaded in container, can't use http plugin");
                    }
                    // 如列表中只有一个httpServlet时将这个httpServlet加到插件加载器的路由中
                    if (instances.size() == 1){
                        Class<?> aClass = instances.get(0);
                        routingArray = new PluginClassLoader.Routing[]{new PluginClassLoader.Routing(aClass.getClassLoader(), "^javax.servlet..*")};
                    } else {
                        config.getPluginIdentities().remove(InvokeType.HTTP.name());
                        log.info("http plugin required servlet-api class router, but found no valid class in classloader, ignore http plugin");
                    }
                }
                // 读取插件路径,如无则驱默认
                String pluginsPath;
                if (StringUtils.isEmpty(config.getPluginsPath())) {
                    pluginsPath = PathUtils.getPluginPath();
                } else {
                    pluginsPath = config.getPluginsPath();
                }
                lifecycleManager = new JarFileLifeCycleManager(pluginsPath, routingArray);
                // 装载插件
                invokePlugins = lifecycleManager.loadInvokePlugins();
                // 在全局的应用模型中插入当前配置
                ApplicationModel.instance().setConfig(config);
                // 加载好插件知乎,遍历列表运行插件初始化
                for (InvokePlugin invokePlugin : invokePlugins) {
                    try {
                        // 设置插件配置
                        if (invokePlugin.enable(config)) {
                            log.info("enable plugin {} success", invokePlugin.identity());
                            // 配置插件观察时事件
                            invokePlugin.watch(eventWatcher, invocationListener);
                        }
                    } catch (PluginLifeCycleException e) {
                        log.info("watch plugin occurred error", e);
                    }
                }
                // 装载回放器
                List<Repeater> repeaters = lifecycleManager.loadRepeaters();
                for (Repeater repeater : repeaters) {
                    if (repeater.enable(config)) {
                        repeater.setBroadcast(broadcaster);
                    }
                }
                RepeaterBridge.instance().build(repeaters);
                // 装载消息订阅器
                List<SubscribeSupporter> subscribes = lifecycleManager.loadSubscribes();
                for (SubscribeSupporter subscribe : subscribes) {
                    subscribe.register();
                }
                TtlConcurrentAdvice.watcher(eventWatcher).watch(config);
            } catch (Throwable throwable) {
                initialized.compareAndSet(true, false);
                log.error("error occurred when initialize module", throwable);
            }
        }
    }

    /**
     * 回放http接口
     * 接口路径/sandbox/default/module/http/repeater/repeat
     *
     * @param req    请求参数
     * @param writer printWriter
     */
    @Command("repeat")
    public void repeat(final Map<String, String> req, final PrintWriter writer) {
        try {
            // 判断是否有"_data"参数,如果没有则返回报错
            String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);
            if (StringUtils.isEmpty(data)) {
                writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");
                return;
            }
            // 将回放请求的参数保存到RepeatEvent对象,并将这个对象推送到回放事件总线
            RepeatEvent event = new RepeatEvent();
            Map<String, String> requestParams = new HashMap<String, String>(16);
            for (Map.Entry<String, String> entry : req.entrySet()) {
                requestParams.put(entry.getKey(), entry.getValue());
            }
            event.setRequestParams(requestParams);
            EventBusInner.post(event);
            writer.write("submit success");
        } catch (Throwable e) {
            writer.write(e.getMessage());
        }
    }

    /**
     * 配置推送接口
     * 接口路径/sandbox/default/module/http/repeater/pushConfig
     *
     * @param req    请求参数
     * @param writer printWriter
     */
    @Command("pushConfig")
    public void pushConfig(final Map<String, String> req, final PrintWriter writer) {
        // 判断是否有"_data"参数,如果没有则返回报错
        String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);
        if (StringUtils.isEmpty(data)) {
            writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");
            return;
        }
        try {
            // 将请求参数序列化之后获取RepeaterConfig,并且通知插件更新配置
            RepeaterConfig config = SerializerWrapper.hessianDeserialize(data, RepeaterConfig.class);
            noticeConfigChange(config);
            writer.write("config push success");
        } catch (SerializeException e) {
            writer.write("invalid request, cause deserialize config failed, reason = {" + e.getMessage() + "}");
        }
    }

    /**
     * 通知插件配置变更
     *
     * @param config 配置文件
     */
    private void noticeConfigChange(final RepeaterConfig config) {
        // 如果模块初始化已成功,逐个插件进行更新通知
        if (initialized.get()) {
            for (InvokePlugin invokePlugin : invokePlugins) {
                try {
                    invokePlugin.onConfigChange(config);
                } catch (PluginLifeCycleException e) {
                    log.error("error occurred when notice config, plugin ={}", invokePlugin.getType().name(), e);
                }
            }
        }
    }
}

疑问点

if (config.getPluginIdentities().contains(InvokeType.HTTP.name())) {
                   int retryTime = 60;
                   // Agent启动方式下类可能未加载完
                   // 所以这里不断重试60次进行查询查询
                   while (configInfo.getMode() == Mode.AGENT && --retryTime > 0
                           && ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME).size() == 0) {
                       try {
                           log.info("http plugin required servlet-api class router,waiting for class loading");
                           Thread.sleep(1000);
                       } catch (InterruptedException e) {
                           // ignore
                       }
                   }

之前有问过大佬,大佬给的答复是在执行loadCompleted方法时是用多线程跑的,所以这里是ClassloaderBridge.init()initialize这两个调用是可以同步进行的,我们看回这段代码其实会发现他这里用了一个ExecutorInner来执行。

public void loadCompleted() {
       // 模块加载完成,模块完成加载后调用!
       // 这里不是很懂,因为这个虽然是一个多线程,但是只执行一次,run中内容还是顺序进行的。
       ExecutorInner.execute(new Runnable() {
           @Override
           public void run() {
               configManager = StandaloneSwitch.instance().getConfigManager();
               broadcaster = StandaloneSwitch.instance().getBroadcaster();
               invocationListener = new DefaultInvocationListener(broadcaster);
               RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();
               if (pr.isSuccess()) {
                   log.info("pull repeater config success,config={}", pr.getData());
                   // 根据已加载类数据源初始化类加载器连接桥
                   ClassloaderBridge.init(loadedClassDataSource);
                   // 根据配置进行插件初始化
                   initialize(pr.getData());
               }
           }
       });
   }

而这个ExecutorInner其实是本质上是一个ExecutorService。我尝试写了一个,传入的 Runnable 里面是 4 个 log 打印,分别打印个字的线程名称,发现打印出来的线程名都是同一个,那么这个到底是什么样的多线程执行呢?

public class ExecutorInner {

    private static ExecutorService executor = new ThreadPoolExecutor(8,
        4 * Runtime.getRuntime().availableProcessors(),
        30L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(4096),
        new BasicThreadFactory.Builder().namingPattern("repeater-common-pool-%d").build(),
        new ThreadPoolExecutor.CallerRunsPolicy());

    public static void execute(Runnable r) {
        executor.execute(r);
    }

    public static Executor getExecutor() {
        return executor;
    }

    public static <T> Future<T> submit(Callable<T> callable) {
        return executor.submit(callable);
    }

    @Test
    public void  test(){
    executor.execute(new Runnable() {
            @Override
            public void run() {
                log.info("第一行");
                log.info("第二行");
                log.info("第三行");
                log.info("第四行");
                log.info("第五行");
            }
        });
    }
}
// 输出结果
[2019-09-03 00:00:21,308] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第一行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第二行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第三行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第四行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第五行


↙↙↙阅读原文可查看相关链接,并与作者交流