STF 从 zmq 和 protobuf 谈 STF 中的消息传递

blueshark · 2017年05月05日 · 最后由 zhanglimin 回复于 2019年01月16日 · 4576 次阅读
本帖已被设为精华帖!

一、前言

用过 STF 的都知道,只要用户点击使用按钮,这台手机就会被标记为占用状态,其他用户在设备列表立即就可以看到某人使用了手机,同时其他用户也不再能使用这台手机,这种即时的消息肯定不能靠接口等传统方式进行传递。事实上,在 STF 中,很多信息都是通过消息来传递的,这其中用到了很多工具,比如说 zeromq 和 protobuf。下面根据我的理解讲一下 STF 中的消息传递过程。

二、zeromq 和 protobuf 基础

2.1 zeromq 介绍和使用

zmq 号称史上最快的消息队列,当然,快是以牺牲其他方面的性能为代价的。首先看下消息队列的定义:

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

打个比方,人与人交流时可以通过讲话来完成,一个大型系统各个模块之间的通信就需要消息队列来完成了。再以 STF 以例,当某个设备上线后,provider 可以通过消息告诉在线的用户这台设备处于可用状态,当某个人使用了设备以后,需要通过消息告诉其他人这台手机牌繁忙状态,同时通知手机开始传输屏幕图像,再通知数据库改变设备的使用状态。

当然,有很多开源的消息队列工具可以使用,zmq 也不是最流行的一个。其他常用消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 等等。

zmq 也是基于 socket 接口进行通信的。zmq 与 Socket 的区别是普通的 socket 是端到端的(1:1 的关系),而 ZMQ 却是可以 N:M 的关系。就是多个服务端可以同时向多个客户端发消息,同时会自动处理错误等细节。

2.1.1 zmq 的模式

zmq 具有多种工作模式:

  • request-reply
  • push-pull
  • publish-subscribe
  • dealer-router

这些模式适用于不同的场景,在 STF 中用到了:push-pull,publish-subscribe,dealer-router 这三种模式。

push-pull:

push/pull 是单向模式,消息只能由 push 端发出,由 pull 端进行拉取。一般来说 pull 端对消息进行处理,如果一个 pull 端不能及时处理,可以同时有多个 pull 端,这种情况下,一条消息只能被 一个 pull 端拉取,拉过之后其他 pull 端就不能再次拉取。如果没有 pull 端拉取,消息过多的时候可能会溢出。

下面是 node 的示例代码:

// producer.js,push端
var zmq = require('zeromq')
  , sock = zmq.socket('push');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

var count = 0
setInterval(function(){
  var message = "some work"+(count++);
  console.log(message);
  sock.send(message);
}, 2000);
// worker.js,pull端
var zmq = require('zeromq')
  , sock = zmq.socket('pull');

sock.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

sock.on('message', function(msg){
  console.log('work: %s', msg.toString());
});

worker.js 代码可以同时运行多个。一个 worker pull 过以后其他 worker 不会再 pull 到相同的消息。

在 stf 中,push-pull 模式可用于用户和 provider 之前的消息处理,因为一条消息只需要处理一次,并且可以有多个处理端,即 processor。

publish-subscribe:

这属于发布订阅模式。与 push-pull 所不同的,pub 会向所有已经连接的 sub 发消息,如果没有 sub 连接,消息会被丢弃。简单来说 pub-sub 就是一个像大喇叭一样的广播系统,如果此时没有听到广播,后面就不会听到了。

// pubber.js
var zmq = require('zeromq')
  , sock = zmq.socket('pub');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

setInterval(function(){
  console.log('sending a multipart message envelope');
  sock.send(['kitty', 'kitty meow!']);
  sock.send(['cats', 'cats meow!']);
}, 2000);
// subber.js
var zmq = require('zeromq')
  , sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3000');
sock.subscribe('kitty');
sock.subscribe('cats');
console.log('Subscriber connected to port 3000');

sock.on('message', function(topic, message) {
  //console.log('received a message related to:', topic, 'containing message:', message);
  console.log('received a message related to:', topic.toString(), 'containing message:', message.toString());
});

subber.js 可以同时运行多个,每个 subber 都会收到相同的消息。pub 和 sub 还可以订阅特定的关键字。比如说如果 sub 只订阅了 cats 关键字,只会收到 cats meow! 消息,也可以同时订阅多个关键字。

在 STF 中,pub-sub 模式用于广播设备的变更信息,比较如某个手机上线和下线,需要通知到所有的用户还有数据库。

dealer-router:

dealer/router 是路由模式,适用于有多个发送端和多个接收端的情况,这样可以实现负载均衡。

在 stf 中,同时有多个用户和多台手机在线,dealer-router 很适用于这种情况下的消息传递。

这种模式还没理解太透,等理解透了把代码补上。

2.1.2 zmq 支持的语言

zmq 最开始由 c/c++ 编写的,但是现在已经支持 java、node、python 等语言

node 版的 zmq 前文已经做了 demo,下面举一个 java subber 的例子:

import com.alibaba.fastjson.JSONObject;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class Zmq_Thread {
    public void start(String url,String subscription) {
        ZContext context = new ZContext();
        ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);
        if (url != null) {
            subscriber.connect(url);
        }
        else {
            subscriber.connect("tcp://127.0.0.1:7350");
        }

        if (subscription == null){
            subscription = "test";
        }

        subscriber.subscribe(subscription.getBytes(ZMQ.CHARSET));

        while (true) {
            String topic = subscriber.recvStr();
            if (topic == null)
                break;
            String data = subscriber.recvStr();
            assert(topic.equals(subscription));
            JSONObject jsonObject = JSONObject.parseObject(data) ;
            System.out.println(jsonObject.toJSONString());
        }
        context.destroy();
    }
}

关于 java 的其他模式可以参考官方 demo 或者网上相关教程。这里举 java subber 的原因是很多公司的项目是采用 java 的,stf 与其他项目结合是需要用到 jeromq。

2.2 protobuf 的使用

Protocol Buffer 是 Google 的数据交换的格式,与 protobuf 类似的东西其实是 json 和 xml,protobuff 的优势在于更小的体积,这样在大量数据传输的时候节省了带宽资源。与 json 和 xml 所不同的是,protobuff 自带了一个编译器,protoc,只需要用它进行编译,可以编译成 JAVA、python、C++ 代码,简单来说,它可以生成对应语言的数据类型,比如说生成 java 的一个类等等。

由于 stf 是 node 语言,这里重点介绍 node 中 protobuf 的使用,protobuf 使用前需要先编写一个 proto 文件,定义消息类型,举个 protobuf.js 的例子:

// user.proto
package user;
syntax = "proto2";

message username {
    string username_field = 1; 
}
//user.js
var protobuf = require("protobufjs");

protobuf.load("user.proto", function(err, root) {
    if (err) throw err;

    // Obtain a message type
    var user = root.lookup("user.username");

    // Create a new message
    var message = user.create({ usernamefield: "Tom" });

    // Encode a message
    var buffer = user.encode(message).finish();
    // ... do something with buffer

    // Or, encode a plain object
    var buffer = user.encode({ usernamefield: "jerry" }).finish();
    // ... do something with buffer

    // Decode a buffer
    var message = user.decode(buffer);
    // ... do something with message
    console.log(message)

    // If your application uses length-delimited buffers, there is also encodeDelimited and decodeDelimited.
});

上面的代码是异步的,有时候异步不是很好用,我们可以改成同步的。在 STF 中用的就是同步模式,有兴趣的同学可以详细看一下,在 lib\wire 文件夹下,说实话,关于 protobuf 我也没搞太清楚,不过对于 STF 的改造已经够用了。

三、利用 zmq 和 protobuf 增强 STF 的性能

前面关于 zmq 和 protobuffer 讲了那么多,可能很多人已经看晕了,讲这些并不是故弄玄虚,也不是为了显得 stf 多么高深,而是因为不把这些知识深入的搞清楚,根本无法理解 stf 的消息机制,更无法利用消息进行扩展或者与外部交互。

下面说一下 stf 消息的两个应用:扩展节点以及对外发布设备状态。

首先看下面的一张图,这是 STF 官方部署文档中一张结构图,刚接触时,我就知道这张图比较重要,但是看了很长时间也没看出所以然,直到把 zmq 搞懂,才基本有所理解了。

STF

3.1. provider 节点的扩展。

这是最常见的形式了,很多手机并不是完全插在一台 provider(电脑)上,多台电脑就是 provider 的扩展。从上图中可以看出,provider 有两个接口,push 和 sub,从前文中可以知道,push 可以保证消息被可靠的推送成功,而且 sub 用来监听自己感兴趣的消息,对应的实际中,用户点击使用按钮时,发出一个 GroupMessage(占用手机)命令,provider 通过 sub 端收到这个消息后,执行一系列操作,然后通过 push 方式把占用成功的消息推送出去,很明显,占用广播的消息即使没有 provider 回应也没有关系,因为这时候表示手机占用失败,我们用 stf 时偶尔就会出现这个问题,但是占用成功一定要保证 push 成功,否则下一个再占用会造成冲突。

图中 provider 上面的 dev 我认为就是手机,手机上的 STFservice 可以 push 和 pull 数据,但是 STFsevice 是如何联网的我还不太清楚。很显然,一个 provider 上可以同时插多台手机。

从图中可以看出,在进行 provider 扩展的时候,每个 provider 只要连上 dev-triproxy 上就行了,从 dev-triproxy push 和 sub 数据。下面举一个 provider 的启动命令的例子:

docker run --rm \
  --name provider \
  --net host \
  openstf/stf:latest \
  stf provider \
    --name provider1 \
    --connect-sub tcp://devside.stf.example.org:7250 \
    --connect-push tcp://devside.stf.example.org:7270 \
    --storage-url http://stf.example.org/ \
    --public-ip local_ip \
    --min-port=15000 \
    --max-port=25000 \
    --heartbeat-interval 10000 \
    --screen-ws-url-pattern "ws://stf.example.org/d/provider1/<%= serial %>/<%= publicPort %>/"

从命令中可以看出,在 provider 中需要提供几个主要参数:

  • name 这是在 stf 手机列表中显示的 provider 的名字。
  • connect-sub 就是 triproxy 的 pub 的端口。
  • connect-push 是 triproxy 的 pull 的端口。
  • storage-url 一般就是 stf 主页的链接,也可以是自己配置的 storage-url,在 storage 模块中有设置。
  • public-ip 是指 provider 所在的电脑的 ip,这个 ip 会作为 minicap 向外传送图像的地址。
  • min-port、max-port 是指每部手机向外传输图像的端口,因为一个 provider 可以连多台手机,每台手机传输图像的端口不一样。
  • heartbeat-interval 是指心跳时间,为了保证 provider 可用,每隔一段时间 provider 会发出一个 heartbeat,reaper 接收这个 heartbeat,如果 reaper 在自己的超时时间内没有收到 provider 的 heartbeat,会认为这个 provider 下线。
  • screen-ws-url-pattern 是指屏幕传输图像的 url 类型,其中 serial 是指手机的串号,publicPort 是 min-port 和 max-port 之前的一个,这样就能唯一确定一个图像传输的 url。

在扩展 provider 时,只要更改一下 provider 的 ip 和名称,就可以同时上线多个 provider。

3.2.websocket 节点的扩展

websocket 节点同样有 push 和 sub 两个端口,分别用来推送和接收消息,推送的主要消息是用户占用和取消占用的消息,接收的消息主要是设备被占用和设备改变的消息。

下面看一下 websocket 节点的启动命令:

docker run --rm \
  --name %p-%i \
  --link rethinkdb-proxy-28015:rethinkdb \
  -e "SECRET=YOUR_SESSION_SECRET_HERE" \
  -p %i:3000 \
  openstf/stf:latest \
  stf websocket --port 3000 \
    --storage-url https://stf.example.org/ \
    --connect-sub tcp://appside.stf.example.org:7150 \
    --connect-push tcp://appside.stf.example.org:7170

首先解释一下命令中的各个参数,%p 表示模块的名称,%i 表示端口号,在实际应用的需要用对应的参数替换。参数里面有一个 link rethinkdb 的参数,是因为 websocket 模块有些功能需要直接读写数据库。扩展 websocket 节点的时候,如果在同一台电脑,只要修改一下%i 这个端口号就行了,因为同一个系统的两个进程不能监听同一端口,当然,如果没有用 docker,需要修改-p 3000。

websocket 有多个监听端口,怎样用这些端口呢,需要在 nginx 里配置一下:

...
  upstream stf_api {
    server 192.168.255.100:3700 max_fails=0;
    server 192.168.255.100:3701 max_fails=0;
    server 192.168.255.101:3700 max_fails=0;
  }
...

3.3.processor 节点的扩展

processor 节点的扩展就比较简单了。仿照前面 websocket 的扩展,直接启动多个 processerr 模块就可以了。processor 模块扩展一般用于 processor 成为系统的瓶颈的情况下,不过目前为止,我还没有发现 processor 需要扩展的情况,一般都能处理过来。

3.4.triproxy 节点的扩展

官方的框图中表示 triproxy 节点可以进行扩展,但是我实在找不到扩展的办法,还请懂行的人指导!

3.5. 设备状态的广播。

STF 可以提供手机屏幕的实时图像以及实时操作功能,很多时候我们的手机不仅可以提供给别人使用,也会用来做自动化等事情,当手机做自动化时,肯定不希望别人来操作,当有人在使用这台手机时,也不允许做自动化,这时候自动化工具肯定希望知道设备的状态。对于这种需求,一种方式是提供查询的接口,其他应用通过接口查询手机的状态,但是如果有很多应用同时调接口,会给 STF 造成很大压力,同时也有些浪费。

根据前文介绍的 STF 消息机制,我们可以利用 zmq 给 STF 做一个广播模块,实时广播设备的状态改变,其他应用监听到设备的改变后再做对应的操作。事实上,provider 一直在广播设备的改变状态,如果我们用 subber.js 直接收听 7150 端口的数据,我们可以看到设备的改变信息。

广播的格式是 protobuffer,这里说一下 protobuffer 的一些问题,虽然 protobuffer 具更高的数据传输效率,但是同时牺牲了很多灵活性,如果别人想要收听这个广播,还必须拿到一个完整的 proto 文件,然后生成相关的类,如果广播的信息有所增删,则需要重新拉最新的 proto 文件,这给收听都带来很多不便。因此,我们可以利用 STF 的模块再做一次转换,变成普通的 json,这样就灵活多了。

共收到 4 条回复 时间 点赞
shuta STF 系列之二---minitouch 流程源码分析 中提及了此贴 06月04日 17:24

太棒了,建议版主加精。解除了很多 STF 的疑惑,之前看那张结构图也是云里雾里,现在总算清晰了很多。

恒温 将本帖设为了精华贴 06月14日 11:25

值得赏析

最近开始研究 stf ,翻看了楼主的系列文,觉得收获很大,留个言,谢谢楼主的分享~ 👍

楼主你好,我在文章看到这么一句话:在扩展 provider 时,只要更改一下 provider 的 ip 和名称,就可以同时上线多个 provider。
这个是说 stf 服务启动的时候可以设置多个主机为 provider 吗。比如在 A 主机上运行 stf 服务,有两台都连接着手机的主机 B 和 C(B 和 C 主机都已经把 adb 端口打开),在启动 A 上 STF 时,把 B 和 C 的 IP 都要列出来是吧

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册