STF STF 系列之二---minitouch 流程源码分析

shuta · 2017年06月04日 · 最后由 ElanCao 回复于 2019年07月29日 · 4643 次阅读

上一篇文章分析 STF 如何同步设备截图的,这一篇分析用户在前端页面 touch 设备 A 时,STF 如何将 touch 动作同步在设备 A?本文将从前端,服务端,手机 三个方向说明。

touch 流程

touch 的整个流程如下所示:

其中 triproxy/index.js 从 pull 到 pub 用虚线标识出来原因是,中间省略了一系列过程。这些过程会在服务端部分详细说明。

前端

当用户 touch 设备时,前端如何捕捉用户动作?捕捉到用户动作之后,是如何将数据传输至服务端?

捕捉用户动作-- screen-directive.js

这里说明一下,下文中所有设备详情页指的是http://localhost:7100/#!/control/${serial}页面,

设备详情页中看到设备截图的那个部分,如下图所示

是由 screen-directive.js 文件管理的。screen-directive.js 中除了之前说的渲染图片二进制文件功能,还可以捕捉用户点击,滑动 “设备屏幕” 等事件,并调用 ControlService.js 文件中对应的函数。代码如下所示

element.on('mousedown', mouseDownListener)  //当用户触发mousedown事件后,调用mouseDownListener函数
function mouseDownListener(){
   ...
   ...
   control.touchDown(nextSeq(), 0, scaled.xP, scaled.yP, pressure)
   ...
   $document.bind('mouseup', mouseUpListener)
}

当用户在前端点击设备上某个位置时,screen-directive.js 捕捉了 touchDown 动作,调用的是 mouseDownListener 函数, mouseDownListener 会通过$scope.control(即代码中的 control) 调用 ControlService.js 的中 touchDown 函数,向服务端发送消息 -- 点击该台设备的 (x,y) 位置。

至于为什么在 control-panes-controller.js 中定义的$scope.control, 能在 screen-directive.js 使用?这就属于 angular 1.x 知识,感兴趣的同学可以自己去了解,这里不赘述。

传输数据至服务端 -- control-panes-controller.js

该 js 文件完成$scope.device,和$scope.control的初始化

...
function getDevice(serial) {
      DeviceService.get(serial, $scope)
        .then(function(device) {
          return GroupService.invite(device)
        })
        .then(function(device) {
          $scope.device = device
          $scope.control = ControlService.create(device, device.channel)

          // TODO: Change title, flickers too much on Chrome
          // $rootScope.pageTitle = device.name

          SettingsService.set('lastUsedDevice', serial)

          return device
        })
        .catch(function() {
          $timeout(function() {
            $location.path('/')
          })
        })
    }
    ...

其中:

$scope.control = ControlService.create(device, device.channel)

我们来分析 ControlService.js 文件

function sendOneWay(action, data) {
      socket.emit(action, channel, data)
    }

    ...
    ...
    this.touchDown = function(seq, contact, x, y, pressure) {
      sendOneWay('input.touchDown', {
        seq: seq
      , contact: contact
      , x: x
      , y: y
      , pressure: pressure
      })
    }
    ...

ControlService.js 中封装了一系列操作设备的函数:如: this.touchDown。但这些操作设备函数都调用了 sendOneWay 方法。sendOneWay 方法调用了 socket.emit(),而这个 socket 来自 socket-service.js 文件。这个文件主要内容如下所示

var io = require('socket.io')
var websocketUrl = AppState.config.websocketUrl || ''

var socket = io(websocketUrl, {
reconnection: false, transports: ['websocket']
})
...

这段代码创建了 websocket client(websocket 在之前的文章已经描述过了,这里不再赘述)。ControlService.js 文件使用这个 socket 向服务端发送消息。

总结写一下,this.touchDown函数,通过socket.emit向服务端发送'input.touchDown'消息,并传输该设备的 channel 和按压点的 x,y 坐标。

需要注意的是,设备 A 详情页和设备 B 详情页,var websocketUrl = AppState.config.websocketUrl || ''不同 (端口不同)。

服务端

准备工作

请先仔细阅读zeromq 官方文档zeromq 其他参考资料。着重看 push/pull,pub/sub 部分。

服务端流程

服务端是如何接收前端发送的数据,并将数据传输至对应手机呢?这里以单个设备举例。

服务端处理信息流转有三个文件,websocket/index.js, triproxy/index.js, processor/index.js 。最后接收数据和 minitouch 通信是来自 device 的 sub.js,touch/index.js

在执行stf local命令时,发现 STF 启动了两个 triproxy 的实例 app001 和 dev001

INF/util:procutil 40934 [*] Forking "/Users/sheranjun/Code/stf/lib/cli.js triproxy app001 --bind-pub tcp://127.0.0.1:7111 --bind-dealer tcp://127.0.0.1:7112 --bind-pull tcp://127.0.0.1:7113"
INF/util:procutil 40934 [*] Forking "/Users/sheranjun/Code/stf/lib/cli.js triproxy dev001 --bind-pub tcp://127.0.0.1:7114 --bind-dealer tcp://127.0.0.1:7115 --bind-pull tcp://127.0.0.1:7116"

其中 app001 是处理从前端 UI push 的消息;dev001 是处理从手机 push 的消息。这两个实例一起完成了前端 UI 和手机的双向流通。当然本文只分析从前端 UI 发送消息到手机接收消息单方向的流程

websocket/index.js

index.js 做了两件事,启动 websocket 服务端,等待接收 websocket 客户端消息;启动 app001 push,发送消息至 app001 pull

  • websocket server 初始化
var socketio = require('socket.io')

var io = socketio.listen(server, {
    serveClient: false
  , transports: ['websocket']
  })

  ....
  server.listen(options.port)
  log.info('Listening on port %d', options.port)

设备 A 详情页和设备 B 详情页启动的是不同的 websocket server.

  • app001 push 初始化
var push = zmqutil.socket('push')
  log.info("endopoints  is " + options.endpoints)
  log.info("endopoints  is push  " + options.endpoints.push)
  Promise.map(options.endpoints.push, function(endpoint) {
    return srv.resolve(endpoint).then(function(records) {
      return srv.attempt(records, function(record) {
        log.info('Sending output to "%s"', record.url)
        push.connect(record.url)
        return Promise.resolve(true)
      })
    })
  })
  .catch(function(err) {
    log.fatal('Unable to connect to push endpoint', err)
    lifecycle.fatal()
  })

其中push.connect(record.url), record.url是 app001 pull 服务的 URL。是唯一的,所有的设备详情页使用的是同一个 app001 pull 服务的 URL。

  • 什么时候 push 呢?
io.on('connection', function(socket) {
    socket.on('input.touchDown', function(channel, data){
    log.info("touch down from websocket")
          push.send([
            channel
          , wireutil.envelope(new wire.TouchDownMessage(
              data.seq
            , data.contact
            , data.x
            , data.y
            , data.pressure
            ))
          ])
    })
}

这段语句,将 websocket 和 push 联系起来,先来看看前端发送的input.touchDown消息,传输的数据

function sendOneWay(action, data) {
      socket.emit(action, channel, data)
    }

    ...
    ...
    this.touchDown = function(seq, contact, x, y, pressure) {
      sendOneWay('input.touchDown', {
        seq: seq
      , contact: contact
      , x: x
      , y: y
      , pressure: pressure
      })
    }
    ...

两端代码联系起来,前端发送input.touchDown消息,后端成功接收input.touchDown消息之后,调用 app001 的 push 端,将 channel 和封装了 data(封装方式这里不重要,不讨论),发送至 app 001 的 pull 端。
同时可以知道,尽管设备 A 和设备 B 详情页使用的不同的 websocket,但它们使用的 push 端相同。

triproxy/index.js

triproxy 中的 index.js,主要是新建 pull 端,dealer 端,以及 pub 端。代码如下所示

function proxy(to) {
  return function() {
    to.send([].slice.call(arguments))
  }
}

// App/device output
var pub = zmqutil.socket('pub')
pub.bindSync(options.endpoints.pub)
log.info('PUB socket bound on', options.endpoints.pub)

// Coordinator input/output
var dealer = zmqutil.socket('dealer')
dealer.bindSync(options.endpoints.dealer)
dealer.on('message', proxy(pub))
log.info('DEALER socket bound on', options.endpoints.dealer)

// App/device input
var pull = zmqutil.socket('pull')
pull.bindSync(options.endpoints.pull)
pull.on('message', proxy(dealer))
log.info('PULL socket bound on', options.endpoints.pull)

在启动 STF 时,新建了两个 triproxy:app001,dev001。这里拿 app001 triproxy 来举例,

  • pull.on('message', proxy(dealer)),意味着 pull 端接受来自 websocket/index.js push 端的 channel/data 后,将 channel/data 通过 dealer 服务端发送出去。
  • dealer.on('message', proxy(pub)),意思是 dealer 服务端接收到 channel/data 后,将 channel/data 通过 pub 端发送出去。

前面提到过,STF 新建了两个 triproxy:app001 和 dev 001。所以,websocket/index.js 中的 channel/data 通过 app001 push 到了 app001 的 pull 端。app001pull 端,将 channel/data 从 app001 dealer 服务端发送出去,那 channel/data 又在哪里被接收了呢?

processor/index.js

processor 中的 index.js,主要是为了 app001 dealer client 和 dev001 dealear client 之间的消息转发。代码如下

// App side
 var appDealer = zmqutil.socket('dealer')
 Promise.map(options.endpoints.appDealer, function(endpoint) {
   return srv.resolve(endpoint).then(function(records) {
     return srv.attempt(records, function(record) {
       log.info('App dealer connected to "%s"', record.url)
       appDealer.connect(record.url)
       return Promise.resolve(true)
     })
   })
 })
 .catch(function(err) {
   log.fatal('Unable to connect to app dealer endpoint', err)
   lifecycle.fatal()
 })

 // Device side
 var devDealer = zmqutil.socket('dealer')

 appDealer.on('message', function(channel, data) {
   devDealer.send([channel, data])
 })

 Promise.map(options.endpoints.devDealer, function(endpoint) {
   return srv.resolve(endpoint).then(function(records) {
     return srv.attempt(records, function(record) {
       log.info('Device dealer connected to "%s"', record.url)
       devDealer.connect(record.url)
       return Promise.resolve(true)
     })
   })
 })
 .catch(function(err) {
   log.fatal('Unable to connect to dev dealer endpoint', err)
   lifecycle.fatal()
 })

processor 中的 index.js 中新建了两个 dealer client:appDealer client 和 devDealer client。分别连接 triproxy 中,app001 dealer 服务端和 dev001 dealer 服务端。其中

appDealer.on('message', function(channel, data) {
    devDealer.send([channel, data])
  })

appDealer client 接收到来自 app001 dealer 服务端的 channel/data 之后,将 channel/data 传递给 devDealer client,devDealer client 发送至 dev001 dealer 服务端。再联系triproxy/index.js一小节的流程,dev001 的 dealer 服务端接收到 channel/data 后,将 channel/data 通过 dev001 的 pub 端发送出去。

至此服务端 channel/data 流转前半截就将完了。接下来将 dev001 的 pub 端的 channel/data 流向哪里了?

lib/units/device 下 sub.js,solo.js,以及 touch/index.js

每一个手机都会创造一个 device 对象。device 对象中包含很多功能。和本篇文章相关的是两个功能,一是 dev001 sub 端并订阅固定 channel。这个功能由 sub.js 和 solo.js 两个文件完成;二是创造一个 touch 对象,STF 用它来向手机发送指令,这个功能由 touch/index.js 文件完成。如下图所示:

至于 device 对象何时创造?如何被创建?这个流程和 lib/cli.js,/lib/provider/index.js 文件有关。关键词是fork 以及.command('device <serial>'),有兴趣自己研究。

  • sub.js

    为每个手机创建连接到 dev001 pub 端的 sub 端。代码:

    var sub = zmqutil.socket('sub')
    
    return Promise.map(options.endpoints.sub, function(endpoint) {
        return srv.resolve(endpoint).then(function(records) {
          return srv.attempt(records, function(record) {
            log.info('Receiving input from "%s"', record.url)
            sub.connect(record.url)
            return Promise.resolve(true)
          })
        })
      })
      .then(function() {
        // Establish always-on channels
        [wireutil.global].forEach(function(channel) {
          log.info('Subscribing to permanent channel "%s"', channel)
          sub.subscribe(channel)
        })
      })
      .return(sub)
    

    这里的sub.connect(record.url),record.url是 dev001 pub 端,这一点可以通过 stf 启动日志验证。

    STF 日志中 dev001 pub 端口地址如下所示:

    INF/util:procutil 47526 [*] Forking "/Users/****/Code/stf/lib/cli.js triproxy dev001 --bind-pub tcp://127.0.0.1:7114 --bind-dealer tcp://127.0.0.1:7115 --bind-pull tcp://127.0.0.1:7116"
    

    而 STF 日志中 sub.js 打印出的日志:

    INF/device:support:sub 48054 [63a5b447] Receiving input from "tcp://127.0.0.1:7114"
    
  • solo.js

    solo.js,为 sub.js 中创建的 sub 端,订阅该手机的固定频道。代码如下

    function makeChannelId() {
      var hash = crypto.createHash('sha1')
      hash.update(options.serial)
      return hash.digest('base64')
    }
    
    var channel = makeChannelId()
    
    log.info('Subscribing to permanent channel "%s"', channel)
    sub.subscribe(channel)
    

    solo.js 通过手机的 serial 创建 channel。可能会有人问,怎么确定 dev001 pub 端发送的 channel(本质上来自前端手机 A 详情页传输的 channel),和该手机在 solo.js 中订阅的 channel 是同一个呢?这里简单解释一下。前端传递过来的 channel,也来自于后端。前端获取 channel 的代码在 device-service.js 文件中,代码如下所示:

    deviceService.load = function(serial) {
    return $http.get('/api/v1/devices/' + serial)
      .then(function(response) {
        return response.data.device
      })
    }
    

    请求的 api,最终执行的后端函数如下所示:

      function getDeviceBySerial(req, res) {
    var serial = req.swagger.params.serial.value
    var fields = req.swagger.params.fields.value
    
    dbapi.loadDevice(serial)
      .then(function(device) {
          ...
      }
    

    其中dbapi.loadDevice是操作数据库函数,而对 device 对象的一系列数据库操作中,只有setDeviceReady函数是更新了 device 的 channel,所以dbapi.loadDevice中 channel 数据来自于setDeviceReady函数。

      dbapi.setDeviceReady = function(serial, channel) {
    return db.run(r.table('devices').get(serial).update({
      channel: channel
    , ready: true
    , owner: null
    , reverseForwards: []
    }))
      }
    

    dbapi.setDeviceReady函数又在哪里被调用呢?是的,在 processor/index.js。

    devDealer.on('message', wirerouter()
    .on(wire.DeviceReadyMessage, function(channel, message, data) {
      dbapi.setDeviceReady(message.serial, message.channel)
        .then(function() {
          devDealer.send([
            message.channel
          , wireutil.envelope(new wire.ProbeMessage())
          ])
    
          appDealer.send([channel, data])
        })
    })
    

    而 processor/index.js 接收的 channel/message/data,来自 solo.js。至于 solo.js 的 push,如何将 channel/message/data 传输至 processor/index.js 的 devDealer client 中,按照上一小节的 triproxy.js 的流程图,对照 STF 日志,自己分析。这里不再赘述。

    return {
      channel: channel
    , poke: function() {
        push.send([
          wireutil.global
        , wireutil.envelope(new wire.DeviceReadyMessage(
            options.serial
          , channel
          ))
        ])
      }
    }
    

    所以,前端的 channel,也是来自于 solo.js,而且 channel 和 serial 有一一对应的关系。这就解决了来自前端手机 A 详情页的数据一定会发送到后端手机 A 的对应的 channel 上。

  • touch/index.js

    touch/index.js中做了两件事:一是启动手机的 minitouch 服务,而是订阅 dev001 pub 端的数据,并将固定频道 channel 数据传输至手机。每一个手机都会有一个 touch/index.js

    • 启动手机 minitouch 服务

      • 启动手机上的 minitouch 服务
      TouchConsumer.prototype._startService = function() {
            log.info('Launching screen service')
            return minitouch.run()
              .timeout(10000)
          }
      
      • 将手机端启动的 minitouch tcp 服务端端口映射到 pc 端。连接到该 pc 端口,返回 socket。该功能由adb.openLocal函数完成,代码如下所示
      TouchConsumer.prototype._connectService = function() {
            function tryConnect(times, delay) {
              return adb.openLocal(options.serial, 'localabstract:minitouch')
                .timeout(10000)
                .then(function(out) {
                  return out
                })
                .catch(function(err) {
                  if (/closed/.test(err.message) && times > 1) {
                    return Promise.delay(delay)
                      .then(function() {
                        return tryConnect(times - 1, delay * 2)
                      })
                  }
                  return Promise.reject(err)
                })
            }
            log.info('Connecting to minitouch service')
            // SH-03G can be very slow to start sometimes. Make sure we try long
            // enough.
            return tryConnect(7, 100)
          }
      

      有兴趣的,可以研究一下adb.openLocal函数,这里不再详细讨论该函数流程

    • 订阅 dev001 pub 端固定 channel

      touch/index.js通过引用 sub.js 和 solo.js,使用订阅固定频道的 dev001 sub,等待来自 dev001 pub 端数据,代码如下所示:

      router
          .on(wire.TouchDownMessage, function(channel, message) {
          log.info("touch down from toucb")
          queue.push(message.seq, function() {
            touchConsumer.touchDown(message)
          })
        })
      

      而 router 来自 router.js 中,router.js 引用了 sub.js

      module.exports = syrup.serial()
          .dependency(require('./sub'))
          .define(function(options, sub, channels) {
              var log = logger.createLogger('device:support:router')
              var router = wirerouter()
      
              sub.on('message', router.handler())
      
              return router
            })
      

      其中 dev001 push 端在 channelA 上发送数据后,订阅了 channelA 的 dev001 sub 端接收该数据。并调用touchConsumer.touchDown(message)。而 touchDown 函数的相关代码如下所示:

          TouchConsumer.prototype.touchDown = function(point) {
        log.info("touch down from touch index")
        this._queueWrite(function() {
          return this._write(util.format(
            'd %s %s %s %s\n'
          , point.contact
          , Math.floor(this.touchConfig.origin.x(point) * this.banner.maxX)
          , Math.floor(this.touchConfig.origin.y(point) * this.banner.maxY)
          , Math.floor((point.pressure || 0.5) * this.banner.maxPressure)
          ))
        })
      }
      
      TouchConsumer.prototype._write = function(chunk) {
        this.socket.stream.write(chunk)
      }
      
      

      touchDown函数中的this._write调用的this.socket来自this._connectService()返回值。this.socket代表着连接手机 minitouch tcp 服务端的 tcp 客户端。touchDown函数接收 touchDown 消息后,向手机发送以'd'开头的字符串命令。

    到此,服务端消息的流程已经全部解析完成。再看看最开始画的流程图,是不是清晰很多呢?

手机

手机上主要是启动了 minitouch 的 tcp 服务,接收 STF 服务端操作手机指令。并根据指令,操作手机它的代码如下所示

  • start_server 函数,启动 tcp 服务

    static int start_server(char* sockname)
    {
      int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    
      if (fd < 0)
      {
        perror("creating socket");
        return fd;
      }
    
      struct sockaddr_un addr;
      memset(&addr, 0, sizeof(addr));
      addr.sun_family = AF_UNIX;
      strncpy(&addr.sun_path[1], sockname, strlen(sockname));
    
      if (bind(fd, (struct sockaddr*) &addr,
        sizeof(sa_family_t) + strlen(sockname) + 1) < 0)
      {
        perror("binding socket");
        close(fd);
        return -1;
      }
    
      listen(fd, 1);
    
      return fd;
    }
    

    fd既是创造 TCP 服务端

  • 等到 STF 服务端发送的指令,根据指令操作设备,这里以 touchDown 指令,举例说明

    int client_fd = accept(server_fd, (struct sockaddr *) &client_addr,
     &client_addr_length);
    
     .....
     while (io_length < sizeof(io_buffer) &&
       read(client_fd, &io_buffer[io_length], 1) == 1)
     {
       if (io_buffer[io_length++] == '\n')
       {
         break;
       }
     }
    
     ...
     switch (io_buffer[0])
     {
       case 'c': // COMMIT
         commit(&state);
         break;
       case 'r': // RESET
         touch_panic_reset_all(&state);
         break;
       case 'd': // TOUCH DOWN
         contact = strtol(cursor, &cursor, 10);
         x = strtol(cursor, &cursor, 10);
         y = strtol(cursor, &cursor, 10);
         pressure = strtol(cursor, &cursor, 10);
         touch_down(&state, contact, x, y, pressure);
         break;
         .....
      }
    

    其中client_fd接收来自 STF 服务端的指令,io_buffer存储 STF 服务端消息,并根据第一字符判断操作类型。若第一个字符为 d,调用touch_down函数。

后记

touch 动作源码分析,整理出来算是一个大的工程了。涉及的核心知识点是 zeromq,这里只分析了前端操作是如何同步到设备,逆向流程并没有涉及,有兴趣的可以自己了解。

由于项目需要改造 STF,阅读了 STF 的源码,但项目改造 STF 重点在 STF 的前端和 STF 认证。这两块没有涉及多少 STF 整体架构设计,更多的是 angular 1.x 框架的知识。对这两块有兴趣的,欢迎讨论。

共收到 10 条回复 时间 点赞
shuta 关闭了讨论 06月05日 19:20
shuta 重新开启了讨论 06月05日 19:20

厉害👍,将代码解析到这种程度,让人很容易理解 touch 及通讯的整个过程。受用了,多谢分享

我写完这篇帖子已经有 17 天了,你是第一个回复我的。感动。。。😂 😂 😂

没看先赞,LZ 加油。。

虽然知道 zmq 在 STF 中的地位很重要,但还是看 zmq 不爽

受教了,写的非常清晰

大神 minitouch 对 windows 的兼容不好,win7、win10 总报 socket 错误。这个你遇到过么?不知道是我的问题,还是 minitouch 的问题?

楼主好棒,这种原理解析的文章太好了,能读透源码的程序员真厉害

匿名 #10 · 2018年02月23日

谢谢分享,想问下对设备中的日志和界面数据的传回直到展示这块有没有研究,界面数据的传回和解析时 minicap 做的吗 stf 有做什么处理吗

感觉应该加精

仅楼主可见
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册