专栏文章 Fabric8 Kubernetes 日志工具实践

FunTester · 2025年04月22日 · 1249 次阅读

最近在使用 Fabric8 Kubernetes Client 的过程中发现了新大陆一样,感觉利用这个库可以进行很多有趣的功能尝试,其中一个便是日志的本地化。

原因无他,rancher 页面性能实在太差了,经常性的暂停工作,碰到故障排查的时候,着实让人恼火。当我看到 Fabric8 Kubernetes Client 的日志相关 API 的时候我就立刻冒出来写一个日志小工具的想法。

API 简介

首先我们简单介绍一下 API,以方便快速进入场景。后续等我自觉学得差不多了,再来列个专题给大家分享 Fabric8 Kubernetes Client 的全部 API 实践经验。

在本次分享当中,主要用到了两种日志 API:getLog()watchLog()

以下是 Fabric8 Kubernetes Client 日志功能的结构化总结:

功能点与 API 对照表

功能点 API 方法 适用场景
获取 Pod 日志 getLog() 一次性获取短时任务或静态日志
获取特定容器日志 inContainer("name").getLog() 多容器 Pod 中指定容器的日志
实时日志流 watchLog(outputStream) 持续监控运行中的服务日志
按行获取最新日志 tailingLines(n).getLog() 仅需关注最近 N 行日志的场景
时间范围筛选 sinceSeconds(n).getLog() 获取最近 N 秒内的日志
带时间戳日志 withTimestamps().getLog() 需要精确时间信息的调试场景
获取历史日志(Terminated 容器) previousLog() 排查已终止或重启容器的日志
批量获取 Pod 日志 withLabel("key=value").list() 根据标签筛选多个副本的日志
日志持久化到文件 watchLog(new FileOutputStream(...)) 长期存档或离线分析日志

首选日志流

对于日志需求来讲,流式调用自然是最好不过了,可以及时获取最新的日志信息,还不用后期干预。这里我选择了 watchLog()(无参调用),watchLog() 返回一个 LogWatch 实例,该实例包含 getOutput() 方法,可获取日志流。适用于 手动解析日志流,比 watchLog(System.out) 更灵活。

import com.auto.fault.framework.funtester.frame.SourceCode
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.LogWatch

@Log4j2
class TesClient extends SourceCode {

    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient();
             def pods = client.pods().inNamespace("test").list()
             def marketPod = pods.getItems().find {
                 it.getMetadata().getName().contains("FunTester-pod")
             }.getMetadata().getName();
             LogWatch logWatch = client.pods()
                     .inNamespace("FunTester-default")
                     .withName("FunTester-mypod")
                     .watchLog()) {
            logWatch.getOutput().eachLine {
                System.out.println("Pod Log: " + it);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        waitForKey("Press any key to exit")
    }
}


但是在实际的使用当中,经过几分钟,最长不超过十几分钟之后,流里面居然不再输出日志信创了,感觉很奇怪。经过查询资料和多方面验证,依旧没有解决问题,rancher 自带的 WebSocket 推送日志也遇到这个问题。最终还是把问题甩给了服务端。

目前这种方式只适用于调试过程中查看日志,使用的时候本地启动一个脚本用来实时展示日志的情况。

下面是我的封装方法,仅供参考:

/
 * 处理日志流, 通过 WatchLog 方式, 适用于实时日志, 适用于日志量较小的场景,可能会被中断
 * @param client
 * @param namespace
 * @param podName
 * @param consumer
 * @return
 */
static def handlePodLogFlow(String namespace, String podName, Consumer<String> consumer) {
    try (def logWatch = K8sService.client.pods()
            .inNamespace(namespace)
            .withName(podName)
            .tailingLines(200)
            .watchLog()) {
        logWatch.getOutput().eachLine {
            consumer(it)
        }
    } catch (e) {
        log.error("handle log error: {}", e)
    }

}

循环拉取

剩下的另外一个方式就是定时任务实现循环拉取日志了,用到了 sinceSeconds() 这个 API,逻辑也比较简单就是每隔一段时间拉取最近一段时间的日志。

但是在使用当中遇到一个问题,由于执行耗时以及网络原因,如果我每 10s 拉取最近 10s 的日志总会丢日志,如果拉取最近 11s 的日志,又会有一些重复的日志。

为了解决这个问题,我特意咨询了 AI,给了下面三种思路。

  1. 基于日志 ID 去重:如果日志自带唯一标识(如 requestId、traceId),可以使用 集合(Set)记录已处理的日志 ID,在处理新日志时先检查 ID 是否已存在,若已存在则跳过。这种方式精准高效,但 Set 可能会无限增长,需要定期清理,适用于日志 ID 唯一且可以长期存储 ID 记录的场景。
  2. 基于时间窗口去重:如果日志没有唯一 ID,但有时间戳,可以记录 上一次处理的最大时间戳,只处理时间戳 大于 这个值的日志,处理完成后更新最大时间戳。这种方式内存占用小,适用于 时间戳严格递增 的情况,但如果日志乱序,可能会丢失部分数据。
  3. 基于 LRU(最近最少使用)缓存去重:如果日志 ID 变化范围大,但不能无限存储已处理 ID,可以使用 固定大小的缓存(如 LRU 哈希表)存储最近 N 条已处理的日志 ID,超出部分自动删除。这种方式适用于 高并发、大量日志流入 的情况,能有效控制内存占用,但需要合理设置缓存大小,以平衡去重效果与资源消耗。

最终我选了基于时间窗户,日志返回的时间是毫秒时间戳,这样根据时间戳进行筛选,可以避免重复和丢日志的情况。

下面是我的封装代码:

/**
 * 处理日志流, 通过 getlogs 方式, 定时获取任务,避免中断
 * @param namespace
 * @param podName
 * @param consumer
 * @return
 */
static def handleLogs(String namespace, String podName, Consumer<String> consumer) {
    long lastTime
    ThreadPoolUtil.scheduleRate({
        time {
            try (def reader = K8sService.client.pods()
                    .inNamespace(namespace)
                    .withName(podName)
                    .sinceSeconds(11)
                    .getLogReader()) {
                def lines = reader.readLines()
                boolean start = false
                lines.each {
                    if (!start && getTimestamp(it) > lastTime) {
                        start = true
                    }
                    if (start) {
                        consumer(it)
                    }
                }
                lastTime = getTimestamp(lines.get(lines.size() - 1));
            } catch (Exception e) {
                log.error("handle log error: {}", e)
            }
        }
    }, 10)

}

使用方法如下:

import com.auto.fault.framework.funtester.frame.SourceCode
import com.auto.fault.framework.utils.k8s.K8sLog
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient

import java.util.concurrent.ScheduledFuture

@Log4j2
class TesClient extends SourceCode {

    public static void main(String[] args) {
        KubernetesClient client = new DefaultKubernetesClient()
        ScheduledFuture<?> logs = K8sLog.handleLogs("funtester", "funtester-0", {
            println it
        })
        waitForKey("Press any key to exit")
        logs.cancel(true)
        client.close()
    }
}

原来我也想通过一个去重的队列实现,发现 Java 本身并没有提供这个能力,如果单独写一个比较麻烦,得不偿失,最终也放弃了。

基于 LinkedHashMap

这里分享一下 LinkedHashMap 方案,因为 removeEldestEntry() 方法让我学到了新知识,本来我打算用 Caffeine 实现的,没想到 Java 还提供了替代方案。

基于插入顺序

import java.util.*;

public class LogProcessor {
    private static final int MAX_ENTRIES = 10000; // 只存最近的日志 ID
    private static final LinkedHashMap<String, Long> processedLogs =
            new LinkedHashMap<>(MAX_ENTRIES, 0.75f) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                    return size() > MAX_ENTRIES;
                }
            };

    public void processLogs(List<Log> logs) {
        long now = System.currentTimeMillis();

        for (Log log : logs) {
            if (processedLogs.containsKey(log.getId())) {
                continue;
            }
            processedLogs.put(log.getId(), now);
            processLog(log);
        }
    }

    private void processLog(Log log) {
        System.out.println("FunTester Processing log: " + log);
    }
}

removeEldestEntryLinkedHashMap 提供的一个受保护(protected)方法,用于控制缓存的大小。当 LinkedHashMap 作为 LRU 缓存(最近最少使用缓存) 使用时,可以重写该方法,在元素数量超过限制时自动移除最早插入的元素。

  • eldest 参数表示当前 LinkedHashMap 中 最老(最早插入)的键值对。
  • 返回值 true 时,eldest 会被移除;返回 false,则不会移除。

基于最近访问顺序

要根据访问时间删除 LinkedHashMap 中的旧数据,可以利用 LinkedHashMap 的 accessOrder=true 特性,让最近访问的数据排在后面,并在 removeEldestEntry 方法中检查数据的时间戳是否超时,超时则删除。适用于 基于时间的自动清理缓存,如日志、会话管理等。

实现步骤

  1. 启用 LRU 访问顺序:创建 LinkedHashMap 时,设置 accessOrder=true,使最近访问的数据自动移动到队尾。
  2. 存储时间戳:在 LinkedHashMap 中存储键值对,值包含数据的时间戳。
  3. 按访问时间删除:在 removeEldestEntry 方法中,检查最老的元素是否超过设定的超时时间(如 10 分钟),如果超时,则返回 true 进行删除。

实现方法:

import java.util.*;

public class AccessTimeCache<K, V> extends LinkedHashMap<K, V> {
    private final long EXPIRATION_TIME_MS; // 过期时间,单位毫秒

    public AccessTimeCache(int capacity, long expirationTimeMs) {
        super(capacity, 0.75f, true); // accessOrder=true,启用 LRU
        this.EXPIRATION_TIME_MS = expirationTimeMs;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        long currentTime = System.currentTimeMillis();
        // 假设 V 是一个包含时间戳的对象,这里需替换成实际数据结构
        if (eldest.getValue() instanceof CacheItem) {
            CacheItem item = (CacheItem) eldest.getValue();
            return (currentTime - item.timestamp) > EXPIRATION_TIME_MS;
        }
        return false;
    }

    // 模拟存储数据时的结构
    static class CacheItem {
        String data;
        long timestamp;

        public CacheItem(String data) {
            this.data = data;
            this.timestamp = System.currentTimeMillis();
        }
    }
}

当然我们还可以根据时间 + 条目总数来控制,这里就不再赘述了。

FunTester 原创精华
【连载】从 Java 开始性能测试
故障测试与 Web 前端
服务端功能测试
性能测试专题
Java、Groovy、Go
白盒、工具、爬虫、UI 自动化
理论、感悟、视频
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册