AREX 是一款开源的自动化测试工具,通过 Java Agent 字节码注入技术,在生产环境录制和存储请求、应答数据,随后在测试环境回放请求和注入 Mock 数据,存储新的应答,以此来达到自动录制、自动回放、自动比对,为接口回归测试提供便利。在进行数据采集时,同一个请求,会采集下来多条数据(如 Request/Response、其它服务调用的请求响应等),AREX 通过链路跟踪将这些数据串联起来,并做为一个完整的测试用例。本文将深入解读 AREX Agent 中关于全链路跟踪和 Mock 数据读写的源码。
AREX 的链路跟踪类似于 OpenTelemetry,下面就先简单介绍一下 OpenTelemetry 中如何实现全链路跟踪。
OpenTelemetry 是一种用于分布式系统的开源观测性工具,其全链路跟踪的实现依赖于上下文(Context)传播机制,
数据传播按照场景分为进程内传播和分布式传播两类。在进程内传播中,上下文对象在一个服务内部传递 Trace,相对比较简单。而在分布式传播中,Context propagation 在不同的服务之间传递上下文信息。
在 OpenTelemetry 中,context 是一个包含键值对的数据结构,例如线程或循环程序,用于在请求处理过程中传递数据。在每种编程语言中,OpenTelemetry 都提供了一个上下文对象,例如在 Java 中,OpenTelemetry 使用 ThreadLocal 来存储上下文;在 Go 中,OpenTelemetry 使用 context 包来存储上下文;在 Node.js 中,OpenTelemetry 使用 async_hooks 包来存储上下文;在 Python 中,OpenTelemetry 使用 threading.local 来存储上下文。在 C++ 中,OpenTelemetry 使用 Boost.Context 库来实现上下文的管理。
Context 可用于存储跟踪(Tracing)、日志(Logging)和指标(Metrics)数据等信号,并且可以通过 API 进行访问,通过调用这些 API 可以访问整个上下文对象,这意味着 Tracing、Logging 和 Metrics 信号是相互集成的,在整个上下文中共享数据。例如,如果同时启用了 Tracing 和 Metrics 信号,记录一个 Metrics 可以自动创建一个 Tracing 范例。Logging 也是如此:如果有的话,Logging 会自动绑定到当前的 Tracing。
进程内传播可以是隐式的或显式的,具体取决于所使用的编程语言。隐式传播是通过将活动上下文存储在线程局部变量(Java、Python、Ruby、NodeJS)中自动完成的。显式传播需要显式地将活动上下文作为参数从一个函数传递到另一个函数 (Go)。
对于分布式传播的跟踪,tracer 会为第一个请求生成一个唯一的 transaction id,并将其添加到请求的上下文中。在后续的请求中,可以通过这个上下文来获取 transaction id,并用于关联整个请求链路。对于数据库访问的关联,可以使用 OpenTelemetry 提供的数据库集成库,例如 OpenTelemetry Java Instrumentation 中的 JDBC 集成库。这个集成库会自动将 transaction id 添加到数据库请求中,并将数据库访问的信息添加到 span 中,以便更好地理解整个请求链路的性能和行为。
ArexThreadLocal 是 AREX 的存储 Context 的基础类,继承 InheritableThreadLocal 类,用于存储 Tracing、Logging 和 Metrics 信号等数据。
public class ArexThreadLocal<T> extends InheritableThreadLocal<T> {}
InheritableThreadLocal 是 Java 中的一个线程本地存储类,它允许子线程继承父线程的线程本地变量。与普通的 ThreadLocal 不同,InheritableThreadLocal 可以在子线程中访问父线程中设置的线程本地变量。
InheritableThreadLocal 的特点包括:
可以在子线程中访问父线程中设置的线程本地变量,这对于需要在多个线程之间共享数据的场景非常有用。
InheritableThreadLocal 是线程安全的,多个线程可以同时访问同一个 InheritableThreadLocal 实例中的线程本地变量,而不会出现线程安全问题。
InheritableThreadLocal 可以被继承,子线程可以继承父线程中设置的 InheritableThreadLocal 实例中的线程本地变量。这个特性可以让子线程继承父线程中的上下文信息,从而更方便地进行任务处理。
需要注意的是,InheritableThreadLocal 的使用需要谨慎,因为它可能会导致内存泄漏问题。如果在 InheritableThreadLocal 中存储的对象没有及时清理,那么这些对象会一直存在于内存中,直到应用程序退出。
因此,在使用 InheritableThreadLocal 时,需要注意及时清理其中存储的对象,以避免内存泄漏问题。
TraceContextManager 是 AREX 跟踪上下文的管理对象,其中包含了一个静态变量 TRACE_CONTEXT 的对象 (ArexThreadLocal),用于存储和读取 TraceID。同时,TraceContextManager 还包含了 IDGenerator,用于生成以"AREX-"为前缀的 ID。
通过 TraceContextManager 对静态变量 TRACE_CONTEXT 进行设置操作,可以理解为 Tracing 的入口,这样可以设置 TransactionID,进行上下文的跟踪。
划重点,这里要特别注意两个函数,currentContext() 这个是内部依赖的调用,currentContext(boolean createIfAbsent, String caseId) 是回放的调用,入口录制的调用。这两个函数是理解代码的重点。
public class ContextManager {
...
public static ArexContext currentContext() {
return currentContext(false, null);
}
/**
* agent will call this method
*/
public static ArexContext currentContext(boolean createIfAbsent, String caseId) {
// replay scene
if (StringUtil.isNotEmpty(caseId)) {
TraceContextManager.set(caseId);
ArexContext context = ArexContext.of(caseId, TraceContextManager.generateId());
// Each replay init generates the latest context(maybe exist previous recorded context)
RECORD_MAP.put(caseId, context);
return context;
}
...
ContextManager 调用 Set
当 ContextManager 中 currentContext() 函数被调用时,如果传入的 caseID 不为空,则表示当前是回放场景,并设置 set(caseID)。
当调用 currentContext() 函数时,如果传入的 caseID 为空,则会调用 TRACE_CONTEXT.get() ,如果获取不到,则调用如下代码,通过 IDGenerator 生成一个新的 transactionID,并存储到 ThreadLocal 中,表示当前场景为录制(record)场景。
messageId = idGenerator.next();
TRACE_CONTEXT.set(messageId);
最后,生成的 transactionID 和对应的 ArexContext 会被存储到 ConcurrentHashMap 中,以 transactionID 为 key,ArexContext 为 value,供后续调用时使用。ContextManager 管理所有的 ArexContext,将它们存储在 Map 中,Key 是 caseID。
public class ContextManager {
public static Map<String, ArexContext> RECORD_MAP = new ConcurrentHashMap<>();
}
...
其中 ArexContext 存储 caseID,replayID 等信息
public class ArexContext {
private final String caseId;
private final String replayId;
private final long createTime;
private final SequenceProvider sequence;
private final List<Integer> methodSignatureHashList = new ArrayList<>();
private final Map<String, Object> cachedReplayResultMap = new ConcurrentHashMap<>();
private Map<String, Set<String>> excludeMockTemplate;
...
}
ContextManager.currentContext() 函数用于在 Agent 注入脚本中查询当前上下文。
当函数被调用时,ContextManager.currentContext(true, id):
public class FilterInstrumentationV3 extends TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(hasSuperType(named("javax.servlet.Filter")));
}
@Override
public List<MethodInstrumentation> methodAdvices() {
ElementMatcher<MethodDescription> matcher = named("doFilter")
.and(takesArgument(0, named("javax.servlet.ServletRequest")))
.and(takesArgument(1, named("javax.servlet.ServletResponse")));
return Collections.singletonList(new MethodInstrumentation(matcher, FilterAdvice.class.getName()));
}
...
}
在 ServletAdviceHelper 中,shouldSkip 方法会校验是否超过频率限制(RecordLimiter.acquire),如果超过限制则不再进行 Trace 处理,而是直接返回。如果请求没有超过限制,则会生成 TraceID,并调用 CaseEventDispatcher 中的 onEvent 函数,传入一个 CreateEvent 对象(CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent),表示创建了一个新的 Trace。
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
if (shouldSkip(adapter, httpServletRequest)) {
return null;
}
在 AREX 的注入代码中,会调用 ContextManager.needReplay()
和 ContextManager.needRecord()
函数。在这两个函数中,会通过调用 currentContext()
函数获取 AREX 上下文对象,并根据上下文对象中的数据,判断当前是回放还是录制模式,如果 context.isReplay(),就是回放, 否则就是录制。
入口请求的录制是在 javax.servlet.Filter
的 doFilter(还有几个其他的类和函数等等)收到请求后
,如果通过录制频率检测,就会开始录制请求。
AREX 的注入用 ByteBuddy 实现,ByteBuddy 功能强大易用,它提供了许多注解,用于在生成或修改字节码时进行注释和配置,如下:
注解 | 值 | 描述 | |
---|---|---|---|
@OnMethodEnter |
表示这个方法会在,进入目标方法时调用,这个注解声明的方法必须是static 。当目标的方法是constructor 构造器时,@This 只能写field ,不能读 field,或者调用方法 |
skipOn() | 跳过一些方法, OnDefaultValue 当 advice 方法的返回值是{false for boolean , 0 for byte、short、char、int、long、float 、double , 或者null for 引用类型 } 那么就跳过目标方法。(简单说就是基本类型的默认值或者 null, 就跳过) OnNonDefaultValue 就是刚好相反 void 默认 代表不跳过任何方法 自定义类型 ,基本类型的坏处是,无法保留额外的信息,仅仅靠判断 0 或者非零。 自定义类型,可以携带额外的信息。 |
prependLineNumber() | 如果为 true,会改目标方法的行号 | ||
inline() | 标识方法应该被内联到目标的方法中去 | ||
suppress() | 忽略某些异常, 处理告警,默认是抑制告警 | ||
@OnMethodExit |
表示这个方法会在,目标方法结束时调用,这个注解声明的方法必须是static 。如果方法提前终止了,那么这个就不被调用 |
repeatOn() | 标识目标方法是否被重复执行 |
onThrowable() | 一般被织入的方法抛出了某些异常,可以由响应的handler 处理 |
||
backupArguments() | 备份所有被执行方法的类型,开始会影响效率, 备份参数,默认为 true,会专门的 copy 原先的参数 | ||
inline() | 标识方法应该被内联到目标的方法中去 | ||
suppress() | 忽略某些异常 | ||
@This |
表示被注解的参数,应该是被修改对象的引用,不能用在静态方法和构造器上 | optional() |
= false 默认值。不能用在构造器 和静态方法 ,否则会报错 Exception in thread "main" java.lang.IllegalStateException: Cannot map this reference for static methodor constructor start 。如果optional() = true 是,遇到构造器 和静态方法 这种没有实例的对象是,This 获取可以为null 。 |
readOnly() | 只读不能修改,效果相当于final ,不能修改传入的对象。 |
||
typing() | 类型转化,Assigner.Typing.STATIC; 不会进行强制转换,如果类型不符合直接报错,Assigner.Typing.DYNAMIC; 会进行强制转换。 |
||
@Argument |
被标注到目标类型的参数上,表示被标注的参数会被 value() 代表的索引拿到 | 注解用来获取传入的参数。 | |
readOnly() | 只读 | ||
typing() | 转换这个类型使用的方式,默认是静态转换 (类型不会被改动),动态转换是 void.class 可以被改成其他类 | ||
optional() | 备选值,如果索引不存在,会使用这个提供的值。默认是关闭的 | ||
@AllArguments |
使用一个数组来包含目标方法的参数,目标的参数必须是一个数组。 | 获取了所有入参了 | |
readOnly() | 只读 | ||
typing() | 类型开关 | ||
@Return |
标注在参数上,来引用目标方法的返回值. 只能标注在@Advice.OnMethodExit 上,用来承接返回值. 理解就像是指针,指向返回值 |
readOnly() | 只读 |
typing() |
类型转化,默认是静态转换 | ||
@Thrown |
获取抛出的异常 | 捕获异常 | |
readOnly() | 只读 | ||
typing() | 默认是动态转化,可以改变类型 | ||
@FieldValue |
被注解的参数,可以引用,目标 method 内的定义的局部变量 就是获取目标类中的变量 | String value() | 局部变量的名称 |
declaringType() | 被声明的类型 | ||
readOnly() | 只读的 | ||
typing() | 默认是静态转化 | ||
@Origin |
使用一个 String 代表目标方法的 利用反射,将目标字符串的签名,转化为方法和类格式,然后去调用。 | String value() default DEFAULT | 默认值是""
|
@Enter |
标注在参数上,指向被标注@OnMethodEnter 的 advice 方法的返回值, |
获取返回值 | |
readOnly() | 只读标记 | ||
typing() | 转换 | ||
@Exit |
标注在参数上,指向被标注@OnMethodExit 的 advice 方法的返回值, |
获取返回值 | |
readOnly() | 只读标记 | ||
typing() | 转换 | ||
@Local |
声明被注解的参数当做一个本地变量,被Byte Buddy ,织入目标方法中。本地变量可以被@OnMethodEnter 和 @link OnMethodExit 读写。然而如果本地变量被 exit advice 引用了,它必须也在 enter 的 advice 所声明。就是用来交换变量的。 创建本地变量 中值,为方法创建一个局部变量。 常见的用途是 在方法内创建一个局部变量,然后可以被@Advice.OnMethodEnter 和@Advice.OnMethodExit 同时获取到。 |
String value() | name |
@StubValue |
mock 值,总是返回一个设定的值 | 必须使用 Object 迎接, 返回默认值比如 null ,0 |
|
@Unused |
让被标记的参数,总是返回默认值,比如 int 返回 0, 其他类型返回 null | ||
public class SyncClientModuleInstrumentation extends ModuleInstrumentation
定义一个名为 SyncClientModuleInstrumentation 的类,继承自 ModuleInstrumentation 类。并添加了 @AutoService(ModuleInstrumentation.class)
注解, 实现 InternalHttpClientInstrumentation 类,该类继承了 TypeInstrumentation 类。
public class InternalHttpClientInstrumentation extends TypeInstrumentation
InternalHttpClientInstrumentation 类实现 typeMatcher 函数,用于匹配待注入的类名,这里使用了 "org.apache.http.impl.client.InternalHttpClient" 作为待注入的类名。
如果需要注入多个类,可以使用一些辅助函数如 nameContains()、nameEndsWith() 和 nameStartsWith() 等进行类名的匹配:
InternalHttpClientInstrumentation 实现函数 methodAdvice,用于获取要注入的函数。
return singletonList(new MethodInstrumentation(
isMethod().and(named("doExecute"))
.and(takesArguments(3))
.and(takesArgument(0, named("org.apache.http.HttpHost")))
.and(takesArgument(1, named("org.apache.http.HttpRequest")))
.and(takesArgument(2, named("org.apache.http.protocol.HttpContext"))),
this.getClass().getName() + "$ExecuteAdvice"));
实现 ExecuteAdvice 类,此处关联之前的 $ExecuteAdvice。
public static class ExecuteAdvice
注入代码,在被注入的函数进入时,从参数中获取 Request 请求,创建本地变量 extractor 和 mockResult。
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class, suppress = Throwable.class)
public static boolean onEnter(
@Advice.Argument(1) HttpRequest request,
@Advice.Local("extractor") HttpClientExtractor<HttpRequest, HttpResponse> extractor,
@Advice.Local("mockResult") MockResult mockResult) { ...
判断请求报文 Request,如果满足 ingore 条件, 则退出。
从上下文中判断, 当前是否处于录制或者回放状态(ContextManager.needRecordOrReplay())
如果是回放状态,则查询回放需要的 MOCK 数据 mockResult = extractor.replay()
如果从数据库中获取到 MOCK 数据是 Throwable 类型,则返回成功及 Object。
如果获取到的 MOCK 数据是 HttpResponseWrapper 类型,则返回成功及处理好的响应报文(即 MOCK 数据)。
注入代码,在被注入函数退出时,获取异常和返回值 Request。
@Advice.OnMethodExit(onThrowable = Exception.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown(readOnly = false) Exception throwable,
@Advice.Return(readOnly = false) CloseableHttpResponse response,
@Advice.Local("extractor") HttpClientExtractor<HttpRequest, HttpResponse> extractor,
@Advice.Local("mockResult") MockResult mockResult) {...
如果函数进入上处理的 MockResult 结果判断,如果不为空,且是 Throwable, 则 throwable 变量赋值否则返回 response
如果 MockResult 为空,则检查是否是在录制状态,在录制状态下开始记录数据到数据库(throwable 或者 response)。
JedisModuleInstrumentation 类是继承自 ModuleInstrumentation 的类。
JedisFactoryInstrumentation 类是继承自 TypeInstrumentation 的类。
匹配的类和函数名方法与之前的文本描述类似,这里不再重复。
makeObject 是一个被注入的函数,在进入函数时,会创建一个名为 JedisWrapper 的类并将其返回给原有类的 jedisSocketFactory 字段。
clientConfig 与上述步骤类似,也是被注入的函数。
makeObject 被注入函数,离开函数时会调用 jedis,并返回 result。
AREX 在进行数据采集时,同一个请求,会采集下来多条数据(Request/Response、其它服务调用的请求响应等),我们需要把这些数据串联起来,这样才能完整的做为一个测试用例。而我们的应用往往采用了异步框架,也大量的用到了多线程等,这给数据的串联带来很大的困难。
FutureTaskInstrumentation 包含 $CallableAdvice 和 $RunnableAdvice 两个静态内部类。
public static class CallableAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(
@Advice.Argument(value = 0, readOnly = false) Callable<?> callable) {
callable = CallableWrapper.get(callable);
} FutureTask }
@SuppressWarnings("unused")
public static class RunnableAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(
@Advice.Argument(value = 0, readOnly = false) Runnable runnable) {
runnable = RunnableWrapper.get(runnable);
}
}...
包含 $ExecutorRunnableAdvice 和 $ExecutorCallableAdvice 两个类,实现同上。
包含 $StartAdvice 类。其中包含一个名为 methodEnter
的静态方法,该方法使用了 Java Agent 中的 Advice
注解。这个方法的作用是在 run
方法执行前拦截它,并进行一些操作。具体来说,这个方法会将 run
方法的参数 runnable
通过 FieldValue
注解获取到,然后检查是否存在 ArexContext
,如果存在,就使用 RunnableWrapper.get()
方法包装这个 runnable
,然后再将包装后的 runnable
赋值回去。
public static class StartAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(
@Advice.FieldValue(value = "target", readOnly = false) Runnable runnable) {
ArexContext context = ContextManager.currentContext();
if (context != null) {
runnable = RunnableWrapper.get(runnable);
}
}
}
...
Java 生态中存在许多异步框架和类库,例如 Reactor 和 RxJava 等,也有许多类库提供了异步实现,例如 lettuce 提供了同步/异步访问 Redis 的方式。由于不同的场景通常需要不同的解决方案,所以需要使用不同的方法来解决异步编程中的跨线程跟踪问题。
以 ApacheAsyncClient 为例,它是通过一个固定的线程来监听响应并发起回调的。因此,在整个调用、监听、回调过程中,需要确保多个跨线程的 Trace 传递。
为了解决这个问题,可以注入如下 org.apache.http.impl.nio.client.InternalHttpAsyncClient 类的 execute 函数,并使用 FutureCallbackWrapper 中的 TraceTransmitter 来传递 Trace。
public class InternalHttpAsyncClientInstrumentation extends TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.http.impl.nio.client.InternalHttpAsyncClient");
}
@Override
public List<MethodInstrumentation> methodAdvices() {
return singletonList(new MethodInstrumentation(
isMethod().and(named("execute"))
.and(takesArguments(4))
.and(takesArgument(0, named("org.apache.http.nio.protocol.HttpAsyncRequestProducer")))
.and(takesArgument(1, named("org.apache.http.nio.protocol.HttpAsyncResponseConsumer")))
.and(takesArgument(2, named("org.apache.http.protocol.HttpContext")))
.and(takesArgument(3, named("org.apache.http.concurrent.FutureCallback"))),
this.getClass().getName() + "$ExecuteAdvice"));
}...
为了系统的稳定性,AREX agent 的框架代码是在一个独立的 Class loader 中加载,和应用代码并不互通,为了保证注入的代码可以正确在运行时被访问,我们也对 ClassLoader 进行了简单的修饰,保证运行时的代码会被正确的 ClassLoader 加载。
类似于 SpringBoot 的 LaunchedURLClassLoader,它是一个类加载器,主要负责加载应用程序的类和资源,并在应用程序启动时根据应用程序的 classpath 和 JAR 文件创建一个 URL 数组,然后使用这个 URL 数组来初始化 ClassLoader。
当应用程序需要加载类或资源时,LaunchedURLClassLoader 会首先在自己的缓存中查找,如果找不到,就会从 URL 数组中的 URL 中加载类或资源。
自定义的 URLClassLoader 和系统自带的 ClassLoader 可能会冲突,这是因为 Java 中的类加载器采用了双亲委派模型。
在这个模型中,每个类加载器都有一个父类加载器,当一个类需要被加载时,它首先会委托它的父类加载器去加载,如果父类加载器无法加载,它才会尝试自己去加载。 但是这种情况下,如果自定义的 URLClassLoader 和系统自带的 ClassLoader 都能够加载同一个类,那么就会出现两个不同的类实例,这就会导致程序出现问题。
为了避免这种冲突,可以在自定义的 URLClassLoader 中重写 findClass() 方法,让它只加载自己的类,而不是委托给父类加载器。这样就可以保证自定义的 URLClassLoader 和系统自带的 ClassLoader 不会冲突。
public class AgentClassLoader extends URLClassLoader
由 package io.arex.inst.runtime 开头的类需要使用自定义的 URLClassLoader 进行加载,而不是系统自带的 ClassLoader。这是因为这些类是作为注入的代码来运行的,而为了保证代码能够正确地被访问,必须使用自定义的 URLClassLoader 来加载它们。因此,这些类必须走用户态 ClassLoader 加载,而不能使用系统自带的 ClassLoader。
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
if (StringUtil.startWithFrom(name, "runtime", 13)) {
return null;
}
JarEntryInfo jarEntryInfo = findJarEntry(name.replace('.', '/') + ".class");
if (jarEntryInfo != null && jarEntryInfo.getJarEntry() != null) {
byte[] bytes;
try {
bytes = getJarEntryBytes(jarEntryInfo);
} catch (IOException exception) {
throw new ClassNotFoundException(name, exception);
}
definePackageIfNeeded(jarEntryInfo, name);
return defineClass(name, bytes);
}
return null;
}
...
AgentInitializer 类在 Premain 方法中调用 initialize(),并访问 AgentClassLoader 以加载自己的 jar 包。如下图:
public class ArexJavaAgent {
public static void premain(String agentArgs, Instrumentation inst) {
agentmain(agentArgs, inst);
}
public static void agentmain(String agentArgs, Instrumentation inst) {
init(inst, agentArgs);
}
private static void init(Instrumentation inst, String agentArgs) {
try {
installBootstrapJar(inst);
// those services must load by app class loader
//ServiceInitializer.start(agentArgs);
AgentInitializer.initialize(inst, getJarFile(ArexJavaAgent.class), agentArgs);
System.out.println("ArexJavaAgent installed.");
} catch (Exception ex) {
System.out.println("ArexJavaAgent start failed.");
ex.printStackTrace();
}
}
...
AREX 文档:http://arextest.com/zh-Hans/docs/intro/
AREX 官网:http://arextest.com/
AREX GitHub:https://github.com/arextest
AREX 官方 QQ 交流群:656108079