STF 从 zmq 和 protobuf 谈 STF 中的消息传递

blueshark · May 05, 2017 · Last by zhanglimin replied at January 16, 2019 · 4340 hits
本帖已被设为精华帖!

一、前言

用过STF的都知道,只要用户点击使用按钮,这台手机就会被标记为占用状态,其他用户在设备列表立即就可以看到某人使用了手机,同时其他用户也不再能使用这台手机,这种即时的消息肯定不能靠接口等传统方式进行传递。事实上,在STF中,很多信息都是通过消息来传递的,这其中用到了很多工具,比如说zeromq和protobuf。下面根据我的理解讲一下STF中的消息传递过程。

二、zeromq和protobuf基础

2.1 zeromq介绍和使用

zmq号称史上最快的消息队列,当然,快是以牺牲其他方面的性能为代价的。首先看下消息队列的定义:

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

打个比方,人与人交流时可以通过讲话来完成,一个大型系统各个模块之间的通信就需要消息队列来完成了。再以STF以例,当某个设备上线后,provider可以通过消息告诉在线的用户这台设备处于可用状态,当某个人使用了设备以后,需要通过消息告诉其他人这台手机牌繁忙状态,同时通知手机开始传输屏幕图像,再通知数据库改变设备的使用状态。

当然,有很多开源的消息队列工具可以使用,zmq也不是最流行的一个。其他常用消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等等。

zmq也是基于socket接口进行通信的。zmq与Socket 的区别是普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M的关系。就是多个服务端可以同时向多个客户端发消息,同时会自动处理错误等细节。

2.1.1 zmq的模式

zmq具有多种工作模式:

  • request-reply
  • push-pull
  • publish-subscribe
  • dealer-router

这些模式适用于不同的场景,在STF中用到了:push-pull,publish-subscribe,dealer-router这三种模式。

push-pull:

push/pull是单向模式,消息只能由push端发出,由pull端进行拉取。一般来说pull端对消息进行处理,如果一个pull端不能及时处理,可以同时有多个pull端,这种情况下,一条消息只能被 一个pull端拉取,拉过之后其他pull端就不能再次拉取。如果没有pull端拉取,消息过多的时候可能会溢出。

下面是node的示例代码:

// producer.js,push端
var zmq = require('zeromq')
, sock = zmq.socket('push');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

var count = 0
setInterval(function(){
var message = "some work"+(count++);
console.log(message);
sock.send(message);
}, 2000);
// worker.js,pull端
var zmq = require('zeromq')
, sock = zmq.socket('pull');

sock.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

sock.on('message', function(msg){
console.log('work: %s', msg.toString());
});

worker.js代码可以同时运行多个。一个worker pull过以后其他worker不会再pull到相同的消息。

在stf中,push-pull模式可用于用户和provider之前的消息处理,因为一条消息只需要处理一次,并且可以有多个处理端,即processor。

publish-subscribe:

这属于发布订阅模式。与push-pull所不同的,pub会向所有已经连接的sub发消息,如果没有sub连接,消息会被丢弃。简单来说pub-sub就是一个像大喇叭一样的广播系统,如果此时没有听到广播,后面就不会听到了。

// pubber.js
var zmq = require('zeromq')
, sock = zmq.socket('pub');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

setInterval(function(){
console.log('sending a multipart message envelope');
sock.send(['kitty', 'kitty meow!']);
sock.send(['cats', 'cats meow!']);
}, 2000);
// subber.js
var zmq = require('zeromq')
, sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3000');
sock.subscribe('kitty');
sock.subscribe('cats');
console.log('Subscriber connected to port 3000');

sock.on('message', function(topic, message) {
//console.log('received a message related to:', topic, 'containing message:', message);
console.log('received a message related to:', topic.toString(), 'containing message:', message.toString());
});

subber.js可以同时运行多个,每个subber都会收到相同的消息。pub和sub还可以订阅特定的关键字。比如说如果sub只订阅了cats关键字,只会收到cats meow!消息,也可以同时订阅多个关键字。

在STF中,pub-sub模式用于广播设备的变更信息,比较如某个手机上线和下线,需要通知到所有的用户还有数据库。

dealer-router:

dealer/router是路由模式,适用于有多个发送端和多个接收端的情况,这样可以实现负载均衡。

在stf中,同时有多个用户和多台手机在线,dealer-router很适用于这种情况下的消息传递。

这种模式还没理解太透,等理解透了把代码补上。

2.1.2 zmq支持的语言

zmq最开始由c/c++编写的,但是现在已经支持java、node、python等语言

node版的zmq前文已经做了demo,下面举一个java subber的例子:

import com.alibaba.fastjson.JSONObject;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class Zmq_Thread {
public void start(String url,String subscription) {
ZContext context = new ZContext();
ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);
if (url != null) {
subscriber.connect(url);
}
else {
subscriber.connect("tcp://127.0.0.1:7350");
}

if (subscription == null){
subscription = "test";
}

subscriber.subscribe(subscription.getBytes(ZMQ.CHARSET));

while (true) {
String topic = subscriber.recvStr();
if (topic == null)
break;
String data = subscriber.recvStr();
assert(topic.equals(subscription));
JSONObject jsonObject = JSONObject.parseObject(data) ;
System.out.println(jsonObject.toJSONString());
}
context.destroy();
}
}

关于java的其他模式可以参考官方demo或者网上相关教程。这里举java subber的原因是很多公司的项目是采用java的,stf与其他项目结合是需要用到jeromq。

2.2 protobuf的使用

Protocol Buffer是Google的数据交换的格式,与protobuf类似的东西其实是json和xml,protobuff的优势在于更小的体积,这样在大量数据传输的时候节省了带宽资源。与json和xml所不同的是,protobuff自带了一个编译器,protoc,只需要用它进行编译,可以编译成JAVA、python、C++代码,简单来说,它可以生成对应语言的数据类型,比如说生成java的一个类等等。

由于stf是node语言,这里重点介绍node中protobuf的使用,protobuf使用前需要先编写一个proto文件,定义消息类型,举个protobuf.js的例子:

// user.proto
package user;
syntax = "proto2";

message username {
string username_field = 1;
}
//user.js
var protobuf = require("protobufjs");

protobuf.load("user.proto", function(err, root) {
if (err) throw err;

// Obtain a message type
var user = root.lookup("user.username");

// Create a new message
var message = user.create({ usernamefield: "Tom" });

// Encode a message
var buffer = user.encode(message).finish();
// ... do something with buffer

// Or, encode a plain object
var buffer = user.encode({ usernamefield: "jerry" }).finish();
// ... do something with buffer

// Decode a buffer
var message = user.decode(buffer);
// ... do something with message
console.log(message)

// If your application uses length-delimited buffers, there is also encodeDelimited and decodeDelimited.
});

上面的代码是异步的,有时候异步不是很好用,我们可以改成同步的。在STF中用的就是同步模式,有兴趣的同学可以详细看一下,在lib\wire文件夹下,说实话,关于protobuf我也没搞太清楚,不过对于STF的改造已经够用了。

三、利用zmq和protobuf增强STF的性能

前面关于zmq和protobuffer讲了那么多,可能很多人已经看晕了,讲这些并不是故弄玄虚,也不是为了显得stf多么高深,而是因为不把这些知识深入的搞清楚,根本无法理解stf的消息机制,更无法利用消息进行扩展或者与外部交互。

下面说一下stf消息的两个应用:扩展节点以及对外发布设备状态。

首先看下面的一张图,这是STF官方部署文档中一张结构图,刚接触时,我就知道这张图比较重要,但是看了很长时间也没看出所以然,直到把zmq搞懂,才基本有所理解了。

STF

3.1. provider节点的扩展。

这是最常见的形式了,很多手机并不是完全插在一台provider(电脑)上,多台电脑就是provider的扩展。从上图中可以看出,provider有两个接口,push和sub,从前文中可以知道,push可以保证消息被可靠的推送成功,而且sub用来监听自己感兴趣的消息,对应的实际中,用户点击使用按钮时,发出一个GroupMessage(占用手机)命令,provider通过sub端收到这个消息后,执行一系列操作,然后通过push方式把占用成功的消息推送出去,很明显,占用广播的消息即使没有provider回应也没有关系,因为这时候表示手机占用失败,我们用stf时偶尔就会出现这个问题,但是占用成功一定要保证push成功,否则下一个再占用会造成冲突。

图中provider上面的dev我认为就是手机,手机上的STFservice可以push和pull数据,但是STFsevice是如何联网的我还不太清楚。很显然,一个provider上可以同时插多台手机。

从图中可以看出,在进行provider扩展的时候,每个provider只要连上dev-triproxy上就行了,从dev-triproxy push和sub数据。下面举一个provider的启动命令的例子:

docker run --rm \
--name provider \
--net host \
openstf/stf:latest \
stf provider \
--name provider1 \
--connect-sub tcp://devside.stf.example.org:7250 \
--connect-push tcp://devside.stf.example.org:7270 \
--storage-url http://stf.example.org/ \
--public-ip local_ip \
--min-port=15000 \
--max-port=25000 \
--heartbeat-interval 10000 \
--screen-ws-url-pattern "ws://stf.example.org/d/provider1/<%= serial %>/<%= publicPort %>/"

从命令中可以看出,在provider中需要提供几个主要参数:

  • name 这是在stf手机列表中显示的provider的名字。
  • connect-sub 就是triproxy的pub的端口。
  • connect-push 是triproxy的pull的端口。
  • storage-url 一般就是stf主页的链接,也可以是自己配置的storage-url,在storage模块中有设置。
  • public-ip 是指provider所在的电脑的ip,这个ip会作为minicap向外传送图像的地址。
  • min-port、max-port是指每部手机向外传输图像的端口,因为一个provider可以连多台手机,每台手机传输图像的端口不一样。
  • heartbeat-interval 是指心跳时间,为了保证provider可用,每隔一段时间provider会发出一个heartbeat,reaper接收这个heartbeat,如果reaper在自己的超时时间内没有收到provider的heartbeat,会认为这个provider下线。
  • screen-ws-url-pattern是指屏幕传输图像的url类型,其中serial是指手机的串号,publicPort是min-port和max-port之前的一个,这样就能唯一确定一个图像传输的url。

在扩展provider时,只要更改一下provider的ip和名称,就可以同时上线多个provider。

3.2.websocket节点的扩展

websocket节点同样有push和sub两个端口,分别用来推送和接收消息,推送的主要消息是用户占用和取消占用的消息,接收的消息主要是设备被占用和设备改变的消息。

下面看一下websocket节点的启动命令:

docker run --rm \
--name %p-%i \
--link rethinkdb-proxy-28015:rethinkdb \
-e "SECRET=YOUR_SESSION_SECRET_HERE" \
-p %i:3000 \
openstf/stf:latest \
stf websocket --port 3000 \
--storage-url https://stf.example.org/ \
--connect-sub tcp://appside.stf.example.org:7150 \
--connect-push tcp://appside.stf.example.org:7170

首先解释一下命令中的各个参数,%p表示模块的名称,%i表示端口号,在实际应用的需要用对应的参数替换。参数里面有一个link rethinkdb的参数,是因为websocket模块有些功能需要直接读写数据库。扩展websocket节点的时候,如果在同一台电脑,只要修改一下%i这个端口号就行了,因为同一个系统的两个进程不能监听同一端口,当然,如果没有用docker,需要修改-p 3000。

websocket有多个监听端口,怎样用这些端口呢,需要在nginx里配置一下:

...
upstream stf_api {
server 192.168.255.100:3700 max_fails=0;
server 192.168.255.100:3701 max_fails=0;
server 192.168.255.101:3700 max_fails=0;
}
...

3.3.processor节点的扩展

processor节点的扩展就比较简单了。仿照前面websocket的扩展,直接启动多个processerr模块就可以了。processor模块扩展一般用于processor成为系统的瓶颈的情况下,不过目前为止,我还没有发现processor需要扩展的情况,一般都能处理过来。

3.4.triproxy节点的扩展

官方的框图中表示triproxy节点可以进行扩展,但是我实在找不到扩展的办法,还请懂行的人指导!

3.5. 设备状态的广播。

STF可以提供手机屏幕的实时图像以及实时操作功能,很多时候我们的手机不仅可以提供给别人使用,也会用来做自动化等事情,当手机做自动化时,肯定不希望别人来操作,当有人在使用这台手机时,也不允许做自动化,这时候自动化工具肯定希望知道设备的状态。对于这种需求,一种方式是提供查询的接口,其他应用通过接口查询手机的状态,但是如果有很多应用同时调接口,会给STF造成很大压力,同时也有些浪费。

根据前文介绍的STF消息机制,我们可以利用zmq给STF做一个广播模块,实时广播设备的状态改变,其他应用监听到设备的改变后再做对应的操作。事实上,provider一直在广播设备的改变状态,如果我们用subber.js直接收听7150端口的数据,我们可以看到设备的改变信息。

广播的格式是protobuffer,这里说一下protobuffer的一些问题,虽然protobuffer具更高的数据传输效率,但是同时牺牲了很多灵活性,如果别人想要收听这个广播,还必须拿到一个完整的proto文件,然后生成相关的类,如果广播的信息有所增删,则需要重新拉最新的proto文件,这给收听都带来很多不便。因此,我们可以利用STF的模块再做一次转换,变成普通的json,这样就灵活多了。

共收到 4 条回复 时间 点赞
shuta STF 系列之二---minitouch 流程源码分析 中提及了此贴 04 Jun 17:24

太棒了,建议版主加精。解除了很多STF的疑惑,之前看那张结构图也是云里雾里,现在总算清晰了很多。

恒温 将本帖设为了精华贴 14 Jun 11:25

值得赏析

最近开始研究 stf ,翻看了楼主的系列文,觉得收获很大,留个言,谢谢楼主的分享~ 👍

楼主你好,我在文章看到这么一句话:在扩展provider时,只要更改一下provider的ip和名称,就可以同时上线多个provider。
这个是说stf服务启动的时候可以设置多个主机为provider吗。比如在A主机上运行stf服务,有两台都连接着手机的主机B和C(B和C主机都已经把adb端口打开),在启动A上STF时,把B和C的IP都要列出来是吧

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up