普罗米修斯提供了多种语言的 client, 用户可以使用 client 很方便的构建自己的 exporter 服务, 后续只需要修改普罗米修斯的配置文件, 就可以把 exporter 加入到普罗米修斯中来。
首先 需要用 pip install prometheus_client
安装客户端
import time
from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client import start_http_server
class CustomCollector(object):
def __init__(self):
pass
def collect(self):
g = GaugeMetricFamily("MemoryUsage", 'Help text', labels=['instance'])
g.add_metric(["instance01.us.west.local"], 20)
yield g
c = CounterMetricFamily("HttpRequests", 'Help text', labels=['app'])
c.add_metric(["example"], 2000)
yield c
if __name__ == '__main__':
start_http_server(8000)
REGISTRY.register(CustomCollector())
上面是在 python 中开发一个 exporter 最简单的方式。 我们可以使用prometheus_client
内置的GaugeMetricFamily
和 CounterMetricFamily
来构建自己的监控指标。
首先引入依赖
<!-- The client -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
<version>0.6.0</version>
</dependency>
package exporter;
import io.prometheus.client.Counter;
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class CustomExporter {
# 注册一个counter类型的监控指标并带有一个名字叫method的label
static final Counter requests = Counter.build()
.name("my_library_requests_total").help("Total requests.")
.labelNames("method").register();
public static void processGetRequest() {
requests.labels("get").inc();
// 在这编写监控逻辑
}
public static void main(String[] args) throws IOException {
// 启动一个线程池,每隔10s种触发一次调用监控逻辑
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.scheduleWithFixedDelay(() -> {
// 模拟一个监控动作, 调用counter类型的自增方法。 当然也可以调用上面的processGetRequest方法
double value = requests.labels("get").get();
System.out.println(value);
requests.labels("get").inc();
}, 0, 10, TimeUnit.SECONDS);
// 利用普罗米修斯提供的httpserver 启动服务
HTTPServer server = new HTTPServer(1234);
}
}
我们实际用 go client 来开发一个监控在 k8s 集群中监控每一个容器的 socket 状态的 exporter。 首先我们需要通过 go mod 文件拉引入依赖。PS:代码的逻辑解释在注释中。
module prophet-container-exporter
go 1.13
require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/sirupsen/logrus v1.4.2
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
)
注意: 除了普罗米修斯的 client 之外, 还需要引入 k8s 的 client-go 用来实际的去监控容器的状态。
func init() {
log.SetOutput(os.Stdout)
log.Info("init the kubeconfig")
// 初始化k8s的client有两种方式。 如果当前是在pod中运行的话, client自己会找到容器对应目录下的service account信息与k8s的apiserver通信鉴权。 如果程序在集群外部运行, 那么需要给client提供一个kubeconfig文件
if isExist, err := PathExists("kubeconfig"); err != nil {
panic(err)
} else {
if isExist {
log.Info("now out of k8s cluster")
kubeConfig, err = clientcmd.BuildConfigFromFlags("", "kubeconfig")
} else {
log.Info("now In k8s cluster")
kubeConfig, err = rest.InClusterConfig()
}
if err != nil {
log.Error("cannot init the kubeconfig")
panic(err.Error())
}
}
var err error
k8s, err = kubernetes.NewForConfig(kubeConfig)
if err != nil {
log.Error("cannot init the k8s client")
panic(err.Error())
}
log.Info("init the k8sclient done, now begin to monitor the k8s")
// 在开始监控之前首先遍历当前k8s集群中所有的pod,为每个容器建立一个guage类型的监控指标并记录在一个map中,方便后面程序根据这些容器执行具体的监控逻辑。
register := func() {
namespaceList, err := k8s.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
log.Error(err)
os.Exit(1)
}
for _, n := range namespaceList.Items {
namespace := n.Name
podList, err := k8s.CoreV1().Pods(namespace).List(metav1.ListOptions{})
if err != nil {
panic(errors.Wrapf(err, "cannot list k8s with namespace %s", namespace))
}
// 遍历所有pod
for _, pod := range podList.Items {
# 只监控Running状态的Pod
if pod.Status.Phase != "Running" {
continue
}
// 遍历pod下的容器, 每个容器的指标有3个label。1. 名称空间 2. pod名称 3. 容器名称
for _, container := range pod.Status.ContainerStatuses {
sdGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "namespace_container_Socket_Close_Wait",
Help: "num of socket with CLOSE-WAIT status in container",
ConstLabels: map[string]string{
"namespace": namespace,
"pod": pod.Name,
"container": container.Name,
},
})
if _, ok := sdMetrics[pod.Name+","+namespace]; !ok {
prometheus.MustRegister(sdGauge)
sdMetrics[pod.Name+","+namespace] = sdGauge
}
}
}
}
}
// 上来就注册一次
register()
// 周期性注册namespace下所有pod中app容器的指标, 因为随时会有新的pod启动
go func() {
for {
time.Sleep(time.Minute * 10)
register()
}
}()
}
func main() {
// 周期性的获取最新的监控数据。 遍历所有容器, 向容器发送`cat /proc/net/tcp | awk '{print $4}'|grep 08|wc -l` 命令,该命令可以查询容器中socket的状态,计算一共有多少个处于CLOSE-WAIT的socket。而我们的监控目的就是通过这个指标判断是否存在socket泄露.
go func() {
for {
for podInfo, guage := range sdMetrics {
tmp := strings.Split(podInfo, ",")
podName := tmp[0]
namespace := tmp[1]
log.WithFields(log.Fields{
"namespace": namespace,
"pod": podName,
})
pod, _ := k8s.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
for _, container := range pod.Status.ContainerStatuses {
commands := []string{"sh", "-c", "cat /proc/net/tcp | awk '{print $4}'|grep 08|wc -l"}
output, err := promethues.Exec(k8s, kubeConfig, commands, namespace, podName, container.Name)
if err != nil {
log.Error(err.Error())
continue
}
closeWait, err := strconv.ParseFloat(strings.Replace(output, "\n", "", -1), 32)
if err != nil {
fmt.Fprintf(os.Stdout, "err %s\n", errors.Wrap(err, "cannot trans string to float"))
continue
}
guage.Set(closeWait)
log.Infof("successfully collect %s's socket status", podName)
}
}
time.Sleep(time.Minute * 10)
}
}()
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe("0.0.0.0:80", nil))
}