前言

我们在工作中一定遇见过这样一种情况。我们的产品依赖很多个服务,这些服务可能分配在不同的机器上。我们在测试的时候动不动就会碰到我们认为的bug,例如我们在下单的时候失败了,可能原因很简单,依赖的某一个服务挂了,重起一下就好了。好一点的测试人员就会挨个去看所有依赖服务的 log,看看问题出在哪。而一般的测试人员呢,就会把所有开发挨个找一遍或者干脆提个 bug。而然所有这些情况效率都不高,尤其是第二种情况会占用开发人员的时间。举一个我见到过的一个特别的例子,我们现在的产品以分析大数据并建立模型为主要业务,科学家团队写的机器学习算法很复杂,建立一个模型可能也要 10 几 20 轮的模型训练过程。测试环境的 hadoop 集群也很小。所以导致跑一个几十 G 的数据都要数小时的时间。所以我的同事都习惯提交完任务就跑去做别的了,过几个小时再回来看看什么情况。结果好几次都是因为底层的某个服务在跑之前或者跑得过程中就因为各种原因挂掉了。于是几个小时浪费掉了。所以我们需要有一种机制能帮我们及时发现这种服务级别的问题,并准确定位问题的位置以增加我们的工作效率。这就是我们的监控。

我们都知道生产环境的监控是质量保证很重要的一环,不仅仅是能及时的发现错误。针对特定目标的监控可以帮助我们快速定位问题。针对产品环境的监控是个比较大的话题。其实我也搞不好。在我的另一个帖子里关于监控的讨论说过我以前的公司是怎么做监控的。但那些工具一般都比较复杂,很少有测试人员能 hold 的住。事实上我以往的几个公司,监控大部分都是由运维和开发做的。我们今天就讨论一下简易的,能够用在测试环境的服务级别的监控怎么做吧。

原理

原理其实很简单。我们知道所有的服务都是监听着一个端口号的,所以我们只要尝试使用与这个端口号进行 TCP 连接并发一个包过去就行了。如果没有抱错,那么说明监听这个端口号的服务是正确的。所以我们一般管这个机制叫发包器。用 java 代码表示就是特简单的一个 Socket 就解决了。

实现

首先我们需要一个可以供用户方便注册的地方,用户在这里写监控的信息,例如 ip,端口号,责任人的 email 等等。如下面的例子。

<monitor>
    <environment name="test">
        <service ip="172.27.12.121" name="API tomcat" port="8080" email="sungaofei@4paradigm.com"/>
    </environment>

    <environment name="product">
        <service ip="192.1.1.1" name="1111" port="8080" email="sungaofei@4paradigm.com"/>
    </environment>
</monitor>

上面我们用 xml 来注册被监控的信息。其实如果后面扩展成监控中心的话,注册机制是要存在数据库中的。我们第一版的简易监控就使用 xml 就足够了。然后我们定义一个接口,所有监控程序也就是发包器要实现的接口。为什么单独拉一个接口出来,为了以后的扩展. 建立 socket 只是一种监控方式,以后可能还监控 http 借口,RPC 接口. 他们发包的形式都不一样,但是我们需要统一管理这些发包器. 也就是我们的外部模块不用管监控的是什么,只要知道调用发包器类去发包就行了..

public interface SendRequest extends Callable<Answer>{
}

可以看到这个接口继承了Callable接口,这是 java 中实现并发编程的一个接口(还有一个接口是 runnable),我们的监控程序必须要在多线程下执行,这是为了执行效率,我们的每个服务可能都有一个比较长的超时时间。要是单线程执行的话,监控的服务一旦多起来,可就慢的跟蜗牛一样了。我们看到 Callable 这个线程接口还有一个泛型Answer ,它规定了这个线程执行后的返回值。也就是规定了我们的监控结果。我们看看可以把它定义成什么样子吧。

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Answer {
    private Boolean ifSucessed = false;
    private String mailSubject;
    private String[] mailToPerson;
    private String mailContent;
    public Boolean getIfSucessed() {
        return ifSucessed;
    }
    public void setIfSucessed(Boolean ifSucessed) {
        this.ifSucessed = ifSucessed;
    }
    public String getMailSubject() {
        return mailSubject;
    }
    public void setMailSubject(String mailSubject) {
        this.mailSubject = mailSubject;
    }
    public String[] getMailToPerson() {
        return mailToPerson;
    }
    public void setMailToPerson(String[] mailToPerson) {
        this.mailToPerson = mailToPerson;
    }
    public String getMailContent() {
        return mailContent;
    }
    public void setMailContent(String mailContent) {
        this.mailContent = mailContent;
    }
}

我们可以看到,它就是个监控结果,如果监控到服务异常,还定制了邮件的内容。下面我们看看具体的发包器的类

public class ServiceRequestSender implements SendRequest {
    private ServiceEntity serviceEntity;

    public ServiceRequestSender(ServiceEntity service) {
        this.serviceEntity = service;
    }

    private Answer setAnswer(Boolean ifSucessed, Throwable e) {
        String[] to = serviceEntity.getEmail().split(",");
        String message = "服务异常警告-ip:" + serviceEntity.getIp() + " 下的端口号为 " + serviceEntity.getPort() + " 的服务: "
                + serviceEntity.getName() + "出现异常,未检测到服务启动";
        String subject = "服务异常警告";

        Answer answer = SpringContext.getBean(Answer.class);
        answer.setIfSucessed(ifSucessed);
        answer.setMailContent(message);
        answer.setMailSubject(subject);
        answer.setMailToPerson(to);

        return answer;
    }

    public Answer call() {
        try {
            InetAddress theAddress = InetAddress.getByName(serviceEntity.getIp()); 
            Socket so = new Socket(theAddress, Integer.parseInt(serviceEntity.getPort()));
            so.close();
        } catch (Throwable e) {
            System.out.println(e.getMessage()+serviceEntity.getName());
            return this.setAnswer(false, e);
        }
        return this.setAnswer(true, null);
    }

}

我们看到其实很简单的逻辑, 建立一个 socket 连接,一旦失败就返回信息就行了. 下面我们再看看最主要的调用这些线程的类吧.

public class TestMonitor {
    @org.testng.annotations.Test
    public void monitor() throws EmailException {

        MonitorRequestBuilder xml = SpringContext.getBean("xml", MonitorRequestBuilder.class);
        BlockingQueue<SendRequest> queue = xml.getRequestQueue();
        ExecutorService executor = Executors.newFixedThreadPool(20);

        List<Future<Answer>> results = null;
        try {
            results = executor.invokeAll(queue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (Future<Answer> result : results) {
            Answer answer = null;

            try {
                answer = result.get();
            } catch (InterruptedException e) {
                MailSender.sendMailToAdmin("监控程序被终止", "监控程序被终止");
                e.printStackTrace();
            } catch (ExecutionException e) {
                MailSender.sendMailToAdmin("监控程序运行错误", "监控程序运行错误");
                e.printStackTrace();
            } finally {
                MailSender.sendSampleEmail(answer);
            }
        }
        System.out.println();

    }
}

上面的逻辑也很简单,从 xml 中把要监控的信息取出来,每一个信息建立一个发包器线程并放入一个队列 (BlockingQueue queue = xml.getRequestQueue();
做的事情). 然后建立一个容量为 20 的线程池. 然后将发包器线程放入线程池中执行. 最后我们简单的在 for 循环中取出线程执行的结果并发送报警邮件. 是不是很简单. 现在只差一个定时任务了. 这时候最偷懒的方式使用 jekins 这种持续集成工具配置一个定时任务就可以了.你不需要写任何代码. 简易版的监控我也建议这样就可以了.

扩展

现在我们只是针对端口级别的服务做监控. 如果某些底层进程没有端口号呢? 也简单, 我们再写一个发包器,ssh 到那台机器上,一个pgrep 就行了,类似下面的样子.

SSHClient client = new SSHClient("172.27.0.101", 22, "root", "9ol.1qaz");
client.createConnection();
client.createSession();
String temp = client.executeBash("pgrep tomcat", null);

然后我们判断返回值是否找到了那个进程就行了. 之后我们接着扩展.如果我们想监控业务级别呢? 因为进程存在不代表业务就正常啊. 我们希望能以调用服务接口并判断返回值的方式监控. 没错,根接口测试一样一样的. 事实上我们以前就是复用了接口测试的代码. 只需要再写一种类别的发包器然后调用接口测试代码就好了 (这就是为什么我在一开始就定义一个接口的原因). 这里要注意的是监控接口的选取要尽量选择查询的接口以免造成数据污染, 选取稳定的接口以免总是变化.

接着扩展

当然了上面的例子都是监控比较小的场景, 以后业务增多会扩展成一个监控平台,信息都存在数据库中. 当你服务过多的时候,为了效率你需要制作成分发式的监控中心, 为了更好的使用性,你要提供 web 服务给用户自行注册而不是控制一个 xml 等等等等.这些都需要大家自行去扩展了. 我们曾经将这个简易的监控机制扩展到了监控线上的产品.


↙↙↙阅读原文可查看相关链接,并与作者交流