因为去年到现在一直在接触可靠性测试和混沌工程的研发,不可避免地接触 chaosblade 和 chaos-mesh。我们的平台基础能力是使用 chaosblade,所以在去年尝试给 chaosblade 提了第一个 pr,但是一直没有收到回复(发邮件也不回复),感觉 chaosblade 的社区不够活跃,所以我就想换一个项目做贡献,所以就想到了 chaos-mesh,所以和 chaos-mesh 的社区联系,然后顺利接到了任务,所以就开始了我的看源码之路。
来源官网:Chaos Mesh 是一个开源的云原生混沌工程平台,提供丰富的故障模拟类型,具有强大的故障场景编排能力,方便用户在开发测试中以及生产环境中模拟现实世界中可能出现的各类异常,帮助用户发现系统潜在的问题。Chaos Mesh 提供完善的可视化操作,旨在降低用户进行混沌工程的门槛。用户可以方便地在 Web UI 界面上设计自己的混沌场景,以及监控混沌实验的运行状态。
Chaos Mesh 基于 Kubernetes CRD (Custom Resource Definition) 构建,根据不同的故障类型定义多个 CRD 类型,并为不同的 CRD 对象实现单独的 Controller 以管理不同的混沌实验。Chaos Mesh 主要包含以下三个组件:
Chaos Dashboard:Chaos Mesh 的可视化组件,提供了一套用户友好的 Web 界面,用户可通过该界面对混沌实验进行操作和观测。同时,Chaos Dashboard 还提供了 RBAC 权限管理机制。
Chaos Controller Manager:Chaos Mesh 的核心逻辑组件,主要负责混沌实验的调度与管理。该组件包含多个 CRD Controller,例如 Workflow Controller、Scheduler Controller 以及各类故障类型的 Controller。
Chaos Daemon:Chaos Mesh 的主要执行组件。Chaos Daemon 以 DaemonSet 的方式运行,默认拥有 Privileged 权限(可以关闭)。该组件主要通过侵入目标 Pod Namespace 的方式干扰具体的网络设备、文件系统、内核等。
根据上图,其实我们可以比较清晰地看到,整体的数据流转。
第一步:用户在配置好 dnschaos 之后,会通过/experiments 接口发送到 chaos-dashboard 服务。
第二步:chaos-dashboard 服务其实做的事情很简单,就是根据前端传过来的数据,创建一个 crd 资源。
第三步:chaos-mesh 的底层会有个 k8s crd 资源变化的监听器,会接收到 crd 资源,然后调用 chaosimlp 的实现方法 Apply,处理具体的故障逻辑。
第四步:在 DNSChaos 会调用 SetDNSChaos 方法。(而 SetDNSChaos 是在 k8s-dns-chaos 服务里面实现的,k8s-dns-chaos 主要是 coreDNS 的扩展,用来处理 k8s 内部的 dns 请求)。SetDNSChaos 的主要作用是处理 dns 故障和 pod 绑定的关系,然后在接收 dns 请求的时候,给出一个污染后的 dns 记录。
第五步:在第四部将相关 pod 信息,记录到 coredns 插件之后,会将相关 pod 的/etc/resolv.conf 更新成新的 dnsServer,也就是第四部中提到的 k8s-dns-chaos 服务。
创建故障的核心就是创建这个 crd 资源,
//创建crd资源
reflect.ValueOf(chaos).Elem().FieldByName("ObjectMeta").Set(reflect.ValueOf(metav1.ObjectMeta{}))
if err = u.ShouldBindBodyWithJSON(c, chaos); err !=nil {
return
}
//执行创建
if err = kubeCli.Create(context.Background(), chaos); err != nil {
u.SetAPImachineryError(c, err)
return
}
这里面是监听到 crd 资源后的逻辑,可以明显看到 Impl.Apply 是调用了其他实现类的 apply 方法,下文中会提到
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
....
if operation == Apply {
idLogger.Info("apply chaos")
record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
if record.Phase != originalPhase {
shouldUpdate = true
}
if err != nil {
// TODO: add backoff and retry mechanism
// but the retry shouldn't block other resource process
idLogger.Error(err, "fail to apply chaos")
applyFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Apply, err.Error())
if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
records[index].Events = records[index].Events[1:]
}
records[index].Events = append(records[index].Events, *applyFailedEvent)
r.Recorder.Event(obj, recorder.Failed{
Activity: "apply chaos",
Err: err.Error(),
})
needRetry = true
// if the impl.Apply() failed, we need to update the status to update the records[index].Events
shouldUpdate = true
continue
}
}
这里的核心逻辑:
func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
dnschaos := obj.(*v1alpha1.DNSChaos)
for _, pod := range dnsPods {
// 1.处理dns规则:请求k8s-dns-chaos服务
err = impl.setDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name, decodedContainer.Pod, dnschaos.Spec.Action, dnschaos.Spec.DomainNamePatterns)
if err != nil {
impl.Log.Error(err, "fail to set DNS server rules")
return v1alpha1.NotInjected, err
}
impl.Log.Info("Apply DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
}
// 修改相关pod的dns服务的/etc/resolv.conf
_, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
ContainerId: decodedContainer.ContainerId,
DnsServer: service.Spec.ClusterIP,
Enable: true,
EnterNS: true,
})
if err != nil {
impl.Log.Error(err, "set dns server")
return v1alpha1.NotInjected, err
}
return v1alpha1.Injected, nil
}
func (k Kubernetes) SetDNSChaos(ctx context.Context, req *pb.SetDNSChaosRequest) (*pb.DNSChaosResponse, error) {
k.chaosMap[req.Name] = req
var scope string
if len(req.Patterns) == 0 {
scope = ScopeAll
}
...省略
for _, pod := range req.Pods {
....
//这里是将pod信息保存下来
podInfo := &PodInfo{
Namespace: pod.Namespace,
Name: pod.Name,
Action: req.Action,
Scope: scope,
Selector: selector,
IP: v1Pod.Status.PodIP,
LastUpdateTime: time.Now(),
}
k.podMap[pod.Namespace][pod.Name] = podInfo
k.ipPodMap[v1Pod.Status.PodIP] = podInfo
}
return &pb.DNSChaosResponse{
Result: true,
}, nil
}
//这里是将dns请求通过service请求,将这里的数据返回回去
func (k Kubernetes) chaosDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg, state request.Request, podInfo *PodInfo) (int, error) {
//获取action,看是哪种故障,然后如果是error,直接返回error
if podInfo.Action == ActionError {
return dns.RcodeServerFailure, fmt.Errorf("dns chaos error")
}
// return static IP 这将是我写的代码
if podInfo.Action == ActionStatic {
//return staticIP(ctx, w, r, state, podInfo)
//k.chaosMap
}
// 随机的ip
answers := []dns.RR{}
qname := state.Name()
// TODO: support more type
switch state.QType() {
case dns.TypeA:
ips := []net.IP{getRandomIPv4()}
log.Debugf("dns.TypeA %v", ips)
answers = a(qname, 10, ips)
case dns.TypeAAAA:
// TODO: return random IP
ips := []net.IP{net.IP{0x20, 0x1, 0xd, 0xb8, 0, 0, 0, 0, 0, 0, 0x1, 0x23, 0, 0x12, 0, 0x1}}
log.Debugf("dns.TypeAAAA %v", ips)
answers = aaaa(qname, 10, ips)
}
if len(answers) == 0 {
return dns.RcodeServerFailure, nil
}
log.Infof("answers %v", answers)
m := new(dns.Msg)
m.SetReply(r)
m.Authoritative = true
m.Answer = answers
w.WriteMsg(m)
return dns.RcodeSuccess, nil
}
欢迎各位大佬批评指正~