STF openstf 模块解读--websocket

blueshark · 2017年05月04日 · 最后由 测试生 回复于 2018年12月10日 · 4368 次阅读
本帖已被设为精华帖!

一、websocket 模块介绍

websocket 模块在整个 STF 框架负责直接与用户的前端进行数据交互,我们知道,socket 的特点是可以双向通信,因此,当一个用户占用手机后,其他用户的前端显示手机忙碌这个信息就是通过 websocket 从后端传到前端的,而用户占用手机的信息,则是从前端传到后端的。此外,在屏幕上的点击和滑动操作,也是通过 websocket 传到后端的。既然是 websocket,就可以用浏览器调试看到内容,调试的方法是在浏览器的 network 调试窗口选择 WS 即可。

如果你遇到这个错误,基本上就是 websocket 模块的问题:

下面举几个例子:

["device.change",{"important":false,"data":{"serial":"123456789","battery":{"status":"full","health":"good","source":"usb","level":100,"scale":100,"temp":35,"voltage":4.224}}}]

这是设备状态改变的一条消息,表示手机的电池状态有些改变。

["group.invite","ZTBTrJCtqjoiV1oHBNFELkryCXA=","tx.5c5241d3-7b7d-4493-9734-9498d859fc6a",{"requirements":{"serial":{"value":"123456789","match":"exact"}},"usage":null}]

这是占用手机的一条消息。

["input.touchMove","rjGnAV/sfyzGV/N+aaJhUNrePEU=",{"seq":35,"contact":0,"x":0.07175925925925926,"y":0.365234375,"pressure":0.5}]

这是用户在手机屏幕上操作的一条消息,表示用户划动的操作,还标示了用户划动的坐标信息。

二、websocket 模块框架

1、websocket 的结构

从图中可以看出,websocket 包含 push 和 sub 两个端口,当然还包括与用户通信的 socket 的端口没有标示出来,其中 push 用来把用户的消息(例如占用手机)发送到后台,sub 用来接收后端的消息(例如手机被占用)。

2、websocket 的启动方式

从 STF 的官方文档可以看出,启动 websocket 的启动需要指定 port、--storage-url、--connect-sub、--connect-push

stf websocket --port 3000 \
  --storage-url https://stf.example.org/ \
  --connect-sub tcp://appside.stf.example.org:7150 \
  --connect-push tcp://appside.stf.example.org:7170

这里 storage-url 是指存储的路径 url,主要指截图还有上传 apk 时的存储 url,connect-sub 就是监听消息的 triproxy 地址,connect-push 发送消息的 triproxy 地址,关于 triproxy 后续再专门介绍。

3、websocket 与数据库的交互

在 docker 的启动方式中 link 了数据库

--link rethinkdb-proxy-28015:rethinkdb 

可以看出,websocket 模块指定的数据库,这表示 websocket 模块是与数据库有交互的。从源代码也可以看出,websocket 与数据库交互的内容主要包括设备的标记、用户的设置等内容,这也可以理解,这些内容通过 websocket 传过来以后就可以直接处理了,不需要再交线 processor 或者其他模块处理。

顺便提一下,从官方 docker 方式启动的脚本中有没有 link 数据库就可以看出这个模块有没有用到数据库,比如 stf-app 模块用到了数据库,而 provider 则没有。

二、websocket 模块源码简单分析

之所以说简单分析是因为源码真的很多,不可能一句句都解释。

var push = zmqutil.socket('push')
......
var sub = zmqutil.socket('sub')

这两句话分别对应 connect-push 和 connect-sub,分别用来发送、接收消息和命令,详细可以看下 zmq 的相关知识和 zmqutil 的代码,再往下:

;[wireutil.global].forEach(function(channel) {
  log.info('Subscribing to permanent channel "%s"', channel)
  sub.subscribe(channel)
})

这句的意思是监听所有的全局命令字,在 zmq 中 sub 消息的时候可以指定特定命令字,而有一些命令字是全局的,需要一直监听,例如 provider 这个模块就不需要监听所有手机的命令,只需要监听自己连着的那些手机即可。

io.use(cookieSession({
  name: options.ssid
, keys: [options.secret]
}))

io.use(ip({
  trust: function() {
    return true
  }
}))

io.use(auth)

io.on('connection', function(socket) {
  var req = socket.request
  var user = req.user
  var channels = []

  user.ip = socket.handshake.query.uip || req.ip
  socket.emit('socket.ip', user.ip)

  function joinChannel(channel) {
    channels.push(channel)
    channelRouter.on(channel, messageListener)
    sub.subscribe(channel)
  }

  function leaveChannel(channel) {
    _.pull(channels, channel)
    channelRouter.removeListener(channel, messageListener)
    sub.unsubscribe(channel)
  }

  function createKeyHandler(Klass) {
    return function(channel, data) {
      push.send([
        channel
      , wireutil.envelope(new Klass(
          data.key
        ))
      ])
    }
  }

这一大段是处理 websocket 的,简单来说就是处理用户浏览器与后端交互的。前面几个 io.use 和 http 服务器比较像,就是加上 cookies 之类的东西,而且实现都在 middleware 文件夹中。

先看 cookis-session.js 文件,其实很简单,就是加了个 session,这个就不再多解释,详情可以到 cookieSession 的官网看说明:https://www.npmjs.com/package/cookie-session

var session = cookieSession(options)
return function(socket, next) {
  var req = socket.request
  var res = Object.create(null)
  session(req, res, next)
}

remote-ip.js 也比较简单,只是通过 request 拿到了 websocket 的 ip。proxyaddr 也可以在 npm 中找到,https://www.npmjs.com/package/proxy-addr

return function(socket, next) {
  var req = socket.request
  req.ip = proxyaddr(req, options.trust)
  next()
}

auth.js 的关键代码如下,是通过 session 中的 jwt 信息拿到 email,然后通过查数据库保证用户合法,关于 jwt 会再开一篇文件详细介绍一下。

return dbapi.loadUser(token.email)
  .then(function(user) {
    if (user) {
      req.user = user
      next()
    }

前面代码中的 joinChannel 和 leaveChannel 我的理解是监听和取消监听一个 zmq 通道,就是特定的命令字,但是更具体的用途还没搞清楚,createKeyHandler 这个方法也没搞清楚有什么用。

下面看关键一段:

var messageListener = wirerouter()
  .on(wire.DeviceLogMessage, function(channel, message) {
    socket.emit('device.log', message)
  })
  .on(wire.DeviceIntroductionMessage, function(channel, message) {
  ....

这是一个 zmq 消息监听的处理代码,这里的 on 是指收到特定消息的回调方法,其实很简单的,只要看几个典型的消息就知道了。比如说这一段:

.on(wire.JoinGroupMessage, function(channel, message) {
  socket.emit('device.change', {
    important: true
  , data: datautil.applyOwner({
        serial: message.serial
      , owner: message.owner
      , likelyLeaveReason: 'owner_change'
      , usage: message.usage
      }
    , user
    )
  })
})

当 websocket 收到 JoinGroupMessage 消息时,会通过 socket 发送"device.change"命令给 web 前端,告诉所以在线的 web 前端这个设备已经被占用了,然后前端就会显示手机繁忙状态,在 data 中标示手机的串号、占用的用户以及用途等信息。这里 on 到消息都可以在 lib\wire\wire.proto 找到的。

  • 需要提醒一下是,websocket 收到的 zmq 消息全部都是 provider 发过来的,发过来的时候已经被 processor 模块处理过了,而 websocket 模块发出来的消息是直接转发给 provider,几乎没有处理,关于这些问题会在 processor 模块介绍中详细讲一下。

下面看 socket on 这一段:

new Promise(function(resolve) {
  socket.on('disconnect', resolve)
    // Global messages for all clients using socket.io
    //
    // Device note
    .on('device.note', function(data) {
      return dbapi.setDeviceNote(data.serial, data.note)
      .....

socket on 是处理用户前端发过的消息的,可以看到 websocket 收到用户消息后大部分通过 push.send 发送到了后台,就是 zmq 消息总线,然后让 provider 去处理。也有很多地方可以直接操作数据库,而不是交给 provider,比如说修改用户设置这类的消息。

在 socket on 中还有一些.on('input.touchMove...) 的内容,可以明显看出这是处理用户操作屏幕动作的。

中间还有一些处理异常的代码就不再细说了。

lifecycle.observe(function() {
  [push, sub].forEach(function(sock) {
    try {
      sock.close()
    }
    catch (err) {
      // No-op
    }
  })
})

这一段好像是处理错误情况的,具体还没搞懂,应该和 lifecycle 有很大关系,有谁明白的话麻烦给我说一下 _^

补充:根据@chenhengjie123的提示,这段内容的用途是在用户使用 CTRL+C 关闭 STF 模块时捕获这个关闭命令,从而做一些关闭 socket 连接等后续扫尾工作,实现友好关闭。每个模块都有这段代码。

总体来说呢,websocket 模块并不复杂,其实就是转发 provider 消息给前端,或者处理用户发过来的消息,只要知道了这两点儿,就基本理解了 websocket 模块。

共收到 15 条回复 时间 点赞

赞赞的,最适合我这种懒人了。

手机截图也是通过 websocket 实时传输的,搞定这个,录制工具的效率就提升许多了。

支持楼主输出这种详细的源码解析,方便其他人学习。不过阅读者需要能理解 nodejs 的事件绑定机制和匿名函数的用法才能大概看明白一点点。

simple 回复

@carl @0x88 多谢各位关注,如果有写的不对的地方还请指正

思寒_seveniruby 将本帖设为了精华贴 05月04日 10:50

lifecycle 那段,应该是用于通过 ctrl+c 关闭 stf 服务时,能给当前正在运行的每个模块发送关闭操作来实现友好关闭(比如释放资源)的。基本上每个 stf 的模块都有类似的方法。

lifecycle 相关的代码:

...
function Lifecycle() {
  this.observers = []
  this.ending = false
  process.on('SIGINT', this.graceful.bind(this))
  process.on('SIGTERM', this.graceful.bind(this))
}
...
Lifecycle.prototype.graceful = function() {
  log.info('Winding down for graceful exit')

  this.ending = true

  var wait = Promise.all(this.observers.map(function(fn) {
    return fn()
  }))

  return wait.then(function() {
    process.exit(0)
  })
}
陈恒捷 回复

是的,确实是这样,每个模块都有,多谢!

支持,现在还没做到这步

楼主能不能针对 websocket,搞个小例子,帮助小白我理解一下这个 websocket

支持楼主输出这种详细的源码解析,方便其他人学习。

发了微信公众号后,有同学反馈原文第二章有些地方 websocket 错写成了 webdocket 。我现在更正过来了,如果其它地方也有发过这个文章,也一并更正吧。

陈恒捷 回复

谢谢指正!不好意思,写的比较匆忙!下次注意

blueshark 回复

没事,这个问题我发公众号检查的时候也没发现。后面我们都留意好这些细节吧。

补充一下,websocket 是浏览器中新的传输协议,类似于 socket,双向通信。常见的浏览器 - 服务器模式,它的通信都是单向的:浏览器发请求给服务端。但是 STF 这个项目需要双向通信。比如说:服务端接收到手机掉线的信号后,能立即返回给浏览器。这个时候使用 websocket 协议就非常合适了,而且 websocket 传输数据比 HTTP 快。这也是为什么 STF 在电脑上操作手机和实际操作手机没有什么差别的,原因之一。

shuta 回复

『而且 websocket 传输数据比 HTTP 快。这也是为什么 STF 在电脑上操作手机和实际操作手机没有什么差别的,原因之一』这句话可能会让人误解,并不是 websocket 快,http 和 websocket 同样基于 socket 通信,实质速度是一样的,只不过 websocket 少了很多握手过程,所以显得迅速了一些😂


我似乎未找到楼主的这篇分享呢~

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册