STF STF 二次开发指南 (1) 设备连接

隐身 · 2018年05月17日 · 最后由 我喜欢的K17 回复于 2020年01月31日 · 5752 次阅读

前言

开始接触 STF 是在 16 年底,当时对于 JavaScript 只是停留在熟练其语言名称拼写的状态😄 ,更别提 nodejs 了。于是过年那段时间基本上都在家啃源码。开始的时候挺很痛苦,面对不熟悉的语言,大量的代码,一筹莫展。但是也要硬着头皮上啊,谁叫过完年还有一大波需求要实现呢😭 。于是就想到应该先抓住一个入口点,先走一遍大概的流程,了解了整体的架构之后再看每个模块逐个攻破。所以就先从 STF 的设备连管理的源码开始进行分析,其中会涉及到一些 STF 其他的功能模块在此先不详细展开,会在后续的文章中给大家分享,好了接下来我们就开始吧。

设备管理节点 (provider)

先来说说 STF 是如何发现设备连上 usb 的,主要是通过 provider 进程,provider 进程源码目录是/stf/lib/units/provider。
我们打开源码文件,找到 provider:line:111 这一行。

client.trackDevices().then(function(tracker) {
  log.info('Tracking devices')
  ...
}

这里的 client 是 STF 的作者使用 JavaScript 封装的 adb client 对象 (adbkit),可在 provider:line:17 找到其初始化的地方。

var adb = require('adbkit')
...
var client = adb.createClient({
    host: options.adbHost
  , port: options.adbPort
  })

options.adbHost、options.adbPort 指的是 adb server 的访问地址,默认是本机 5037 端口。
此处的 options 对象其实是 provider 进程的启动参数对象,下面会有讲解。

在 client.trackDevices() 的 then 回调方法中可以获取到 tracker 对象,tracker 可以设置 3 个监听消息,分别是:add、change、remove,设备连接响应的是 add 消息。
为 tracker 对象设置'add'消息监听,provider:line:153。

tracker.on('add', filterDevice(function(device) {
  log.info('Found device "%s" (%s)', device.id, device.type)
}

当有设备连上 usb 时会触发此监听消息。此处我们注意到在 add 消息的回调方法外包了一个 filterDevice()。

function filterDevice(listener) {
  return function(device) {
    if (isWeirdUnusableDevice(device)) {
      log.warn('ADB lists a weird device: "%s"', device.id)
      return false
    }
    if (!options.allowRemote && isRemoteDevice(device)) {
      log.info(
        'Filtered out remote device "%s", use --allow-remote to override'
      , device.id
      )
      return false
    }
    if (options.filter && !options.filter(device)) {
      log.info('Filtered out device "%s"', device.id)
      return false
    }
    return listener(device)
  }
}

通过查看 filterDevice() 方法的源码我们可以了解到该方法主要是过滤一些不希望连接的设备的,主要是:deviceId 异常的设备、不允许远程设备连接时禁止远程设备连接、以及设备黑名单中定义的 deviceId。

好了,我们再次回到 tracker.add 消息的回调方法,provider:line:162。

var register = new Promise(function(resolve) {
  // Tell others we found a device
  push.send([
    wireutil.global
  , wireutil.envelope(new wire.DeviceIntroductionMessage(
      device.id
    , wireutil.toDeviceStatus(device.type)
    , new wire.ProviderMessage(
        solo
      , options.name
      )
    ))
  ])
  privateTracker.once('register', resolve)
})

此时我们在 add 消息回调方法内,当 provider 发现有设备连接时,第一步是先通过 zmq 向 processor 发送一个 protoBuf 消息 DeviceIntroductionMessage,响应此消息的 STF 模块有:processor、websocket、reaper。大家可以先将这 3 个模块理解为 3 个 node 进程。

processor 进程最先收到 DeviceIntroductionMessage,processor 源码位于/stf/lib/units/processor 目录,processor:line:59。

.on(wire.DeviceIntroductionMessage, function(channel, message, data) {
  dbapi.saveDeviceInitialState(message.serial, message)
    .then(function() {
      devDealer.send([
        message.provider.channel
      , wireutil.envelope(new wire.DeviceRegisteredMessage(
          message.serial
        ))
      ])
      appDealer.send([channel, data])
    })
})

此处是 DeviceIntroductionMessage 的响应,我们看 dbapi.saveDeviceInitialState(),该方法将消息中携带的设备信息保存到 rethinkDB,保存成功后向 provider 发送 DeviceRegisteredMessage,随后继续将 DeviceIntroductionMessage 广播出去(websocket、reaper)。

websocket 进程源码位于/stf/lib/units/websocket 目录,websocket:line:125。

.on(wire.DeviceIntroductionMessage, function(channel, message) {
  socket.emit('device.add', {
    important: true
  , data: {
      serial: message.serial
    , present: false
    , provider: message.provider
    , owner: null
    , status: message.status
    , ready: false
    , reverseForwards: []
    }
  })
})

websocket 进程收到 DeviceIntroductionMessage 后会通过 websocket(协议) 向前端发送一个'device.add'消息,如果没有浏览器打开 STF 页面的话可以忽略此处,前端页面会根据该消息更新设备连接状态。

然后我们看 reaper 进程(源码位置你懂的),reaper:line:94。

.on(wire.DeviceIntroductionMessage, function(channel, message) {
  ttlset.drop(message.serial, TtlSet.SILENT)
  ttlset.bump(message.serial, Date.now())
})

reaper 进程维护着一个 TTLSet,主要负责对超过存活时间的设备进行收割,当收到 DeviceIntroductionMessage 时 reaper 会将该设备同当前时间初始化到 TTSet 中去。至此对于 DeviceIntroductionMessage 相关的的所有处理已经完成。

我们之前说到 processor 进程在保存完设备初始化信息后会向 provider 发送 DeviceRegisteredMessage 消息,所以我们再次回到 provider,provider:line:428。

sub.on('message', wirerouter()
  .on(wire.DeviceRegisteredMessage, function(channel, message) {
    flippedTracker.emit(message.serial, 'register')
  })
  .handler())

provider 收到 DeviceRegisteredMessage 会通过 flippedTracker 发送一个以 deviceId 为消息名称的消息,参数为'register'(tips:此处涉及 EventEmitter 和 Promise 的基本概念,大家可以参考相关资料,不在此处展开),具体处理代码是 provider:line:339。

register.then(function() {
  log.info('Registered device "%s"', device.id)
  check()
})

调用 check() 开始依次调用 work()、spawn()。到此第一步的设备注册已经完成,接下来就是启动 device 进程了。

先说一下 check()

function check() {
  clearTimeout(timer)
  if (device.present) {
    // We might get multiple status updates in rapid succession,
    // so let's wait for a while
    switch (device.type) {
      case 'device':
      case 'emulator':
        willStop = false
        timer = setTimeout(work, 100)
        break
      default:
        willStop = true
        timer = setTimeout(stop, 100)
        break
    }
  }
  else {
    willStop = true
    stop()
  }
}

如果 device.type 正常的话就会调用 work(),需要注意的是如果此时设备状态有变化或者断开的话 check() 是会被多次调用(setTimeout & clearTimeout)。

然后是 work()

function work() {
  return (worker = workers[device.id] = spawn())
    .then(function() {
      log.info('Device worker "%s" has retired', device.id)
      ...
    })
    .catch(procutil.ExitError, function(err) {
      ...
    })
}

该方法主要是调用 spawn(),并保存一个对应 deviceId 的 Promise 对象到 workers 进行管理,这里涉及到 lifecycle 工具类,该类主要的作用是在当前 node 进程结束的时候做些必要的收尾工作。

再然后是 spawn()

function spawn() {
  var allocatedPorts = ports.splice(0, 4)
  var proc = options.fork(device, allocatedPorts.slice())
  var resolver = Promise.defer()
  var didExit = false
  ...
}

开始说 spawn() 之前不得不先说一下 STF 各个模块启动的方式,我们来先大致看一下启动 provider 进程的源码 (/stf/lib/cli/provider)。
重点看这 2 个地方

return yargs
    .strict()
    .env('STF_PROVIDER')
    .option('adb-host', {
      describe: 'The ADB server host.'
    , type: 'string'
    , default: '127.0.0.1'
    })
    .option('adb-port', {
      describe: 'The ADB server port.'
    , type: 'number'
    , default: 5037
    })
    ...

STF 使用 yargs 包装每个模块的启动命令,此处是 provider 模块的启动参数定义,简单的看几个常用参数:
adb-host & adb-port:就是上面说到的 adb server 的访问地址。
allow-remote:是否允许通过远程连接 adb 方式的设备连接 STF。
max-port & min-port:定义一个 port 区间,默认是 7700 ~ 7400,每个 device 进程会分配到 4 个连续的 port。
其他的一些参数也是非常有用的,大家可以参考代码中的 describe 了解其用法。

fork 方法

fork: function(device, ports) {
  var fork = require('child_process').fork
  var args = [
    'device'
  , '--serial', device.id
  , '--provider', argv.name
  , '--screen-port', ports.shift()
  , '--connect-port', ports.shift()
  , '--vnc-port', ports.shift()
  , '--public-ip', argv.publicIp
  , '--group-timeout', argv.groupTimeout
  , '--storage-url', argv.storageUrl
  , '--adb-host', argv.adbHost
  , '--adb-port', argv.adbPort
  , '--screen-jpeg-quality', argv.screenJpegQuality
  , '--screen-ping-interval', argv.screenPingInterval
  , '--screen-ws-url-pattern', argv.screenWsUrlPattern
  , '--connect-url-pattern', argv.connectUrlPattern
  , '--heartbeat-interval', argv.heartbeatInterval
  , '--boot-complete-timeout', argv.bootCompleteTimeout
  , '--vnc-initial-size', argv.vncInitialSize.join('x')
  , '--mute-master', argv.muteMaster
  ]
  ...
  return fork(cli, args)
}

此方法就是 device 进程产生的核心方法了,在 provider 进程的 spawn() 中调用,通过 child_process 启动 device 子进程。

然后我们可以回到 provider 的 spawn() 了,现在再来看就容易理解了,spawn 主要就是调用 provider 启动参数中传入的 fork 方法了,fork 方法返回一个子进程对象,接下来就是设置与 device 子进程之间的消息监听了。

proc.on('exit', exitListener)
proc.on('error', errorListener)
proc.on('message', messageListener)

具体 Listener 的代码就不再贴出了。

device 进程入口源码 (/stf/units/device/index.js)

if (process.send) {
  // Only if we have a parent process
  process.send('ready')
}
log.info('Fully operational')

device 进程启动成功之后会向父进程发送一个 message,内容是'ready',表示 device 进程启动成功了,至此整个设备连接就已经完成了。

设备进程 (device)

最后来我们简单的看一下 STF 是如何与设备进行通信的。先介绍一下 device 进程,其源码位于/stf/lib/units/device,device 进程启动时会使用 syrup 顺序加载 plugin,syrup 是一个 STF 自带的依赖注入框架,plugins 就是设备上相应的各个功能了,比如:InstallAPK、Logcat、Shell 等等。总的来说 device 进程调用设备上的具体功能就是通过 minicap、minitouch、adb、STFService(APK) 来实现的。

结尾

好了,以上就是整个设备连接的大致过程,有些细节没有在文中提及,但是应该也不影响对此的理解,推荐大家也可以阅读一下这部分的源码,发现更多的精彩。
PS:为了方便大家跟随本篇文章的步骤学习源码,故将 STF 版本锁定在 3.2.0,大家可以自行上 github clone 项目。

共收到 13 条回复 时间 点赞

不错不错,写得逻辑清晰

隐身 STF 二次开发指南 (2) Device 启动解析 中提及了此贴 05月21日 18:14
16楼 已删除
15楼 已删除

@az289565747 大神求继续分享 STF 二次开发指南,现在看代码看的头晕,有 stf 的总体架构图之类的帮助下理解吗?

隐身 #13 · 2018年07月23日 Author
water 回复

我整理了一个 xmind 可以发给你

匿名 #12 · 2018年07月27日

stf 我可以调接口方式向 stf 注册使用设备吗?然后我使用完了就通过调 stf 接口去释放设备?

隐身 #11 · 2018年07月30日 Author

你是想调一个接口把某台设备标记为占用状态是吧?然后用完了再标回来?STF 本身就是这样的。

隐身 #10 · 2018年07月30日 Author

建了个 QQ 群 (850643090) 大家有问题可以加群问,回复比较快。

楼主 我想给设备固定住端口号,这个看哪部分那。我目前的做法是把现有的设备 id 和 port 保存成 json 文件,当在初始化设备的时候我加了查询这个设备的端口号模块,自己写的模块里有个回调,在回调里赋值给 stf 分配的连接端口 ([screen port,connect port, vnc port, xxxx]),但是没有起作用,用的还是 stf 分配的,是我的写法有问题吗。

zhanglimin 回复

provider 的 spawn() 方法里面 在 fork device 的时候会传端口号。建议先缓存好 port 对应关系,在原 ports.splice(0, 4) 的地方替换成你的逻辑,拿到固定的端口号,然后传给 fork

隐身 回复

申请加 qq 群咯,我也正在改 STF,求大神指导- 3-

隐身 回复

是应该缓存起来,每次查库这一步太浪费时间了 ,谢谢大神指导

mrx102 STF 集成 iOS 之设备连接 中提及了此贴 04月06日 09:05
隐身 回复

我也想要😵

琉丶言 STF 集成 iOS 之源码分析 中提及了此贴 06月25日 18:58
匿名 #2 · 2019年10月05日

你好,请问你的 xmind 可以发我一份吗?

加群了,楼主同意一下呀

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册