一、websocket 模块介绍

websocket 模块在整个 STF 框架负责直接与用户的前端进行数据交互,我们知道,socket 的特点是可以双向通信,因此,当一个用户占用手机后,其他用户的前端显示手机忙碌这个信息就是通过 websocket 从后端传到前端的,而用户占用手机的信息,则是从前端传到后端的。此外,在屏幕上的点击和滑动操作,也是通过 websocket 传到后端的。既然是 websocket,就可以用浏览器调试看到内容,调试的方法是在浏览器的 network 调试窗口选择 WS 即可。

如果你遇到这个错误,基本上就是 websocket 模块的问题:

下面举几个例子:

["device.change",{"important":false,"data":{"serial":"123456789","battery":{"status":"full","health":"good","source":"usb","level":100,"scale":100,"temp":35,"voltage":4.224}}}]

这是设备状态改变的一条消息,表示手机的电池状态有些改变。

["group.invite","ZTBTrJCtqjoiV1oHBNFELkryCXA=","tx.5c5241d3-7b7d-4493-9734-9498d859fc6a",{"requirements":{"serial":{"value":"123456789","match":"exact"}},"usage":null}]

这是占用手机的一条消息。

["input.touchMove","rjGnAV/sfyzGV/N+aaJhUNrePEU=",{"seq":35,"contact":0,"x":0.07175925925925926,"y":0.365234375,"pressure":0.5}]

这是用户在手机屏幕上操作的一条消息,表示用户划动的操作,还标示了用户划动的坐标信息。

二、websocket 模块框架

1、websocket 的结构

从图中可以看出,websocket 包含 push 和 sub 两个端口,当然还包括与用户通信的 socket 的端口没有标示出来,其中 push 用来把用户的消息(例如占用手机)发送到后台,sub 用来接收后端的消息(例如手机被占用)。

2、websocket 的启动方式

从 STF 的官方文档可以看出,启动 websocket 的启动需要指定 port、--storage-url、--connect-sub、--connect-push

stf websocket --port 3000 \
  --storage-url https://stf.example.org/ \
  --connect-sub tcp://appside.stf.example.org:7150 \
  --connect-push tcp://appside.stf.example.org:7170

这里 storage-url 是指存储的路径 url,主要指截图还有上传 apk 时的存储 url,connect-sub 就是监听消息的 triproxy 地址,connect-push 发送消息的 triproxy 地址,关于 triproxy 后续再专门介绍。

3、websocket 与数据库的交互

在 docker 的启动方式中 link 了数据库

--link rethinkdb-proxy-28015:rethinkdb 

可以看出,websocket 模块指定的数据库,这表示 websocket 模块是与数据库有交互的。从源代码也可以看出,websocket 与数据库交互的内容主要包括设备的标记、用户的设置等内容,这也可以理解,这些内容通过 websocket 传过来以后就可以直接处理了,不需要再交线 processor 或者其他模块处理。

顺便提一下,从官方 docker 方式启动的脚本中有没有 link 数据库就可以看出这个模块有没有用到数据库,比如 stf-app 模块用到了数据库,而 provider 则没有。

二、websocket 模块源码简单分析

之所以说简单分析是因为源码真的很多,不可能一句句都解释。

var push = zmqutil.socket('push')
......
var sub = zmqutil.socket('sub')

这两句话分别对应 connect-push 和 connect-sub,分别用来发送、接收消息和命令,详细可以看下 zmq 的相关知识和 zmqutil 的代码,再往下:

;[wireutil.global].forEach(function(channel) {
  log.info('Subscribing to permanent channel "%s"', channel)
  sub.subscribe(channel)
})

这句的意思是监听所有的全局命令字,在 zmq 中 sub 消息的时候可以指定特定命令字,而有一些命令字是全局的,需要一直监听,例如 provider 这个模块就不需要监听所有手机的命令,只需要监听自己连着的那些手机即可。

io.use(cookieSession({
  name: options.ssid
, keys: [options.secret]
}))

io.use(ip({
  trust: function() {
    return true
  }
}))

io.use(auth)

io.on('connection', function(socket) {
  var req = socket.request
  var user = req.user
  var channels = []

  user.ip = socket.handshake.query.uip || req.ip
  socket.emit('socket.ip', user.ip)

  function joinChannel(channel) {
    channels.push(channel)
    channelRouter.on(channel, messageListener)
    sub.subscribe(channel)
  }

  function leaveChannel(channel) {
    _.pull(channels, channel)
    channelRouter.removeListener(channel, messageListener)
    sub.unsubscribe(channel)
  }

  function createKeyHandler(Klass) {
    return function(channel, data) {
      push.send([
        channel
      , wireutil.envelope(new Klass(
          data.key
        ))
      ])
    }
  }

这一大段是处理 websocket 的,简单来说就是处理用户浏览器与后端交互的。前面几个 io.use 和 http 服务器比较像,就是加上 cookies 之类的东西,而且实现都在 middleware 文件夹中。

先看 cookis-session.js 文件,其实很简单,就是加了个 session,这个就不再多解释,详情可以到 cookieSession 的官网看说明:https://www.npmjs.com/package/cookie-session

var session = cookieSession(options)
return function(socket, next) {
  var req = socket.request
  var res = Object.create(null)
  session(req, res, next)
}

remote-ip.js 也比较简单,只是通过 request 拿到了 websocket 的 ip。proxyaddr 也可以在 npm 中找到,https://www.npmjs.com/package/proxy-addr

return function(socket, next) {
  var req = socket.request
  req.ip = proxyaddr(req, options.trust)
  next()
}

auth.js 的关键代码如下,是通过 session 中的 jwt 信息拿到 email,然后通过查数据库保证用户合法,关于 jwt 会再开一篇文件详细介绍一下。

return dbapi.loadUser(token.email)
  .then(function(user) {
    if (user) {
      req.user = user
      next()
    }

前面代码中的 joinChannel 和 leaveChannel 我的理解是监听和取消监听一个 zmq 通道,就是特定的命令字,但是更具体的用途还没搞清楚,createKeyHandler 这个方法也没搞清楚有什么用。

下面看关键一段:

var messageListener = wirerouter()
  .on(wire.DeviceLogMessage, function(channel, message) {
    socket.emit('device.log', message)
  })
  .on(wire.DeviceIntroductionMessage, function(channel, message) {
  ....

这是一个 zmq 消息监听的处理代码,这里的 on 是指收到特定消息的回调方法,其实很简单的,只要看几个典型的消息就知道了。比如说这一段:

.on(wire.JoinGroupMessage, function(channel, message) {
  socket.emit('device.change', {
    important: true
  , data: datautil.applyOwner({
        serial: message.serial
      , owner: message.owner
      , likelyLeaveReason: 'owner_change'
      , usage: message.usage
      }
    , user
    )
  })
})

当 websocket 收到 JoinGroupMessage 消息时,会通过 socket 发送"device.change"命令给 web 前端,告诉所以在线的 web 前端这个设备已经被占用了,然后前端就会显示手机繁忙状态,在 data 中标示手机的串号、占用的用户以及用途等信息。这里 on 到消息都可以在 lib\wire\wire.proto 找到的。

下面看 socket on 这一段:

new Promise(function(resolve) {
  socket.on('disconnect', resolve)
    // Global messages for all clients using socket.io
    //
    // Device note
    .on('device.note', function(data) {
      return dbapi.setDeviceNote(data.serial, data.note)
      .....

socket on 是处理用户前端发过的消息的,可以看到 websocket 收到用户消息后大部分通过 push.send 发送到了后台,就是 zmq 消息总线,然后让 provider 去处理。也有很多地方可以直接操作数据库,而不是交给 provider,比如说修改用户设置这类的消息。

在 socket on 中还有一些.on('input.touchMove...) 的内容,可以明显看出这是处理用户操作屏幕动作的。

中间还有一些处理异常的代码就不再细说了。

lifecycle.observe(function() {
  [push, sub].forEach(function(sock) {
    try {
      sock.close()
    }
    catch (err) {
      // No-op
    }
  })
})

这一段好像是处理错误情况的,具体还没搞懂,应该和 lifecycle 有很大关系,有谁明白的话麻烦给我说一下 _^

补充:根据@chenhengjie123的提示,这段内容的用途是在用户使用 CTRL+C 关闭 STF 模块时捕获这个关闭命令,从而做一些关闭 socket 连接等后续扫尾工作,实现友好关闭。每个模块都有这段代码。

总体来说呢,websocket 模块并不复杂,其实就是转发 provider 消息给前端,或者处理用户发过来的消息,只要知道了这两点儿,就基本理解了 websocket 模块。


↙↙↙阅读原文可查看相关链接,并与作者交流