从整个模块阅读下来,会发现 repeater-module 主要负责的是模块对外的交互、插件装配以及一些 spring 的类信息捕捉,录制回放实现细节主要依赖插件模块实现。
elesgongdeMac-mini:jvm-sandbox-repeater elesg$ tree -L 12 repeater-module
repeater-module
├── pom.xml
├── src
│ └── main
│ └── java
│ └── com
│ └── alibaba
│ └── jvm
│ └── sandbox
│ └── repeater
│ └── module
│ ├── Constants.java //模块ID、模块版本常量
│ ├── ModuleJarUnLoadCompleted.java // 实现sandbox的ModuleJarUnLoadSpi,模块Jar文件卸载完所有模块后,正式卸载Jar文件之前调用的操作(这里只做了日志工具销毁操作)
│ ├── RepeaterModule.java //模块类,实现沙箱模块生命周期各个阶段逻辑实现
│ ├── advice
│ │ └── SpringInstantiateAdvice.java // spring初始化拦截器,agent启动模式下拦截记录beanName和bean,用作JavaRepeater回放
│ ├── classloader
│ │ └── PluginClassLoader.java // 插件类加载器;父类加载器是sandbox's module classLoader,使用了sandbox的Stealth标签,这个类加载器加载了的类不会被sandbox感知,所以不会捕捉他的调用事件
│ ├── impl
│ │ └── JarFileLifeCycleManager.java // 插件加载器 ,主要用来加载录制回放插件,另外也支持通过jar包路径加载插件的方法。
│ └── util
│ ├── LogbackUtils.java // 日志打印工具
│ └── SPILoader.java // 加载spi ,通过ServiceLoader加载,看了调用方主要是JarFileLifeCycleManager,主要是用来加载repeater插件的的spi实现
└── target
其中比较核心的逻辑主要集中在RepeaterModule.java
,接下来我们主要对 RepeaterModule 类进行详细的阅读。
package com.alibaba.jvm.sandbox.repeater.module;
/**
* <p>
*
* @author zhaoyb1990
*/
@MetaInfServices(Module.class)
@Information(id = com.alibaba.jvm.sandbox.repeater.module.Constants.MODULE_ID, author = "zhaoyb1990", version = com.alibaba.jvm.sandbox.repeater.module.Constants.VERSION)
public class RepeaterModule implements Module, ModuleLifecycle {
private final static Logger log = LoggerFactory.getLogger(RepeaterModule.class);
@Resource
private ModuleEventWatcher eventWatcher; // 事件观察者
@Resource
private ModuleController moduleController; // 模块控制接口,控制模块的激活和冻结
@Resource
private ConfigInfo configInfo; // sandbox启动配置,这里只用来判断启动模式
@Resource
private LoadedClassDataSource loadedClassDataSource; //已加载类数据源,可以获取到所有已加载类的集合
private Broadcaster broadcaster; //消息广播服务;用于采集流量之后的消息分发(保存录制记录,保存回放结果、拉取录制记录)
private InvocationListener invocationListener; // 调用监听器
private ConfigManager configManager; // 配置管理器,实现拉取配置
private LifecycleManager lifecycleManager; // 插件加载器
private List<InvokePlugin> invokePlugins; //插件列表
private AtomicBoolean initialized = new AtomicBoolean(false); // 是否完成初始化
@Override
public void onLoad() throws Throwable {
//模块加载,模块开始加载之前
// 初始化日志框架
LogbackUtils.init(PathUtils.getConfigPath() + "/repeater-logback.xml");
// 获取启动模式
Mode mode = configInfo.getMode();
log.info("module on loaded,id={},version={},mode={}", com.alibaba.jvm.sandbox.repeater.module.Constants.MODULE_ID, com.alibaba.jvm.sandbox.repeater.module.Constants.VERSION, mode);
/* agent方式启动 */
if (mode == Mode.AGENT) {
log.info("agent launch mode,use Spring Instantiate Advice to register bean.");
// SpringContext内部容器的是否agent模式设置为真
SpringContextInnerContainer.setAgentLaunch(true);
// spring初始化拦截器,agent启动模式下拦截记录beanName和bean
SpringInstantiateAdvice.watcher(this.eventWatcher).watch();
// 模块激活
moduleController.active();
}
}
@Override
public void onUnload() throws Throwable {
// 模块卸载,模块开始卸载之前调用
// 释放插件加载资源,尽可能关闭pluginClassLoader
if (lifecycleManager != null) {
lifecycleManager.release();
}
}
@Override
public void onActive() throws Throwable {
// 模块激活 就打印一个日志
log.info("onActive");
}
@Override
public void onFrozen() throws Throwable {
// 模块冻结 就打印一个日志
log.info("onFrozen");
}
@Override
public void loadCompleted() {
// 模块加载完成,模块完成加载后调用!
// 这里不是很懂,因为这个虽然是一个多线程,但是只执行一次,run中内容还是顺序进行的。
ExecutorInner.execute(new Runnable() {
@Override
public void run() {
// 根据使用模式是单机版还是服务端板来获取拉取配置的实现方法
configManager = StandaloneSwitch.instance().getConfigManager();
// 根据使用模式是单机版还是服务端板来获取消息广播服务
broadcaster = StandaloneSwitch.instance().getBroadcaster();
// 调用监听器实例化
invocationListener = new DefaultInvocationListener(broadcaster);
// 拉取配置
RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();
if (pr.isSuccess()) {
// 如果配置拉取成功
log.info("pull repeater config success,config={}", pr.getData());
// 根据已加载类数据源初始化类加载器连接桥
ClassloaderBridge.init(loadedClassDataSource);
// 根据配置进行插件初始化
initialize(pr.getData());
}
}
});
}
/**
* 初始化插件
*
* @param config 配置文件
*/
private synchronized void initialize(RepeaterConfig config) {
// 如果插件没有被初始化,才开始初始化
if (initialized.compareAndSet(false, true)) {
try {
// http需要特殊路由操作,使用到容器里面的servlet-api
PluginClassLoader.Routing[] routingArray = null;
if (config.getPluginIdentities().contains(InvokeType.HTTP.name())) {
int retryTime = 60;
// Agent启动方式下类可能未加载完
// 所以这里不断重试60次进行查询查询
while (configInfo.getMode() == Mode.AGENT && --retryTime > 0
&& ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME).size() == 0) {
try {
log.info("http plugin required servlet-api class router,waiting for class loading");
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
}
// 重试结束后获取httpServlet的类实例列表
// 如列表中超过一个httpServlet或者没有httpServlet都会导致http插件使用失败
List<Class<?>> instances = ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME);
if (instances.size() > 1) {
throw new RuntimeException("found multiple servlet-api loaded in container, can't use http plugin");
}
// 如列表中只有一个httpServlet时将这个httpServlet加到插件加载器的路由中
if (instances.size() == 1){
Class<?> aClass = instances.get(0);
routingArray = new PluginClassLoader.Routing[]{new PluginClassLoader.Routing(aClass.getClassLoader(), "^javax.servlet..*")};
} else {
config.getPluginIdentities().remove(InvokeType.HTTP.name());
log.info("http plugin required servlet-api class router, but found no valid class in classloader, ignore http plugin");
}
}
// 读取插件路径,如无则驱默认
String pluginsPath;
if (StringUtils.isEmpty(config.getPluginsPath())) {
pluginsPath = PathUtils.getPluginPath();
} else {
pluginsPath = config.getPluginsPath();
}
lifecycleManager = new JarFileLifeCycleManager(pluginsPath, routingArray);
// 装载插件
invokePlugins = lifecycleManager.loadInvokePlugins();
// 在全局的应用模型中插入当前配置
ApplicationModel.instance().setConfig(config);
// 加载好插件知乎,遍历列表运行插件初始化
for (InvokePlugin invokePlugin : invokePlugins) {
try {
// 设置插件配置
if (invokePlugin.enable(config)) {
log.info("enable plugin {} success", invokePlugin.identity());
// 配置插件观察时事件
invokePlugin.watch(eventWatcher, invocationListener);
}
} catch (PluginLifeCycleException e) {
log.info("watch plugin occurred error", e);
}
}
// 装载回放器
List<Repeater> repeaters = lifecycleManager.loadRepeaters();
for (Repeater repeater : repeaters) {
if (repeater.enable(config)) {
repeater.setBroadcast(broadcaster);
}
}
RepeaterBridge.instance().build(repeaters);
// 装载消息订阅器
List<SubscribeSupporter> subscribes = lifecycleManager.loadSubscribes();
for (SubscribeSupporter subscribe : subscribes) {
subscribe.register();
}
TtlConcurrentAdvice.watcher(eventWatcher).watch(config);
} catch (Throwable throwable) {
initialized.compareAndSet(true, false);
log.error("error occurred when initialize module", throwable);
}
}
}
/**
* 回放http接口
* 接口路径/sandbox/default/module/http/repeater/repeat
*
* @param req 请求参数
* @param writer printWriter
*/
@Command("repeat")
public void repeat(final Map<String, String> req, final PrintWriter writer) {
try {
// 判断是否有"_data"参数,如果没有则返回报错
String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);
if (StringUtils.isEmpty(data)) {
writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");
return;
}
// 将回放请求的参数保存到RepeatEvent对象,并将这个对象推送到回放事件总线
RepeatEvent event = new RepeatEvent();
Map<String, String> requestParams = new HashMap<String, String>(16);
for (Map.Entry<String, String> entry : req.entrySet()) {
requestParams.put(entry.getKey(), entry.getValue());
}
event.setRequestParams(requestParams);
EventBusInner.post(event);
writer.write("submit success");
} catch (Throwable e) {
writer.write(e.getMessage());
}
}
/**
* 配置推送接口
* 接口路径/sandbox/default/module/http/repeater/pushConfig
*
* @param req 请求参数
* @param writer printWriter
*/
@Command("pushConfig")
public void pushConfig(final Map<String, String> req, final PrintWriter writer) {
// 判断是否有"_data"参数,如果没有则返回报错
String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);
if (StringUtils.isEmpty(data)) {
writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");
return;
}
try {
// 将请求参数序列化之后获取RepeaterConfig,并且通知插件更新配置
RepeaterConfig config = SerializerWrapper.hessianDeserialize(data, RepeaterConfig.class);
noticeConfigChange(config);
writer.write("config push success");
} catch (SerializeException e) {
writer.write("invalid request, cause deserialize config failed, reason = {" + e.getMessage() + "}");
}
}
/**
* 通知插件配置变更
*
* @param config 配置文件
*/
private void noticeConfigChange(final RepeaterConfig config) {
// 如果模块初始化已成功,逐个插件进行更新通知
if (initialized.get()) {
for (InvokePlugin invokePlugin : invokePlugins) {
try {
invokePlugin.onConfigChange(config);
} catch (PluginLifeCycleException e) {
log.error("error occurred when notice config, plugin ={}", invokePlugin.getType().name(), e);
}
}
}
}
}
initialize
方法中,有一个在判断等待,是我觉得不合理的,因为在调用到这个方法之前,已经执行了ClassloaderBridge.init()
的方法,但是他却在这个方法里面说类可能没加载完,所以有了这个等待 60 秒的判断。if (config.getPluginIdentities().contains(InvokeType.HTTP.name())) {
int retryTime = 60;
// Agent启动方式下类可能未加载完
// 所以这里不断重试60次进行查询查询
while (configInfo.getMode() == Mode.AGENT && --retryTime > 0
&& ClassloaderBridge.instance().findClassInstances(Constants.SERVLET_API_NAME).size() == 0) {
try {
log.info("http plugin required servlet-api class router,waiting for class loading");
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
}
之前有问过大佬,大佬给的答复是在执行loadCompleted
方法时是用多线程跑的,所以这里是ClassloaderBridge.init()
和initialize
这两个调用是可以同步进行的,我们看回这段代码其实会发现他这里用了一个ExecutorInner
来执行。
public void loadCompleted() {
// 模块加载完成,模块完成加载后调用!
// 这里不是很懂,因为这个虽然是一个多线程,但是只执行一次,run中内容还是顺序进行的。
ExecutorInner.execute(new Runnable() {
@Override
public void run() {
configManager = StandaloneSwitch.instance().getConfigManager();
broadcaster = StandaloneSwitch.instance().getBroadcaster();
invocationListener = new DefaultInvocationListener(broadcaster);
RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();
if (pr.isSuccess()) {
log.info("pull repeater config success,config={}", pr.getData());
// 根据已加载类数据源初始化类加载器连接桥
ClassloaderBridge.init(loadedClassDataSource);
// 根据配置进行插件初始化
initialize(pr.getData());
}
}
});
}
而这个ExecutorInner
其实是本质上是一个ExecutorService
。我尝试写了一个,传入的 Runnable 里面是 4 个 log 打印,分别打印个字的线程名称,发现打印出来的线程名都是同一个,那么这个到底是什么样的多线程执行呢?
public class ExecutorInner {
private static ExecutorService executor = new ThreadPoolExecutor(8,
4 * Runtime.getRuntime().availableProcessors(),
30L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(4096),
new BasicThreadFactory.Builder().namingPattern("repeater-common-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
public static void execute(Runnable r) {
executor.execute(r);
}
public static Executor getExecutor() {
return executor;
}
public static <T> Future<T> submit(Callable<T> callable) {
return executor.submit(callable);
}
@Test
public void test(){
executor.execute(new Runnable() {
@Override
public void run() {
log.info("第一行");
log.info("第二行");
log.info("第三行");
log.info("第四行");
log.info("第五行");
}
});
}
}
// 输出结果
[2019-09-03 00:00:21,308] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第一行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第二行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第三行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第四行
[2019-09-03 00:00:21,319] [INFO] com.alibaba.jvm.sandbox.repeater.plugin.core.util.ExecutorInner [repeater-common-pool-1] [] hawkeye alert-server - 第五行