websocket 模块在整个 STF 框架负责直接与用户的前端进行数据交互,我们知道,socket 的特点是可以双向通信,因此,当一个用户占用手机后,其他用户的前端显示手机忙碌这个信息就是通过 websocket 从后端传到前端的,而用户占用手机的信息,则是从前端传到后端的。此外,在屏幕上的点击和滑动操作,也是通过 websocket 传到后端的。既然是 websocket,就可以用浏览器调试看到内容,调试的方法是在浏览器的 network 调试窗口选择 WS 即可。
如果你遇到这个错误,基本上就是 websocket 模块的问题:
["device.change",{"important":false,"data":{"serial":"123456789","battery":{"status":"full","health":"good","source":"usb","level":100,"scale":100,"temp":35,"voltage":4.224}}}]
这是设备状态改变的一条消息,表示手机的电池状态有些改变。
["group.invite","ZTBTrJCtqjoiV1oHBNFELkryCXA=","tx.5c5241d3-7b7d-4493-9734-9498d859fc6a",{"requirements":{"serial":{"value":"123456789","match":"exact"}},"usage":null}]
这是占用手机的一条消息。
["input.touchMove","rjGnAV/sfyzGV/N+aaJhUNrePEU=",{"seq":35,"contact":0,"x":0.07175925925925926,"y":0.365234375,"pressure":0.5}]
这是用户在手机屏幕上操作的一条消息,表示用户划动的操作,还标示了用户划动的坐标信息。
从图中可以看出,websocket 包含 push 和 sub 两个端口,当然还包括与用户通信的 socket 的端口没有标示出来,其中 push 用来把用户的消息(例如占用手机)发送到后台,sub 用来接收后端的消息(例如手机被占用)。
从 STF 的官方文档可以看出,启动 websocket 的启动需要指定 port、--storage-url、--connect-sub、--connect-push
stf websocket --port 3000 \
--storage-url https://stf.example.org/ \
--connect-sub tcp://appside.stf.example.org:7150 \
--connect-push tcp://appside.stf.example.org:7170
这里 storage-url 是指存储的路径 url,主要指截图还有上传 apk 时的存储 url,connect-sub 就是监听消息的 triproxy 地址,connect-push 发送消息的 triproxy 地址,关于 triproxy 后续再专门介绍。
在 docker 的启动方式中 link 了数据库
--link rethinkdb-proxy-28015:rethinkdb
可以看出,websocket 模块指定的数据库,这表示 websocket 模块是与数据库有交互的。从源代码也可以看出,websocket 与数据库交互的内容主要包括设备的标记、用户的设置等内容,这也可以理解,这些内容通过 websocket 传过来以后就可以直接处理了,不需要再交线 processor 或者其他模块处理。
顺便提一下,从官方 docker 方式启动的脚本中有没有 link 数据库就可以看出这个模块有没有用到数据库,比如 stf-app 模块用到了数据库,而 provider 则没有。
之所以说简单分析是因为源码真的很多,不可能一句句都解释。
var push = zmqutil.socket('push')
......
var sub = zmqutil.socket('sub')
这两句话分别对应 connect-push 和 connect-sub,分别用来发送、接收消息和命令,详细可以看下 zmq 的相关知识和 zmqutil 的代码,再往下:
;[wireutil.global].forEach(function(channel) {
log.info('Subscribing to permanent channel "%s"', channel)
sub.subscribe(channel)
})
这句的意思是监听所有的全局命令字,在 zmq 中 sub 消息的时候可以指定特定命令字,而有一些命令字是全局的,需要一直监听,例如 provider 这个模块就不需要监听所有手机的命令,只需要监听自己连着的那些手机即可。
io.use(cookieSession({
name: options.ssid
, keys: [options.secret]
}))
io.use(ip({
trust: function() {
return true
}
}))
io.use(auth)
io.on('connection', function(socket) {
var req = socket.request
var user = req.user
var channels = []
user.ip = socket.handshake.query.uip || req.ip
socket.emit('socket.ip', user.ip)
function joinChannel(channel) {
channels.push(channel)
channelRouter.on(channel, messageListener)
sub.subscribe(channel)
}
function leaveChannel(channel) {
_.pull(channels, channel)
channelRouter.removeListener(channel, messageListener)
sub.unsubscribe(channel)
}
function createKeyHandler(Klass) {
return function(channel, data) {
push.send([
channel
, wireutil.envelope(new Klass(
data.key
))
])
}
}
这一大段是处理 websocket 的,简单来说就是处理用户浏览器与后端交互的。前面几个 io.use 和 http 服务器比较像,就是加上 cookies 之类的东西,而且实现都在 middleware 文件夹中。
先看 cookis-session.js 文件,其实很简单,就是加了个 session,这个就不再多解释,详情可以到 cookieSession 的官网看说明:https://www.npmjs.com/package/cookie-session
var session = cookieSession(options)
return function(socket, next) {
var req = socket.request
var res = Object.create(null)
session(req, res, next)
}
remote-ip.js 也比较简单,只是通过 request 拿到了 websocket 的 ip。proxyaddr 也可以在 npm 中找到,https://www.npmjs.com/package/proxy-addr
return function(socket, next) {
var req = socket.request
req.ip = proxyaddr(req, options.trust)
next()
}
auth.js 的关键代码如下,是通过 session 中的 jwt 信息拿到 email,然后通过查数据库保证用户合法,关于 jwt 会再开一篇文件详细介绍一下。
return dbapi.loadUser(token.email)
.then(function(user) {
if (user) {
req.user = user
next()
}
前面代码中的 joinChannel 和 leaveChannel 我的理解是监听和取消监听一个 zmq 通道,就是特定的命令字,但是更具体的用途还没搞清楚,createKeyHandler 这个方法也没搞清楚有什么用。
下面看关键一段:
var messageListener = wirerouter()
.on(wire.DeviceLogMessage, function(channel, message) {
socket.emit('device.log', message)
})
.on(wire.DeviceIntroductionMessage, function(channel, message) {
....
这是一个 zmq 消息监听的处理代码,这里的 on 是指收到特定消息的回调方法,其实很简单的,只要看几个典型的消息就知道了。比如说这一段:
.on(wire.JoinGroupMessage, function(channel, message) {
socket.emit('device.change', {
important: true
, data: datautil.applyOwner({
serial: message.serial
, owner: message.owner
, likelyLeaveReason: 'owner_change'
, usage: message.usage
}
, user
)
})
})
当 websocket 收到 JoinGroupMessage 消息时,会通过 socket 发送"device.change"命令给 web 前端,告诉所以在线的 web 前端这个设备已经被占用了,然后前端就会显示手机繁忙状态,在 data 中标示手机的串号、占用的用户以及用途等信息。这里 on 到消息都可以在 lib\wire\wire.proto 找到的。
下面看 socket on 这一段:
new Promise(function(resolve) {
socket.on('disconnect', resolve)
// Global messages for all clients using socket.io
//
// Device note
.on('device.note', function(data) {
return dbapi.setDeviceNote(data.serial, data.note)
.....
socket on 是处理用户前端发过的消息的,可以看到 websocket 收到用户消息后大部分通过 push.send 发送到了后台,就是 zmq 消息总线,然后让 provider 去处理。也有很多地方可以直接操作数据库,而不是交给 provider,比如说修改用户设置这类的消息。
在 socket on 中还有一些.on('input.touchMove...) 的内容,可以明显看出这是处理用户操作屏幕动作的。
中间还有一些处理异常的代码就不再细说了。
lifecycle.observe(function() {
[push, sub].forEach(function(sock) {
try {
sock.close()
}
catch (err) {
// No-op
}
})
})
这一段好像是处理错误情况的,具体还没搞懂,应该和 lifecycle 有很大关系,有谁明白的话麻烦给我说一下 _^
补充:根据@chenhengjie123的提示,这段内容的用途是在用户使用 CTRL+C 关闭 STF 模块时捕获这个关闭命令,从而做一些关闭 socket 连接等后续扫尾工作,实现友好关闭。每个模块都有这段代码。
总体来说呢,websocket 模块并不复杂,其实就是转发 provider 消息给前端,或者处理用户发过来的消息,只要知道了这两点儿,就基本理解了 websocket 模块。