开始接触 STF 是在 16 年底,当时对于 JavaScript 只是停留在熟练其语言名称拼写的状态 ,更别提 nodejs 了。于是过年那段时间基本上都在家啃源码。开始的时候挺很痛苦,面对不熟悉的语言,大量的代码,一筹莫展。但是也要硬着头皮上啊,谁叫过完年还有一大波需求要实现呢 。于是就想到应该先抓住一个入口点,先走一遍大概的流程,了解了整体的架构之后再看每个模块逐个攻破。所以就先从 STF 的设备连管理的源码开始进行分析,其中会涉及到一些 STF 其他的功能模块在此先不详细展开,会在后续的文章中给大家分享,好了接下来我们就开始吧。
先来说说 STF 是如何发现设备连上 usb 的,主要是通过 provider 进程,provider 进程源码目录是/stf/lib/units/provider。
我们打开源码文件,找到 provider:line:111 这一行。
client.trackDevices().then(function(tracker) {
log.info('Tracking devices')
...
}
这里的 client 是 STF 的作者使用 JavaScript 封装的 adb client 对象 (adbkit),可在 provider:line:17 找到其初始化的地方。
var adb = require('adbkit')
...
var client = adb.createClient({
host: options.adbHost
, port: options.adbPort
})
options.adbHost、options.adbPort 指的是 adb server 的访问地址,默认是本机 5037 端口。
此处的 options 对象其实是 provider 进程的启动参数对象,下面会有讲解。
在 client.trackDevices() 的 then 回调方法中可以获取到 tracker 对象,tracker 可以设置 3 个监听消息,分别是:add、change、remove,设备连接响应的是 add 消息。
为 tracker 对象设置'add'消息监听,provider:line:153。
tracker.on('add', filterDevice(function(device) {
log.info('Found device "%s" (%s)', device.id, device.type)
}
当有设备连上 usb 时会触发此监听消息。此处我们注意到在 add 消息的回调方法外包了一个 filterDevice()。
function filterDevice(listener) {
return function(device) {
if (isWeirdUnusableDevice(device)) {
log.warn('ADB lists a weird device: "%s"', device.id)
return false
}
if (!options.allowRemote && isRemoteDevice(device)) {
log.info(
'Filtered out remote device "%s", use --allow-remote to override'
, device.id
)
return false
}
if (options.filter && !options.filter(device)) {
log.info('Filtered out device "%s"', device.id)
return false
}
return listener(device)
}
}
通过查看 filterDevice() 方法的源码我们可以了解到该方法主要是过滤一些不希望连接的设备的,主要是:deviceId 异常的设备、不允许远程设备连接时禁止远程设备连接、以及设备黑名单中定义的 deviceId。
好了,我们再次回到 tracker.add 消息的回调方法,provider:line:162。
var register = new Promise(function(resolve) {
// Tell others we found a device
push.send([
wireutil.global
, wireutil.envelope(new wire.DeviceIntroductionMessage(
device.id
, wireutil.toDeviceStatus(device.type)
, new wire.ProviderMessage(
solo
, options.name
)
))
])
privateTracker.once('register', resolve)
})
此时我们在 add 消息回调方法内,当 provider 发现有设备连接时,第一步是先通过 zmq 向 processor 发送一个 protoBuf 消息 DeviceIntroductionMessage,响应此消息的 STF 模块有:processor、websocket、reaper。大家可以先将这 3 个模块理解为 3 个 node 进程。
processor 进程最先收到 DeviceIntroductionMessage,processor 源码位于/stf/lib/units/processor 目录,processor:line:59。
.on(wire.DeviceIntroductionMessage, function(channel, message, data) {
dbapi.saveDeviceInitialState(message.serial, message)
.then(function() {
devDealer.send([
message.provider.channel
, wireutil.envelope(new wire.DeviceRegisteredMessage(
message.serial
))
])
appDealer.send([channel, data])
})
})
此处是 DeviceIntroductionMessage 的响应,我们看 dbapi.saveDeviceInitialState(),该方法将消息中携带的设备信息保存到 rethinkDB,保存成功后向 provider 发送 DeviceRegisteredMessage,随后继续将 DeviceIntroductionMessage 广播出去(websocket、reaper)。
websocket 进程源码位于/stf/lib/units/websocket 目录,websocket:line:125。
.on(wire.DeviceIntroductionMessage, function(channel, message) {
socket.emit('device.add', {
important: true
, data: {
serial: message.serial
, present: false
, provider: message.provider
, owner: null
, status: message.status
, ready: false
, reverseForwards: []
}
})
})
websocket 进程收到 DeviceIntroductionMessage 后会通过 websocket(协议) 向前端发送一个'device.add'消息,如果没有浏览器打开 STF 页面的话可以忽略此处,前端页面会根据该消息更新设备连接状态。
然后我们看 reaper 进程(源码位置你懂的),reaper:line:94。
.on(wire.DeviceIntroductionMessage, function(channel, message) {
ttlset.drop(message.serial, TtlSet.SILENT)
ttlset.bump(message.serial, Date.now())
})
reaper 进程维护着一个 TTLSet,主要负责对超过存活时间的设备进行收割,当收到 DeviceIntroductionMessage 时 reaper 会将该设备同当前时间初始化到 TTSet 中去。至此对于 DeviceIntroductionMessage 相关的的所有处理已经完成。
我们之前说到 processor 进程在保存完设备初始化信息后会向 provider 发送 DeviceRegisteredMessage 消息,所以我们再次回到 provider,provider:line:428。
sub.on('message', wirerouter()
.on(wire.DeviceRegisteredMessage, function(channel, message) {
flippedTracker.emit(message.serial, 'register')
})
.handler())
provider 收到 DeviceRegisteredMessage 会通过 flippedTracker 发送一个以 deviceId 为消息名称的消息,参数为'register'(tips:此处涉及 EventEmitter 和 Promise 的基本概念,大家可以参考相关资料,不在此处展开),具体处理代码是 provider:line:339。
register.then(function() {
log.info('Registered device "%s"', device.id)
check()
})
调用 check() 开始依次调用 work()、spawn()。到此第一步的设备注册已经完成,接下来就是启动 device 进程了。
先说一下 check()
function check() {
clearTimeout(timer)
if (device.present) {
// We might get multiple status updates in rapid succession,
// so let's wait for a while
switch (device.type) {
case 'device':
case 'emulator':
willStop = false
timer = setTimeout(work, 100)
break
default:
willStop = true
timer = setTimeout(stop, 100)
break
}
}
else {
willStop = true
stop()
}
}
如果 device.type 正常的话就会调用 work(),需要注意的是如果此时设备状态有变化或者断开的话 check() 是会被多次调用(setTimeout & clearTimeout)。
然后是 work()
function work() {
return (worker = workers[device.id] = spawn())
.then(function() {
log.info('Device worker "%s" has retired', device.id)
...
})
.catch(procutil.ExitError, function(err) {
...
})
}
该方法主要是调用 spawn(),并保存一个对应 deviceId 的 Promise 对象到 workers 进行管理,这里涉及到 lifecycle 工具类,该类主要的作用是在当前 node 进程结束的时候做些必要的收尾工作。
再然后是 spawn()
function spawn() {
var allocatedPorts = ports.splice(0, 4)
var proc = options.fork(device, allocatedPorts.slice())
var resolver = Promise.defer()
var didExit = false
...
}
开始说 spawn() 之前不得不先说一下 STF 各个模块启动的方式,我们来先大致看一下启动 provider 进程的源码 (/stf/lib/cli/provider)。
重点看这 2 个地方
return yargs
.strict()
.env('STF_PROVIDER')
.option('adb-host', {
describe: 'The ADB server host.'
, type: 'string'
, default: '127.0.0.1'
})
.option('adb-port', {
describe: 'The ADB server port.'
, type: 'number'
, default: 5037
})
...
STF 使用 yargs 包装每个模块的启动命令,此处是 provider 模块的启动参数定义,简单的看几个常用参数:
adb-host & adb-port:就是上面说到的 adb server 的访问地址。
allow-remote:是否允许通过远程连接 adb 方式的设备连接 STF。
max-port & min-port:定义一个 port 区间,默认是 7700 ~ 7400,每个 device 进程会分配到 4 个连续的 port。
其他的一些参数也是非常有用的,大家可以参考代码中的 describe 了解其用法。
fork 方法
fork: function(device, ports) {
var fork = require('child_process').fork
var args = [
'device'
, '--serial', device.id
, '--provider', argv.name
, '--screen-port', ports.shift()
, '--connect-port', ports.shift()
, '--vnc-port', ports.shift()
, '--public-ip', argv.publicIp
, '--group-timeout', argv.groupTimeout
, '--storage-url', argv.storageUrl
, '--adb-host', argv.adbHost
, '--adb-port', argv.adbPort
, '--screen-jpeg-quality', argv.screenJpegQuality
, '--screen-ping-interval', argv.screenPingInterval
, '--screen-ws-url-pattern', argv.screenWsUrlPattern
, '--connect-url-pattern', argv.connectUrlPattern
, '--heartbeat-interval', argv.heartbeatInterval
, '--boot-complete-timeout', argv.bootCompleteTimeout
, '--vnc-initial-size', argv.vncInitialSize.join('x')
, '--mute-master', argv.muteMaster
]
...
return fork(cli, args)
}
此方法就是 device 进程产生的核心方法了,在 provider 进程的 spawn() 中调用,通过 child_process 启动 device 子进程。
然后我们可以回到 provider 的 spawn() 了,现在再来看就容易理解了,spawn 主要就是调用 provider 启动参数中传入的 fork 方法了,fork 方法返回一个子进程对象,接下来就是设置与 device 子进程之间的消息监听了。
proc.on('exit', exitListener)
proc.on('error', errorListener)
proc.on('message', messageListener)
具体 Listener 的代码就不再贴出了。
device 进程入口源码 (/stf/units/device/index.js)
if (process.send) {
// Only if we have a parent process
process.send('ready')
}
log.info('Fully operational')
device 进程启动成功之后会向父进程发送一个 message,内容是'ready',表示 device 进程启动成功了,至此整个设备连接就已经完成了。
最后来我们简单的看一下 STF 是如何与设备进行通信的。先介绍一下 device 进程,其源码位于/stf/lib/units/device,device 进程启动时会使用 syrup 顺序加载 plugin,syrup 是一个 STF 自带的依赖注入框架,plugins 就是设备上相应的各个功能了,比如:InstallAPK、Logcat、Shell 等等。总的来说 device 进程调用设备上的具体功能就是通过 minicap、minitouch、adb、STFService(APK) 来实现的。
好了,以上就是整个设备连接的大致过程,有些细节没有在文中提及,但是应该也不影响对此的理解,推荐大家也可以阅读一下这部分的源码,发现更多的精彩。
PS:为了方便大家跟随本篇文章的步骤学习源码,故将 STF 版本锁定在 3.2.0,大家可以自行上 github clone 项目。