用过 STF 的都知道,只要用户点击使用按钮,这台手机就会被标记为占用状态,其他用户在设备列表立即就可以看到某人使用了手机,同时其他用户也不再能使用这台手机,这种即时的消息肯定不能靠接口等传统方式进行传递。事实上,在 STF 中,很多信息都是通过消息来传递的,这其中用到了很多工具,比如说 zeromq 和 protobuf。下面根据我的理解讲一下 STF 中的消息传递过程。
zmq 号称史上最快的消息队列,当然,快是以牺牲其他方面的性能为代价的。首先看下消息队列的定义:
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
打个比方,人与人交流时可以通过讲话来完成,一个大型系统各个模块之间的通信就需要消息队列来完成了。再以 STF 以例,当某个设备上线后,provider 可以通过消息告诉在线的用户这台设备处于可用状态,当某个人使用了设备以后,需要通过消息告诉其他人这台手机牌繁忙状态,同时通知手机开始传输屏幕图像,再通知数据库改变设备的使用状态。
当然,有很多开源的消息队列工具可以使用,zmq 也不是最流行的一个。其他常用消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 等等。
zmq 也是基于 socket 接口进行通信的。zmq 与 Socket 的区别是普通的 socket 是端到端的(1:1 的关系),而 ZMQ 却是可以 N:M 的关系。就是多个服务端可以同时向多个客户端发消息,同时会自动处理错误等细节。
zmq 具有多种工作模式:
这些模式适用于不同的场景,在 STF 中用到了:push-pull,publish-subscribe,dealer-router 这三种模式。
push-pull:
push/pull 是单向模式,消息只能由 push 端发出,由 pull 端进行拉取。一般来说 pull 端对消息进行处理,如果一个 pull 端不能及时处理,可以同时有多个 pull 端,这种情况下,一条消息只能被 一个 pull 端拉取,拉过之后其他 pull 端就不能再次拉取。如果没有 pull 端拉取,消息过多的时候可能会溢出。
下面是 node 的示例代码:
// producer.js,push端
var zmq = require('zeromq')
, sock = zmq.socket('push');
sock.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');
var count = 0
setInterval(function(){
var message = "some work"+(count++);
console.log(message);
sock.send(message);
}, 2000);
// worker.js,pull端
var zmq = require('zeromq')
, sock = zmq.socket('pull');
sock.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');
sock.on('message', function(msg){
console.log('work: %s', msg.toString());
});
worker.js 代码可以同时运行多个。一个 worker pull 过以后其他 worker 不会再 pull 到相同的消息。
在 stf 中,push-pull 模式可用于用户和 provider 之前的消息处理,因为一条消息只需要处理一次,并且可以有多个处理端,即 processor。
publish-subscribe:
这属于发布订阅模式。与 push-pull 所不同的,pub 会向所有已经连接的 sub 发消息,如果没有 sub 连接,消息会被丢弃。简单来说 pub-sub 就是一个像大喇叭一样的广播系统,如果此时没有听到广播,后面就不会听到了。
// pubber.js
var zmq = require('zeromq')
, sock = zmq.socket('pub');
sock.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');
setInterval(function(){
console.log('sending a multipart message envelope');
sock.send(['kitty', 'kitty meow!']);
sock.send(['cats', 'cats meow!']);
}, 2000);
// subber.js
var zmq = require('zeromq')
, sock = zmq.socket('sub');
sock.connect('tcp://127.0.0.1:3000');
sock.subscribe('kitty');
sock.subscribe('cats');
console.log('Subscriber connected to port 3000');
sock.on('message', function(topic, message) {
//console.log('received a message related to:', topic, 'containing message:', message);
console.log('received a message related to:', topic.toString(), 'containing message:', message.toString());
});
subber.js 可以同时运行多个,每个 subber 都会收到相同的消息。pub 和 sub 还可以订阅特定的关键字。比如说如果 sub 只订阅了 cats 关键字,只会收到 cats meow! 消息,也可以同时订阅多个关键字。
在 STF 中,pub-sub 模式用于广播设备的变更信息,比较如某个手机上线和下线,需要通知到所有的用户还有数据库。
dealer-router:
dealer/router 是路由模式,适用于有多个发送端和多个接收端的情况,这样可以实现负载均衡。
在 stf 中,同时有多个用户和多台手机在线,dealer-router 很适用于这种情况下的消息传递。
这种模式还没理解太透,等理解透了把代码补上。
zmq 最开始由 c/c++ 编写的,但是现在已经支持 java、node、python 等语言
node 版的 zmq 前文已经做了 demo,下面举一个 java subber 的例子:
import com.alibaba.fastjson.JSONObject;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
public class Zmq_Thread {
public void start(String url,String subscription) {
ZContext context = new ZContext();
ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);
if (url != null) {
subscriber.connect(url);
}
else {
subscriber.connect("tcp://127.0.0.1:7350");
}
if (subscription == null){
subscription = "test";
}
subscriber.subscribe(subscription.getBytes(ZMQ.CHARSET));
while (true) {
String topic = subscriber.recvStr();
if (topic == null)
break;
String data = subscriber.recvStr();
assert(topic.equals(subscription));
JSONObject jsonObject = JSONObject.parseObject(data) ;
System.out.println(jsonObject.toJSONString());
}
context.destroy();
}
}
关于 java 的其他模式可以参考官方 demo 或者网上相关教程。这里举 java subber 的原因是很多公司的项目是采用 java 的,stf 与其他项目结合是需要用到 jeromq。
Protocol Buffer 是 Google 的数据交换的格式,与 protobuf 类似的东西其实是 json 和 xml,protobuff 的优势在于更小的体积,这样在大量数据传输的时候节省了带宽资源。与 json 和 xml 所不同的是,protobuff 自带了一个编译器,protoc,只需要用它进行编译,可以编译成 JAVA、python、C++ 代码,简单来说,它可以生成对应语言的数据类型,比如说生成 java 的一个类等等。
由于 stf 是 node 语言,这里重点介绍 node 中 protobuf 的使用,protobuf 使用前需要先编写一个 proto 文件,定义消息类型,举个 protobuf.js 的例子:
// user.proto
package user;
syntax = "proto2";
message username {
string username_field = 1;
}
//user.js
var protobuf = require("protobufjs");
protobuf.load("user.proto", function(err, root) {
if (err) throw err;
// Obtain a message type
var user = root.lookup("user.username");
// Create a new message
var message = user.create({ usernamefield: "Tom" });
// Encode a message
var buffer = user.encode(message).finish();
// ... do something with buffer
// Or, encode a plain object
var buffer = user.encode({ usernamefield: "jerry" }).finish();
// ... do something with buffer
// Decode a buffer
var message = user.decode(buffer);
// ... do something with message
console.log(message)
// If your application uses length-delimited buffers, there is also encodeDelimited and decodeDelimited.
});
上面的代码是异步的,有时候异步不是很好用,我们可以改成同步的。在 STF 中用的就是同步模式,有兴趣的同学可以详细看一下,在 lib\wire 文件夹下,说实话,关于 protobuf 我也没搞太清楚,不过对于 STF 的改造已经够用了。
前面关于 zmq 和 protobuffer 讲了那么多,可能很多人已经看晕了,讲这些并不是故弄玄虚,也不是为了显得 stf 多么高深,而是因为不把这些知识深入的搞清楚,根本无法理解 stf 的消息机制,更无法利用消息进行扩展或者与外部交互。
下面说一下 stf 消息的两个应用:扩展节点以及对外发布设备状态。
首先看下面的一张图,这是 STF 官方部署文档中一张结构图,刚接触时,我就知道这张图比较重要,但是看了很长时间也没看出所以然,直到把 zmq 搞懂,才基本有所理解了。
这是最常见的形式了,很多手机并不是完全插在一台 provider(电脑)上,多台电脑就是 provider 的扩展。从上图中可以看出,provider 有两个接口,push 和 sub,从前文中可以知道,push 可以保证消息被可靠的推送成功,而且 sub 用来监听自己感兴趣的消息,对应的实际中,用户点击使用按钮时,发出一个 GroupMessage(占用手机)命令,provider 通过 sub 端收到这个消息后,执行一系列操作,然后通过 push 方式把占用成功的消息推送出去,很明显,占用广播的消息即使没有 provider 回应也没有关系,因为这时候表示手机占用失败,我们用 stf 时偶尔就会出现这个问题,但是占用成功一定要保证 push 成功,否则下一个再占用会造成冲突。
图中 provider 上面的 dev 我认为就是手机,手机上的 STFservice 可以 push 和 pull 数据,但是 STFsevice 是如何联网的我还不太清楚。很显然,一个 provider 上可以同时插多台手机。
从图中可以看出,在进行 provider 扩展的时候,每个 provider 只要连上 dev-triproxy 上就行了,从 dev-triproxy push 和 sub 数据。下面举一个 provider 的启动命令的例子:
docker run --rm \
--name provider \
--net host \
openstf/stf:latest \
stf provider \
--name provider1 \
--connect-sub tcp://devside.stf.example.org:7250 \
--connect-push tcp://devside.stf.example.org:7270 \
--storage-url http://stf.example.org/ \
--public-ip local_ip \
--min-port=15000 \
--max-port=25000 \
--heartbeat-interval 10000 \
--screen-ws-url-pattern "ws://stf.example.org/d/provider1/<%= serial %>/<%= publicPort %>/"
从命令中可以看出,在 provider 中需要提供几个主要参数:
在扩展 provider 时,只要更改一下 provider 的 ip 和名称,就可以同时上线多个 provider。
websocket 节点同样有 push 和 sub 两个端口,分别用来推送和接收消息,推送的主要消息是用户占用和取消占用的消息,接收的消息主要是设备被占用和设备改变的消息。
下面看一下 websocket 节点的启动命令:
docker run --rm \
--name %p-%i \
--link rethinkdb-proxy-28015:rethinkdb \
-e "SECRET=YOUR_SESSION_SECRET_HERE" \
-p %i:3000 \
openstf/stf:latest \
stf websocket --port 3000 \
--storage-url https://stf.example.org/ \
--connect-sub tcp://appside.stf.example.org:7150 \
--connect-push tcp://appside.stf.example.org:7170
首先解释一下命令中的各个参数,%p 表示模块的名称,%i 表示端口号,在实际应用的需要用对应的参数替换。参数里面有一个 link rethinkdb 的参数,是因为 websocket 模块有些功能需要直接读写数据库。扩展 websocket 节点的时候,如果在同一台电脑,只要修改一下%i 这个端口号就行了,因为同一个系统的两个进程不能监听同一端口,当然,如果没有用 docker,需要修改-p 3000。
websocket 有多个监听端口,怎样用这些端口呢,需要在 nginx 里配置一下:
...
upstream stf_api {
server 192.168.255.100:3700 max_fails=0;
server 192.168.255.100:3701 max_fails=0;
server 192.168.255.101:3700 max_fails=0;
}
...
processor 节点的扩展就比较简单了。仿照前面 websocket 的扩展,直接启动多个 processerr 模块就可以了。processor 模块扩展一般用于 processor 成为系统的瓶颈的情况下,不过目前为止,我还没有发现 processor 需要扩展的情况,一般都能处理过来。
官方的框图中表示 triproxy 节点可以进行扩展,但是我实在找不到扩展的办法,还请懂行的人指导!
STF 可以提供手机屏幕的实时图像以及实时操作功能,很多时候我们的手机不仅可以提供给别人使用,也会用来做自动化等事情,当手机做自动化时,肯定不希望别人来操作,当有人在使用这台手机时,也不允许做自动化,这时候自动化工具肯定希望知道设备的状态。对于这种需求,一种方式是提供查询的接口,其他应用通过接口查询手机的状态,但是如果有很多应用同时调接口,会给 STF 造成很大压力,同时也有些浪费。
根据前文介绍的 STF 消息机制,我们可以利用 zmq 给 STF 做一个广播模块,实时广播设备的状态改变,其他应用监听到设备的改变后再做对应的操作。事实上,provider 一直在广播设备的改变状态,如果我们用 subber.js 直接收听 7150 端口的数据,我们可以看到设备的改变信息。
广播的格式是 protobuffer,这里说一下 protobuffer 的一些问题,虽然 protobuffer 具更高的数据传输效率,但是同时牺牲了很多灵活性,如果别人想要收听这个广播,还必须拿到一个完整的 proto 文件,然后生成相关的类,如果广播的信息有所增删,则需要重新拉最新的 proto 文件,这给收听都带来很多不便。因此,我们可以利用 STF 的模块再做一次转换,变成普通的 json,这样就灵活多了。