开源测试工具 jvm-sandbox-repeater 源码阅读笔记--repeater-module 篇

ELes for PPmoney · August 31, 2019 · 579 hits

导读

从整个模块阅读下来,会发现repeater-module主要负责的是模块对外的交互、插件装配以及一些spring的类信息捕捉,录制回放实现细节主要依赖插件模块实现。

源码结构

elesgongdeMac-mini:jvm-sandbox-repeater elesg$ tree -L 12 repeater-module 
repeater-module
├── pom.xml
├── src
│   └── main
│   └── java
│   └── com
│   └── alibaba
│   └── jvm
│   └── sandbox
│   └── repeater
│   └── module
│   ├── Constants.java //模块ID、模块版本常量
│   ├── ModuleJarUnLoadCompleted.java // 实现sandbox的ModuleJarUnLoadSpi,模块Jar文件卸载完所有模块后,正式卸载Jar文件之前调用的操作(这里只做了日志工具销毁操作)
│   ├── RepeaterModule.java //模块类,实现沙箱模块生命周期各个阶段逻辑实现
│   ├── advice
│   │   └── SpringInstantiateAdvice.java // spring初始化拦截器,agent启动模式下拦截记录beanName和bean,用作JavaRepeater回放
│   ├── classloader
│   │   └── PluginClassLoader.java // 插件类加载器;父类加载器是sandbox's module classLoader,使用了sandbox的Stealth标签,这个类加载器加载了的类不会被sandbox感知,所以不会捕捉他的调用事件
│   ├── impl
│   │   └── JarFileLifeCycleManager.java // 插件加载器 ,主要用来加载录制回放插件,另外也支持通过jar包路径加载插件的方法。
│   └── util
│   ├── LogbackUtils.java // 日志打印工具
│   └── SPILoader.java // 加载spi ,通过ServiceLoader加载,看了调用方主要是JarFileLifeCycleManager,主要是用来加载repeater插件的的spi实现
└── target

核心类细读

RepeaterModule

其中比较核心的逻辑主要集中在RepeaterModule.java,接下来我们主要对RepeaterModule类进行详细的阅读。

package com.alibaba.jvm.sandbox.repeater.module;

/**
* <p>
*
* @author zhaoyb1990
*/

@MetaInfServices(Module.class)
@Information(id = com.alibaba.jvm.sandbox.repeater.module.Constants.MODULE_ID, author = "zhaoyb1990", version = com.alibaba.jvm.sandbox.repeater.module.Constants.VERSION)
public class RepeaterModule implements Module, ModuleLifecycle {

private final static Logger log = LoggerFactory.getLogger(RepeaterModule.class);

@Resource
private ModuleEventWatcher eventWatcher; // 事件观察者

@Resource
private ModuleController moduleController; // 模块控制接口,控制模块的激活和冻结

@Resource
private ConfigInfo configInfo; // sandbox启动配置,这里只用来判断启动模式

@Resource
private LoadedClassDataSource loadedClassDataSource; //已加载类数据源,可以获取到所有已加载类的集合

private Broadcaster broadcaster; //消息广播服务;用于采集流量之后的消息分发(保存录制记录,保存回放结果、拉取录制记录)

private InvocationListener invocationListener; // 调用监听器

private ConfigManager configManager; // 配置管理器,实现拉取配置

private LifecycleManager lifecycleManager; // 插件加载器

private List<InvokePlugin> invokePlugins; //插件列表

private AtomicBoolean initialized = new AtomicBoolean(false); // 是否完成初始化

@Override
public void onLoad() throws Throwable {
//模块加载,模块开始加载之前
// 初始化日志框架
LogbackUtils.init(PathUtils.getConfigPath() + "/repeater-logback.xml");
// 获取启动模式
Mode mode = configInfo.getMode();
log.info("module on loaded,id={},version={},mode={}", com.alibaba.jvm.sandbox.repeater.module.Constants.MODULE_ID, com.alibaba.jvm.sandbox.repeater.module.Constants.VERSION, mode);
/* agent方式启动 */
if (mode == Mode.AGENT) {
log.info("agent launch mode,use Spring Instantiate Advice to register bean.");
// SpringContext内部容器的是否agent模式设置为真
SpringContextInnerContainer.setAgentLaunch(true);
// spring初始化拦截器,agent启动模式下拦截记录beanName和bean
SpringInstantiateAdvice.watcher(this.eventWatcher).watch();
// 模块激活
moduleController.active();
}
}

@Override
public void onUnload() throws Throwable {
// 模块卸载,模块开始卸载之前调用
// 释放插件加载资源,尽可能关闭pluginClassLoader
if (lifecycleManager != null) {
lifecycleManager.release();
}
}

@Override
public void onActive() throws Throwable {
// 模块激活 就打印一个日志
log.info("onActive");
}

@Override
public void onFrozen() throws Throwable {
// 模块冻结 就打印一个日志
log.info("onFrozen");
}

@Override
public void loadCompleted() {
// 模块加载完成,模块完成加载后调用!
// 这里不是很懂,因为这个虽然是一个多线程,但是只执行一次,run中内容还是顺序进行的。
ExecutorInner.execute(new Runnable() {
@Override
public void run() {
// 根据使用模式是单机版还是服务端板来获取拉取配置的实现方法
configManager = StandaloneSwitch.instance().getConfigManager();
// 根据使用模式是单机版还是服务端板来获取消息广播服务
broadcaster = StandaloneSwitch.instance().getBroadcaster();
// 调用监听器实例化
invocationListener = new DefaultInvocationListener(broadcaster);
// 拉取配置
RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();
if (pr.isSuccess()) {
// 如果配置拉取成功
log.info("pull repeater config success,config={}", pr.getData());
// 根据已加载类数据源初始化类加载器连接桥
ClassloaderBridge.init(loadedClassDataSource);
// 根据配置进行插件初始化
initialize(pr.getData());
}
}
});
}

/**
* 初始化插件
*
* @param config 配置文件
*/

private synchronized void initialize(RepeaterConfig config) {
// 如果插件没有被初始化,才开始初始化
if (initialized.compareAndSet(false, true)) {
try {
// http需要特殊路由操作,使用到容器里面的servlet-api
PluginClassLoader.Routing[] routingArray = null;
if (config.getPluginIdentities().contains(InvokeType.HTTP.name())) {
int retryTime = 60;
// Agent启动方式下类可能未加载完
// 所以这里不断重试60次进行查询查询
while (configInfo.getMode() == Mode.AGENT && --retryTime > 0
&& ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME).size() == 0) {
try {
log.info("http plugin required servlet-api class router,waiting for class loading");
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
}
// 重试结束后获取httpServlet的类实例列表
// 如列表中超过一个httpServlet或者没有httpServlet都会导致http插件使用失败
List<Class<?>> instances = ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME);
if (instances.size() > 1) {
throw new RuntimeException("found multiple servlet-api loaded in container, can't use http plugin");
}
// 如列表中只有一个httpServlet时将这个httpServlet加到插件加载器的路由中
if (instances.size() == 1){
Class<?> aClass = instances.get(0);
routingArray = new PluginClassLoader.Routing[]{new PluginClassLoader.Routing(aClass.getClassLoader(), "^javax.servlet..*")};
} else {
config.getPluginIdentities().remove(InvokeType.HTTP.name());
log.info("http plugin required servlet-api class router, but found no valid class in classloader, ignore http plugin");
}
}
// 读取插件路径,如无则驱默认
String pluginsPath;
if (StringUtils.isEmpty(config.getPluginsPath())) {
pluginsPath = PathUtils.getPluginPath();
} else {
pluginsPath = config.getPluginsPath();
}
lifecycleManager = new JarFileLifeCycleManager(pluginsPath, routingArray);
// 装载插件
invokePlugins = lifecycleManager.loadInvokePlugins();
// 在全局的应用模型中插入当前配置
ApplicationModel.instance().setConfig(config);
// 加载好插件知乎,遍历列表运行插件初始化
for (InvokePlugin invokePlugin : invokePlugins) {
try {
// 设置插件配置
if (invokePlugin.enable(config)) {
log.info("enable plugin {} success", invokePlugin.identity());
// 配置插件观察时事件
invokePlugin.watch(eventWatcher, invocationListener);
}
} catch (PluginLifeCycleException e) {
log.info("watch plugin occurred error", e);
}
}
// 装载回放器
List<Repeater> repeaters = lifecycleManager.loadRepeaters();
for (Repeater repeater : repeaters) {
if (repeater.enable(config)) {
repeater.setBroadcast(broadcaster);
}
}
RepeaterBridge.instance().build(repeaters);
// 装载消息订阅器
List<SubscribeSupporter> subscribes = lifecycleManager.loadSubscribes();
for (SubscribeSupporter subscribe : subscribes) {
subscribe.register();
}
TtlConcurrentAdvice.watcher(eventWatcher).watch(config);
} catch (Throwable throwable) {
initialized.compareAndSet(true, false);
log.error("error occurred when initialize module", throwable);
}
}
}

/**
* 回放http接口
* 接口路径/sandbox/default/module/http/repeater/repeat
*
* @param req 请求参数
* @param writer printWriter
*/

@Command("repeat")
public void repeat(final Map<String, String> req, final PrintWriter writer) {
try {
// 判断是否有"_data"参数,如果没有则返回报错
String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);
if (StringUtils.isEmpty(data)) {
writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");
return;
}
// 将回放请求的参数保存到RepeatEvent对象,并将这个对象推送到回放事件总线
RepeatEvent event = new RepeatEvent();
Map<String, String> requestParams = new HashMap<String, String>(16);
for (Map.Entry<String, String> entry : req.entrySet()) {
requestParams.put(entry.getKey(), entry.getValue());
}
event.setRequestParams(requestParams);
EventBusInner.post(event);
writer.write("submit success");
} catch (Throwable e) {
writer.write(e.getMessage());
}
}

/**
* 配置推送接口
* 接口路径/sandbox/default/module/http/repeater/pushConfig
*
* @param req 请求参数
* @param writer printWriter
*/

@Command("pushConfig")
public void pushConfig(final Map<String, String> req, final PrintWriter writer) {
// 判断是否有"_data"参数,如果没有则返回报错
String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);
if (StringUtils.isEmpty(data)) {
writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");
return;
}
try {
// 将请求参数序列化之后获取RepeaterConfig,并且通知插件更新配置
RepeaterConfig config = SerializerWrapper.hessianDeserialize(data, RepeaterConfig.class);
noticeConfigChange(config);
writer.write("config push success");
} catch (SerializeException e) {
writer.write("invalid request, cause deserialize config failed, reason = {" + e.getMessage() + "}");
}
}

/**
* 通知插件配置变更
*
* @param config 配置文件
*/

private void noticeConfigChange(final RepeaterConfig config) {
// 如果模块初始化已成功,逐个插件进行更新通知
if (initialized.get()) {
for (InvokePlugin invokePlugin : invokePlugins) {
try {
invokePlugin.onConfigChange(config);
} catch (PluginLifeCycleException e) {
log.error("error occurred when notice config, plugin ={}", invokePlugin.getType().name(), e);
}
}
}
}
}

疑问点

  • initialize方法中,有一个在判断等待,是我觉得不合理的,因为在调用到这个方法之前,已经执行了ClassloaderBridge.init()的方法,但是他却在这个方法里面说类可能没加载完,所以有了这个等待60秒的判断。
if (config.getPluginIdentities().contains(InvokeType.HTTP.name())) {
int retryTime = 60;
// Agent启动方式下类可能未加载完
// 所以这里不断重试60次进行查询查询
while (configInfo.getMode() == Mode.AGENT && --retryTime > 0
&& ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME).size() == 0) {
try {
log.info("http plugin required servlet-api class router,waiting for class loading");
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
}

之前有问过大佬,大佬给的答复是在执行loadCompleted方法时是用多线程跑的,所以这里是ClassloaderBridge.init()initialize这两个调用是可以同步进行的,我们看回这段代码其实会发现他这里用了一个ExecutorInner来执行。

public void loadCompleted() {
// 模块加载完成,模块完成加载后调用!
// 这里不是很懂,因为这个虽然是一个多线程,但是只执行一次,run中内容还是顺序进行的。
ExecutorInner.execute(new Runnable() {
@Override
public void run() {
configManager = StandaloneSwitch.instance().getConfigManager();
broadcaster = StandaloneSwitch.instance().getBroadcaster();
invocationListener = new DefaultInvocationListener(broadcaster);
RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();
if (pr.isSuccess()) {
log.info("pull repeater config success,config={}", pr.getData());
// 根据已加载类数据源初始化类加载器连接桥
ClassloaderBridge.init(loadedClassDataSource);
// 根据配置进行插件初始化
initialize(pr.getData());
}
}
});
}

而这个ExecutorInner其实是本质上是一个ExecutorService。我尝试写了一个,传入的Runnable里面是4个log打印,分别打印个字的线程名称,发现打印出来的线程名都是同一个,那么这个到底是什么样的多线程执行呢?

public class ExecutorInner {

private static ExecutorService executor = new ThreadPoolExecutor(8,
4 * Runtime.getRuntime().availableProcessors(),
30L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(4096),
new BasicThreadFactory.Builder().namingPattern("repeater-common-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());

public static void execute(Runnable r) {
executor.execute(r);
}

public static Executor getExecutor() {
return executor;
}

public static <T> Future<T> submit(Callable<T> callable) {
return executor.submit(callable);
}

@Test
public void test(){
executor.execute(new Runnable() {
@Override
public void run() {
log.info("第一行");
log.info("第二行");
log.info("第三行");
log.info("第四行");
log.info("第五行");
}
});
}
}
// 输出结果
[2019-09-03 00:00:21,308] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第一行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第二行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第三行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第四行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第五行
共收到 0 条回复 时间 点赞
需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up