上一篇文章分析 STF 如何同步设备截图的,这一篇分析用户在前端页面 touch 设备 A 时,STF 如何将 touch 动作同步在设备 A?本文将从前端,服务端,手机 三个方向说明。
touch 的整个流程如下所示:
其中 triproxy/index.js 从 pull 到 pub 用虚线标识出来原因是,中间省略了一系列过程。这些过程会在服务端部分详细说明。
当用户 touch 设备时,前端如何捕捉用户动作?捕捉到用户动作之后,是如何将数据传输至服务端?
这里说明一下,下文中所有设备详情页指的是http://localhost:7100/#!/control/${serial}
页面,
设备详情页中看到设备截图的那个部分,如下图所示
是由 screen-directive.js 文件管理的。screen-directive.js 中除了之前说的渲染图片二进制文件功能,还可以捕捉用户点击,滑动 “设备屏幕” 等事件,并调用 ControlService.js 文件中对应的函数。代码如下所示
element.on('mousedown', mouseDownListener) //当用户触发mousedown事件后,调用mouseDownListener函数
function mouseDownListener(){
...
...
control.touchDown(nextSeq(), 0, scaled.xP, scaled.yP, pressure)
...
$document.bind('mouseup', mouseUpListener)
}
当用户在前端点击设备上某个位置时,screen-directive.js 捕捉了 touchDown 动作,调用的是 mouseDownListener 函数, mouseDownListener 会通过$scope.control
(即代码中的 control) 调用 ControlService.js 的中 touchDown 函数,向服务端发送消息 -- 点击该台设备的 (x,y) 位置。
至于为什么在 control-panes-controller.js 中定义的$scope.control
, 能在 screen-directive.js 使用?这就属于 angular 1.x 知识,感兴趣的同学可以自己去了解,这里不赘述。
该 js 文件完成$scope.device
,和$scope.control
的初始化
...
function getDevice(serial) {
DeviceService.get(serial, $scope)
.then(function(device) {
return GroupService.invite(device)
})
.then(function(device) {
$scope.device = device
$scope.control = ControlService.create(device, device.channel)
// TODO: Change title, flickers too much on Chrome
// $rootScope.pageTitle = device.name
SettingsService.set('lastUsedDevice', serial)
return device
})
.catch(function() {
$timeout(function() {
$location.path('/')
})
})
}
...
其中:
$scope.control = ControlService.create(device, device.channel)
我们来分析 ControlService.js 文件
function sendOneWay(action, data) {
socket.emit(action, channel, data)
}
...
...
this.touchDown = function(seq, contact, x, y, pressure) {
sendOneWay('input.touchDown', {
seq: seq
, contact: contact
, x: x
, y: y
, pressure: pressure
})
}
...
ControlService.js 中封装了一系列操作设备的函数:如: this.touchDown。但这些操作设备函数都调用了 sendOneWay 方法。sendOneWay 方法调用了 socket.emit(),而这个 socket 来自 socket-service.js 文件。这个文件主要内容如下所示
var io = require('socket.io')
var websocketUrl = AppState.config.websocketUrl || ''
var socket = io(websocketUrl, {
reconnection: false, transports: ['websocket']
})
...
这段代码创建了 websocket client(websocket 在之前的文章已经描述过了,这里不再赘述)。ControlService.js 文件使用这个 socket 向服务端发送消息。
总结写一下,this.touchDown
函数,通过socket.emit
向服务端发送'input.touchDown'消息,并传输该设备的 channel 和按压点的 x,y 坐标。
需要注意的是,设备 A 详情页和设备 B 详情页,var websocketUrl = AppState.config.websocketUrl || ''
不同 (端口不同)。
请先仔细阅读zeromq 官方文档,zeromq 其他参考资料。着重看 push/pull,pub/sub 部分。
服务端是如何接收前端发送的数据,并将数据传输至对应手机呢?这里以单个设备举例。
服务端处理信息流转有三个文件,websocket/index.js
, triproxy/index.js
, processor/index.js
。最后接收数据和 minitouch 通信是来自 device 的 sub.js,touch/index.js
。
在执行stf local
命令时,发现 STF 启动了两个 triproxy 的实例 app001 和 dev001
INF/util:procutil 40934 [*] Forking "/Users/sheranjun/Code/stf/lib/cli.js triproxy app001 --bind-pub tcp://127.0.0.1:7111 --bind-dealer tcp://127.0.0.1:7112 --bind-pull tcp://127.0.0.1:7113"
INF/util:procutil 40934 [*] Forking "/Users/sheranjun/Code/stf/lib/cli.js triproxy dev001 --bind-pub tcp://127.0.0.1:7114 --bind-dealer tcp://127.0.0.1:7115 --bind-pull tcp://127.0.0.1:7116"
其中 app001 是处理从前端 UI push 的消息;dev001 是处理从手机 push 的消息。这两个实例一起完成了前端 UI 和手机的双向流通。当然本文只分析从前端 UI 发送消息到手机接收消息单方向的流程
index.js 做了两件事,启动 websocket 服务端,等待接收 websocket 客户端消息;启动 app001 push,发送消息至 app001 pull
var socketio = require('socket.io')
var io = socketio.listen(server, {
serveClient: false
, transports: ['websocket']
})
....
server.listen(options.port)
log.info('Listening on port %d', options.port)
设备 A 详情页和设备 B 详情页启动的是不同的 websocket server.
var push = zmqutil.socket('push')
log.info("endopoints is " + options.endpoints)
log.info("endopoints is push " + options.endpoints.push)
Promise.map(options.endpoints.push, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('Sending output to "%s"', record.url)
push.connect(record.url)
return Promise.resolve(true)
})
})
})
.catch(function(err) {
log.fatal('Unable to connect to push endpoint', err)
lifecycle.fatal()
})
其中push.connect(record.url)
, record.url
是 app001 pull 服务的 URL。是唯一的,所有的设备详情页使用的是同一个 app001 pull 服务的 URL。
io.on('connection', function(socket) {
socket.on('input.touchDown', function(channel, data){
log.info("touch down from websocket")
push.send([
channel
, wireutil.envelope(new wire.TouchDownMessage(
data.seq
, data.contact
, data.x
, data.y
, data.pressure
))
])
})
}
这段语句,将 websocket 和 push 联系起来,先来看看前端发送的input.touchDown
消息,传输的数据
function sendOneWay(action, data) {
socket.emit(action, channel, data)
}
...
...
this.touchDown = function(seq, contact, x, y, pressure) {
sendOneWay('input.touchDown', {
seq: seq
, contact: contact
, x: x
, y: y
, pressure: pressure
})
}
...
两端代码联系起来,前端发送input.touchDown
消息,后端成功接收input.touchDown
消息之后,调用 app001 的 push 端,将 channel 和封装了 data(封装方式这里不重要,不讨论),发送至 app 001 的 pull 端。
同时可以知道,尽管设备 A 和设备 B 详情页使用的不同的 websocket,但它们使用的 push 端相同。
triproxy 中的 index.js,主要是新建 pull 端,dealer 端,以及 pub 端。代码如下所示
function proxy(to) {
return function() {
to.send([].slice.call(arguments))
}
}
// App/device output
var pub = zmqutil.socket('pub')
pub.bindSync(options.endpoints.pub)
log.info('PUB socket bound on', options.endpoints.pub)
// Coordinator input/output
var dealer = zmqutil.socket('dealer')
dealer.bindSync(options.endpoints.dealer)
dealer.on('message', proxy(pub))
log.info('DEALER socket bound on', options.endpoints.dealer)
// App/device input
var pull = zmqutil.socket('pull')
pull.bindSync(options.endpoints.pull)
pull.on('message', proxy(dealer))
log.info('PULL socket bound on', options.endpoints.pull)
在启动 STF 时,新建了两个 triproxy:app001,dev001。这里拿 app001 triproxy 来举例,
pull.on('message', proxy(dealer))
,意味着 pull 端接受来自 websocket/index.js push 端的 channel/data 后,将 channel/data 通过 dealer 服务端发送出去。dealer.on('message', proxy(pub))
,意思是 dealer 服务端接收到 channel/data 后,将 channel/data 通过 pub 端发送出去。前面提到过,STF 新建了两个 triproxy:app001 和 dev 001。所以,websocket/index.js 中的 channel/data 通过 app001 push 到了 app001 的 pull 端。app001pull 端,将 channel/data 从 app001 dealer 服务端发送出去,那 channel/data 又在哪里被接收了呢?
processor 中的 index.js,主要是为了 app001 dealer client 和 dev001 dealear client 之间的消息转发。代码如下
// App side
var appDealer = zmqutil.socket('dealer')
Promise.map(options.endpoints.appDealer, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('App dealer connected to "%s"', record.url)
appDealer.connect(record.url)
return Promise.resolve(true)
})
})
})
.catch(function(err) {
log.fatal('Unable to connect to app dealer endpoint', err)
lifecycle.fatal()
})
// Device side
var devDealer = zmqutil.socket('dealer')
appDealer.on('message', function(channel, data) {
devDealer.send([channel, data])
})
Promise.map(options.endpoints.devDealer, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('Device dealer connected to "%s"', record.url)
devDealer.connect(record.url)
return Promise.resolve(true)
})
})
})
.catch(function(err) {
log.fatal('Unable to connect to dev dealer endpoint', err)
lifecycle.fatal()
})
processor 中的 index.js 中新建了两个 dealer client:appDealer client 和 devDealer client。分别连接 triproxy 中,app001 dealer 服务端和 dev001 dealer 服务端。其中
appDealer.on('message', function(channel, data) {
devDealer.send([channel, data])
})
appDealer client 接收到来自 app001 dealer 服务端的 channel/data 之后,将 channel/data 传递给 devDealer client,devDealer client 发送至 dev001 dealer 服务端。再联系triproxy/index.js
一小节的流程,dev001 的 dealer 服务端接收到 channel/data 后,将 channel/data 通过 dev001 的 pub 端发送出去。
至此服务端 channel/data 流转前半截就将完了。接下来将 dev001 的 pub 端的 channel/data 流向哪里了?
每一个手机都会创造一个 device 对象。device 对象中包含很多功能。和本篇文章相关的是两个功能,一是 dev001 sub 端并订阅固定 channel。这个功能由 sub.js 和 solo.js 两个文件完成;二是创造一个 touch 对象,STF 用它来向手机发送指令,这个功能由 touch/index.js 文件完成。如下图所示:
至于 device 对象何时创造?如何被创建?这个流程和 lib/cli.js,/lib/provider/index.js 文件有关。关键词是fork
以及.command('device <serial>')
,有兴趣自己研究。
sub.js
为每个手机创建连接到 dev001 pub 端的 sub 端。代码:
var sub = zmqutil.socket('sub')
return Promise.map(options.endpoints.sub, function(endpoint) {
return srv.resolve(endpoint).then(function(records) {
return srv.attempt(records, function(record) {
log.info('Receiving input from "%s"', record.url)
sub.connect(record.url)
return Promise.resolve(true)
})
})
})
.then(function() {
// Establish always-on channels
[wireutil.global].forEach(function(channel) {
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
})
})
.return(sub)
这里的sub.connect(record.url)
,record.url
是 dev001 pub 端,这一点可以通过 stf 启动日志验证。
STF 日志中 dev001 pub 端口地址如下所示:
INF/util:procutil 47526 [*] Forking "/Users/****/Code/stf/lib/cli.js triproxy dev001 --bind-pub tcp://127.0.0.1:7114 --bind-dealer tcp://127.0.0.1:7115 --bind-pull tcp://127.0.0.1:7116"
而 STF 日志中 sub.js 打印出的日志:
INF/device:support:sub 48054 [63a5b447] Receiving input from "tcp://127.0.0.1:7114"
solo.js
solo.js,为 sub.js 中创建的 sub 端,订阅该手机的固定频道。代码如下
function makeChannelId() {
var hash = crypto.createHash('sha1')
hash.update(options.serial)
return hash.digest('base64')
}
var channel = makeChannelId()
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
solo.js 通过手机的 serial 创建 channel。可能会有人问,怎么确定 dev001 pub 端发送的 channel(本质上来自前端手机 A 详情页传输的 channel),和该手机在 solo.js 中订阅的 channel 是同一个呢?这里简单解释一下。前端传递过来的 channel,也来自于后端。前端获取 channel 的代码在 device-service.js 文件中,代码如下所示:
deviceService.load = function(serial) {
return $http.get('/api/v1/devices/' + serial)
.then(function(response) {
return response.data.device
})
}
请求的 api,最终执行的后端函数如下所示:
function getDeviceBySerial(req, res) {
var serial = req.swagger.params.serial.value
var fields = req.swagger.params.fields.value
dbapi.loadDevice(serial)
.then(function(device) {
...
}
其中dbapi.loadDevice
是操作数据库函数,而对 device 对象的一系列数据库操作中,只有setDeviceReady
函数是更新了 device 的 channel,所以dbapi.loadDevice
中 channel 数据来自于setDeviceReady
函数。
dbapi.setDeviceReady = function(serial, channel) {
return db.run(r.table('devices').get(serial).update({
channel: channel
, ready: true
, owner: null
, reverseForwards: []
}))
}
而dbapi.setDeviceReady
函数又在哪里被调用呢?是的,在 processor/index.js。
devDealer.on('message', wirerouter()
.on(wire.DeviceReadyMessage, function(channel, message, data) {
dbapi.setDeviceReady(message.serial, message.channel)
.then(function() {
devDealer.send([
message.channel
, wireutil.envelope(new wire.ProbeMessage())
])
appDealer.send([channel, data])
})
})
而 processor/index.js 接收的 channel/message/data,来自 solo.js。至于 solo.js 的 push,如何将 channel/message/data 传输至 processor/index.js 的 devDealer client 中,按照上一小节的 triproxy.js 的流程图,对照 STF 日志,自己分析。这里不再赘述。
return {
channel: channel
, poke: function() {
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceReadyMessage(
options.serial
, channel
))
])
}
}
所以,前端的 channel,也是来自于 solo.js,而且 channel 和 serial 有一一对应的关系。这就解决了来自前端手机 A 详情页的数据一定会发送到后端手机 A 的对应的 channel 上。
touch/index.js
touch/index.js
中做了两件事:一是启动手机的 minitouch 服务,而是订阅 dev001 pub 端的数据,并将固定频道 channel 数据传输至手机。每一个手机都会有一个 touch/index.js
启动手机 minitouch 服务
TouchConsumer.prototype._startService = function() {
log.info('Launching screen service')
return minitouch.run()
.timeout(10000)
}
adb.openLocal
函数完成,代码如下所示TouchConsumer.prototype._connectService = function() {
function tryConnect(times, delay) {
return adb.openLocal(options.serial, 'localabstract:minitouch')
.timeout(10000)
.then(function(out) {
return out
})
.catch(function(err) {
if (/closed/.test(err.message) && times > 1) {
return Promise.delay(delay)
.then(function() {
return tryConnect(times - 1, delay * 2)
})
}
return Promise.reject(err)
})
}
log.info('Connecting to minitouch service')
// SH-03G can be very slow to start sometimes. Make sure we try long
// enough.
return tryConnect(7, 100)
}
有兴趣的,可以研究一下adb.openLocal
函数,这里不再详细讨论该函数流程
订阅 dev001 pub 端固定 channel
touch/index.js
通过引用 sub.js 和 solo.js,使用订阅固定频道的 dev001 sub,等待来自 dev001 pub 端数据,代码如下所示:
router
.on(wire.TouchDownMessage, function(channel, message) {
log.info("touch down from toucb")
queue.push(message.seq, function() {
touchConsumer.touchDown(message)
})
})
而 router 来自 router.js 中,router.js 引用了 sub.js
module.exports = syrup.serial()
.dependency(require('./sub'))
.define(function(options, sub, channels) {
var log = logger.createLogger('device:support:router')
var router = wirerouter()
sub.on('message', router.handler())
return router
})
其中 dev001 push 端在 channelA 上发送数据后,订阅了 channelA 的 dev001 sub 端接收该数据。并调用touchConsumer.touchDown(message)
。而 touchDown 函数的相关代码如下所示:
TouchConsumer.prototype.touchDown = function(point) {
log.info("touch down from touch index")
this._queueWrite(function() {
return this._write(util.format(
'd %s %s %s %s\n'
, point.contact
, Math.floor(this.touchConfig.origin.x(point) * this.banner.maxX)
, Math.floor(this.touchConfig.origin.y(point) * this.banner.maxY)
, Math.floor((point.pressure || 0.5) * this.banner.maxPressure)
))
})
}
TouchConsumer.prototype._write = function(chunk) {
this.socket.stream.write(chunk)
}
touchDown
函数中的this._write
调用的this.socket
来自this._connectService()
返回值。this.socket
代表着连接手机 minitouch tcp 服务端的 tcp 客户端。touchDown
函数接收 touchDown 消息后,向手机发送以'd'开头的字符串命令。
到此,服务端消息的流程已经全部解析完成。再看看最开始画的流程图,是不是清晰很多呢?
手机上主要是启动了 minitouch 的 tcp 服务,接收 STF 服务端操作手机指令。并根据指令,操作手机它的代码如下所示
start_server 函数,启动 tcp 服务
static int start_server(char* sockname)
{
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
{
perror("creating socket");
return fd;
}
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(&addr.sun_path[1], sockname, strlen(sockname));
if (bind(fd, (struct sockaddr*) &addr,
sizeof(sa_family_t) + strlen(sockname) + 1) < 0)
{
perror("binding socket");
close(fd);
return -1;
}
listen(fd, 1);
return fd;
}
fd
既是创造 TCP 服务端
等到 STF 服务端发送的指令,根据指令操作设备,这里以 touchDown 指令,举例说明
int client_fd = accept(server_fd, (struct sockaddr *) &client_addr,
&client_addr_length);
.....
while (io_length < sizeof(io_buffer) &&
read(client_fd, &io_buffer[io_length], 1) == 1)
{
if (io_buffer[io_length++] == '\n')
{
break;
}
}
...
switch (io_buffer[0])
{
case 'c': // COMMIT
commit(&state);
break;
case 'r': // RESET
touch_panic_reset_all(&state);
break;
case 'd': // TOUCH DOWN
contact = strtol(cursor, &cursor, 10);
x = strtol(cursor, &cursor, 10);
y = strtol(cursor, &cursor, 10);
pressure = strtol(cursor, &cursor, 10);
touch_down(&state, contact, x, y, pressure);
break;
.....
}
其中client_fd
接收来自 STF 服务端的指令,io_buffer
存储 STF 服务端消息,并根据第一字符判断操作类型。若第一个字符为 d,调用touch_down
函数。
touch 动作源码分析,整理出来算是一个大的工程了。涉及的核心知识点是 zeromq,这里只分析了前端操作是如何同步到设备,逆向流程并没有涉及,有兴趣的可以自己了解。
由于项目需要改造 STF,阅读了 STF 的源码,但项目改造 STF 重点在 STF 的前端和 STF 认证。这两块没有涉及多少 STF 整体架构设计,更多的是 angular 1.x 框架的知识。对这两块有兴趣的,欢迎讨论。