最近在使用 Fabric8 Kubernetes Client
的过程中发现了新大陆一样,感觉利用这个库可以进行很多有趣的功能尝试,其中一个便是日志的本地化。
原因无他,rancher 页面性能实在太差了,经常性的暂停工作,碰到故障排查的时候,着实让人恼火。当我看到 Fabric8 Kubernetes Client
的日志相关 API 的时候我就立刻冒出来写一个日志小工具的想法。
首先我们简单介绍一下 API,以方便快速进入场景。后续等我自觉学得差不多了,再来列个专题给大家分享 Fabric8 Kubernetes Client
的全部 API 实践经验。
在本次分享当中,主要用到了两种日志 API:getLog()
、 watchLog()
。
以下是 Fabric8 Kubernetes Client 日志功能的结构化总结:
功能点 | API 方法 | 适用场景 |
---|---|---|
获取 Pod 日志 | getLog() |
一次性获取短时任务或静态日志 |
获取特定容器日志 | inContainer("name").getLog() |
多容器 Pod 中指定容器的日志 |
实时日志流 | watchLog(outputStream) |
持续监控运行中的服务日志 |
按行获取最新日志 | tailingLines(n).getLog() |
仅需关注最近 N 行日志的场景 |
时间范围筛选 | sinceSeconds(n).getLog() |
获取最近 N 秒内的日志 |
带时间戳日志 | withTimestamps().getLog() |
需要精确时间信息的调试场景 |
获取历史日志(Terminated 容器) | previousLog() |
排查已终止或重启容器的日志 |
批量获取 Pod 日志 | withLabel("key=value").list() |
根据标签筛选多个副本的日志 |
日志持久化到文件 | watchLog(new FileOutputStream(...)) |
长期存档或离线分析日志 |
对于日志需求来讲,流式调用自然是最好不过了,可以及时获取最新的日志信息,还不用后期干预。这里我选择了 watchLog()(无参调用),watchLog()
返回一个 LogWatch 实例,该实例包含 getOutput()
方法,可获取日志流。适用于 手动解析日志流,比 watchLog(System.out)
更灵活。
import com.auto.fault.framework.funtester.frame.SourceCode
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.LogWatch
@Log4j2
class TesClient extends SourceCode {
public static void main(String[] args) {
try (KubernetesClient client = new DefaultKubernetesClient();
def pods = client.pods().inNamespace("test").list()
def marketPod = pods.getItems().find {
it.getMetadata().getName().contains("FunTester-pod")
}.getMetadata().getName();
LogWatch logWatch = client.pods()
.inNamespace("FunTester-default")
.withName("FunTester-mypod")
.watchLog()) {
logWatch.getOutput().eachLine {
System.out.println("Pod Log: " + it);
}
} catch (Exception e) {
e.printStackTrace();
}
waitForKey("Press any key to exit")
}
}
但是在实际的使用当中,经过几分钟,最长不超过十几分钟之后,流里面居然不再输出日志信创了,感觉很奇怪。经过查询资料和多方面验证,依旧没有解决问题,rancher 自带的 WebSocket 推送日志也遇到这个问题。最终还是把问题甩给了服务端。
目前这种方式只适用于调试过程中查看日志,使用的时候本地启动一个脚本用来实时展示日志的情况。
下面是我的封装方法,仅供参考:
/
* 处理日志流, 通过 WatchLog 方式, 适用于实时日志, 适用于日志量较小的场景,可能会被中断
* @param client
* @param namespace
* @param podName
* @param consumer
* @return
*/
static def handlePodLogFlow(String namespace, String podName, Consumer<String> consumer) {
try (def logWatch = K8sService.client.pods()
.inNamespace(namespace)
.withName(podName)
.tailingLines(200)
.watchLog()) {
logWatch.getOutput().eachLine {
consumer(it)
}
} catch (e) {
log.error("handle log error: {}", e)
}
}
剩下的另外一个方式就是定时任务实现循环拉取日志了,用到了 sinceSeconds()
这个 API,逻辑也比较简单就是每隔一段时间拉取最近一段时间的日志。
但是在使用当中遇到一个问题,由于执行耗时以及网络原因,如果我每 10s 拉取最近 10s 的日志总会丢日志,如果拉取最近 11s 的日志,又会有一些重复的日志。
为了解决这个问题,我特意咨询了 AI,给了下面三种思路。
最终我选了基于时间窗户,日志返回的时间是毫秒时间戳,这样根据时间戳进行筛选,可以避免重复和丢日志的情况。
下面是我的封装代码:
/**
* 处理日志流, 通过 getlogs 方式, 定时获取任务,避免中断
* @param namespace
* @param podName
* @param consumer
* @return
*/
static def handleLogs(String namespace, String podName, Consumer<String> consumer) {
long lastTime
ThreadPoolUtil.scheduleRate({
time {
try (def reader = K8sService.client.pods()
.inNamespace(namespace)
.withName(podName)
.sinceSeconds(11)
.getLogReader()) {
def lines = reader.readLines()
boolean start = false
lines.each {
if (!start && getTimestamp(it) > lastTime) {
start = true
}
if (start) {
consumer(it)
}
}
lastTime = getTimestamp(lines.get(lines.size() - 1));
} catch (Exception e) {
log.error("handle log error: {}", e)
}
}
}, 10)
}
使用方法如下:
import com.auto.fault.framework.funtester.frame.SourceCode
import com.auto.fault.framework.utils.k8s.K8sLog
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import java.util.concurrent.ScheduledFuture
@Log4j2
class TesClient extends SourceCode {
public static void main(String[] args) {
KubernetesClient client = new DefaultKubernetesClient()
ScheduledFuture<?> logs = K8sLog.handleLogs("funtester", "funtester-0", {
println it
})
waitForKey("Press any key to exit")
logs.cancel(true)
client.close()
}
}
原来我也想通过一个去重的队列实现,发现 Java 本身并没有提供这个能力,如果单独写一个比较麻烦,得不偿失,最终也放弃了。
这里分享一下 LinkedHashMap 方案,因为 removeEldestEntry()
方法让我学到了新知识,本来我打算用 Caffeine
实现的,没想到 Java 还提供了替代方案。
import java.util.*;
public class LogProcessor {
private static final int MAX_ENTRIES = 10000; // 只存最近的日志 ID
private static final LinkedHashMap<String, Long> processedLogs =
new LinkedHashMap<>(MAX_ENTRIES, 0.75f) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
return size() > MAX_ENTRIES;
}
};
public void processLogs(List<Log> logs) {
long now = System.currentTimeMillis();
for (Log log : logs) {
if (processedLogs.containsKey(log.getId())) {
continue;
}
processedLogs.put(log.getId(), now);
processLog(log);
}
}
private void processLog(Log log) {
System.out.println("FunTester Processing log: " + log);
}
}
removeEldestEntry
是 LinkedHashMap 提供的一个受保护(protected)方法,用于控制缓存的大小。当 LinkedHashMap 作为 LRU 缓存(最近最少使用缓存) 使用时,可以重写该方法,在元素数量超过限制时自动移除最早插入的元素。
要根据访问时间删除 LinkedHashMap 中的旧数据,可以利用 LinkedHashMap 的 accessOrder=true 特性,让最近访问的数据排在后面,并在 removeEldestEntry 方法中检查数据的时间戳是否超时,超时则删除。适用于 基于时间的自动清理缓存,如日志、会话管理等。
实现步骤
实现方法:
import java.util.*;
public class AccessTimeCache<K, V> extends LinkedHashMap<K, V> {
private final long EXPIRATION_TIME_MS; // 过期时间,单位毫秒
public AccessTimeCache(int capacity, long expirationTimeMs) {
super(capacity, 0.75f, true); // accessOrder=true,启用 LRU
this.EXPIRATION_TIME_MS = expirationTimeMs;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
long currentTime = System.currentTimeMillis();
// 假设 V 是一个包含时间戳的对象,这里需替换成实际数据结构
if (eldest.getValue() instanceof CacheItem) {
CacheItem item = (CacheItem) eldest.getValue();
return (currentTime - item.timestamp) > EXPIRATION_TIME_MS;
}
return false;
}
// 模拟存储数据时的结构
static class CacheItem {
String data;
long timestamp;
public CacheItem(String data) {
this.data = data;
this.timestamp = System.currentTimeMillis();
}
}
}
当然我们还可以根据时间 + 条目总数来控制,这里就不再赘述了。
FunTester 原创精华
【连载】从 Java 开始性能测试
故障测试与 Web 前端
服务端功能测试
性能测试专题
Java、Groovy、Go
白盒、工具、爬虫、UI 自动化
理论、感悟、视频