FunTester 基于时间戳的日志回放引擎
FunTester
·
2022年08月22日
·
最后由 Thirty-Thirty 回复于 2022年09月03日
·
8168 次阅读
「原创声明:保留所有权利,禁止转载」
之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay
的过程中,重新刷新了我的认知。
查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。
解决思路
目前流量回放集中于 HTTP 流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:
- 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含 HTTP 请求的组成部分。
- 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作
- 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。
其中最最核心的应该就是队列的选择,这里我用看 java 的java.util.concurrent.DelayQueue
,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:
-
Java&Go 高性能队列之 LinkedBlockingQueue 性能测试
2022-01-10
-
Java&Go 高性能队列之 Disruptor 性能测试
2022-02-14
-
Java&Go 高性能队列之 channel 性能测试
2022-02-17
本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。
然后我重新对java.util.concurrent.DelayQueue
进行了性能测试延迟队列 DelayQueue 性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟 10s 撤单性能测试。
实现
总体来说实现起来思路比较清晰,我分成三部分分享。
属性定义
- 我首先定义了一个
com.funtester.frame.execute.ReplayConcurrent.ReplayLog
日志对象,用于存储每一个请求日志 - 然后定义一个
com.funtester.frame.execute.ReplayConcurrent#logs
用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是 OK 的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。 - 再定义
com.funtester.frame.execute.ReplayConcurrent#logDelayQueue
用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs
中取,clone
之后丢到队列中;消费者从队列中取对象,丢给线程池。 - 定义
com.funtester.frame.execute.ReplayConcurrent#handle
当作是处理流量的方法,就是把流量对象包装成HttpRequestBase
对象然后发送出去
生产者
- 确定使用异步线程完成,使用Java 自定义异步功能实践。
- 根据
com.funtester.frame.execute.ReplayConcurrent#logDelayQueue
性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum
参数来控制。 - 多线程取
com.funtester.frame.execute.ReplayConcurrent#logs
对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。 - 使用了
com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH
控制队列的长度。貌似没找到限制延迟队列长度的 API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠 1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免 1s 中内队列变成空。回放引擎设计 50 万 QPS,所以我就先设置了 80 万的最大长度。后续可以根据实际情况调整。
消费者
- 依旧使用异步,生产者
- 使用 API 时
java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit)
,避免阻塞导致线程无法终止。 - 引入
com.funtester.frame.execute.ReplayConcurrent#getMultiple
控制流量回放的倍数。 - 使用
com.funtester.frame.execute.ReplayConcurrent#getTotal
记录回放的日志数量。 - 使用
com.funtester.frame.execute.ReplayConcurrent#getHandle
处理日志对象。
代码如下:
package com.funtester.frame.execute
import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder
/**
* 回放功能执行类*/
class ReplayConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(ReplayConcurrent.class);
static ThreadPoolExecutor executor
static boolean key = true
static int MAX_LENGTH = 800000
int threadNum = 2
String name
String fileName
int multiple
Closure handle
List<ReplayLog> logs
DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()
LongAdder total = new LongAdder()
ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
this.name = name
this.fileName = fileName
this.multiple = multiple
this.handle = handle
}
void start() {
if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
time({
RWUtil.readFile(fileName, {
def delay = new ReplayLog(it)
if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
})
}, 1, "读取日志$fileName")
logs = logDelayQueue.toList()
def timestamp = logs.get(0).getTimestamp()
logDelayQueue.clear()
AtomicInteger index = new AtomicInteger()
AtomicInteger size = new AtomicInteger()
def LogSize = logs.size()
AtomicInteger diff = new AtomicInteger()
threadNum.times {
fun {
while (key) {
if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
if (size.get() > MAX_LENGTH) {
sleep(1.0)
size.set(logDelayQueue.size())
}
def replay = logs.get(index.getAndIncrement() % LogSize)
logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
}
}
}
threadNum.times {
fun {
while (key) {
def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
if (poll != null) {
executor.execute {
multiple.times {
handle(poll.getUrl())
total.add(1)
}
}
}
}
}
}
fun {
while (key) {
sleep(COUNT_INTERVAL as double)
int real = total.sumThenReset() / COUNT_INTERVAL as int
def active = executor.getActiveCount()
def count = active == 0 ? 1 : active
logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int)
}
}
}
/**
* 中止
* @return
*/
def stop() {
key = false
executor.shutdown()
logger.info("replay压测关闭了!")
}
/**
* 日志对象*/
static class ReplayLog extends AbstractBean implements Delayed {
int timestamp
String url
ReplayLog(String logLine) {
def log = LogUtil.getLog(logLine)
this.url = log.getUrl()
this.timestamp = log.getTime()
}
ReplayLog(int timestamp, String url) {
this.timestamp = timestamp
this.url = url
}
@Override
long getDelay(TimeUnit unit) {
return this.timestamp - getMark()
}
@Override
int compareTo(Delayed o) {
return this.timestamp - o.timestamp
}
protected Object clone(int timestamp) {
return new ReplayLog(timestamp, this.url)
}
}
}
自测
下面是我的测试用例:
package com.okcoin.hickwall.presses
import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrent
import com.okcoin.hickwall.presses.funtester.httpclient.FunHttp
class RplayT extends FunHttp {
static String HOST = "http://localhost:12345"
public static void main(String[] args) {
def fileName = "api.log"
new ReplayConcurrent("测试回放功能", fileName, 1, {String url ->
getHttpResponse(getHttpGet(HOST + url))
}).start()
}
}
测试结果如下:
22:45:43.510 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:23162
10:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:6095
10:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855
10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:4099
10:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:8806
10:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:28426
10:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:5601
10:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392
TesterHome 为用户提供「保留所有权利,禁止转载」的选项。
除非获得原作者的单独授权,任何第三方不得转载标注了「原创声明:保留所有权利,禁止转载」的内容,否则均视为侵权。
具体请参见TesterHome 知识产权保护协议。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!