STF openstf 模块解读--triproxy

blueshark · 2017年05月07日 · 最后由 wocaishishimao 回复于 2019年01月22日 · 2064 次阅读

一、triproxy 模块介绍

triproxy 其实是一个三端的消息转发器,它包含三个端口,pull、pub 和 dealer,分别对应 zmq 的三种工作模式。triproxy 模块最大的用途是模块的解耦。

  • pull 端用来拉取 websocket 模块或者 provider 模块的消息,之所以用 pull 的模式是因为消息从 websocket 或者 provider 到 triproxy 的消息传递是单向的,而 push-pull 模式可以保证消息传递的可靠性,就是说,如果 pull 端断开,push 端的消息仍然会被保留而不会丢失,直到 pull 把消息拉走,例如,provider 发现一台手机的在线状态发生了改变,就是通过 push 的方式推给 triproxy。

  • 而 pub 端则用来广播消息,广播的特点就是一个发送端可以对应多个接收端,这些接收端可以同时接收到消息,例如设备状态的改变需要广播给所有的 websocket 模块,进而广播给所有的在线用户。当然广播的另外一个特点是如果广播时客户端不在线,那么这条消息就永远收不到了,因此,广播的消息也可以说是不太重要的消息,比如说如果 provider 的一台手机没有收到用户占用的消息,最多是用户占用失败,而不会造成太大的影响,而 provider 占用成功的消息如果用广播的方式就麻烦了,用户没收到占用成功的消息,会继续做占用的操作而不会成功,除非手机超时自动取消占用。

  • triproxy 和 processor 的通信是双向,因此用了 dealer 模式,当然 dealer 模式还有其他的优势,暂时我还没看出来。dealer 模式虽然可以一对多,但是一条消息只能唯一的传给一个接收方,就是说如果做了 processor 扩展,存在多个 processor 的情况下,同样的消息只会被一个 processor 处理,这点儿需要注意。

二、triproxy 的启动方式

在 STF 中启动了两个 triproxy:tf-triproxy-app 和 stf-triproxy-dev,如果仔细看启动脚本,就会发现这两个 triproxy 除了名字和端口号不一样外其他都一样。

stf triproxy app \
  --bind-pub "tcp://*:7150" \
  --bind-dealer "tcp://*:7160" \
  --bind-pull "tcp://*:7170"

这里感觉没什么好说的,就是指定了三个端口而已。

三、triproxy 源码解读

triproxy 的源码在 lib/units/triproxy/文件夹当中,会发现源码几乎是最少的,事实上一个转发的功能确实不需要太多的代码。

log 就不说了,先看 proxy 方法:

function proxy(to) {
  return function() {
    to.send([].slice.call(arguments))
  }
}

那个 to.send 也是看得的一脸懵 B,至今不知道什么意思,反正就是把一个端口的东西转发到另一个端口,有哪位 node 大神解读一下这篇文章:对 [].slice.call(arguments,1) 的一丢丢见解?

后面两句关键代码就结束了,其他可以无视:

dealer.on('message', proxy(pub))

pull.on('message', proxy(dealer))

意思就是说 dealer 端收到消息以后转发到 pub 端,pull 端收到消息后转发到 dealer 端,感觉也没什么意思。

四、triproxy 应用实践

triproxy 这么好玩的东西当然要实践一下,验证一下前面的功能。

首先建立 push.js、subber.js、dealer.js 三个文件,代码分别是

// push.js 
var zmq = require('zeromq')
  , sock = zmq.socket('push');

sock.connect('tcp://127.0.0.1:7170');
console.log('Push connect to port 7170');

var count = 0
setInterval(function(){
  //console.log('push message ' + count++);
  sock.send('push message ' + count++);
}, 1000);
// subber.js 
var zmq = require('zeromq')
  , sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:7150');
sock.subscribe('');
console.log('Subscriber connected to port 7150');

sock.on('message', function(topic, message) {
  console.log('received:', message.toString());
});
// dealer.js 
var zmq = require('zeromq')
  , sock = zmq.socket('dealer');

sock.connect('tcp://127.0.0.1:7160');
console.log('Dealer connected to port 7160');

sock.on('message', function(message) {
  console.log('received:', message.toString());
});

var count = 0
setInterval(function(){
  //console.log('dealer message ' + count++);
  sock.send(['1', 'dealer message ' + count++]);
}, 1000);

然后启动一个 triproxy

stf triproxy app  --bind-pub "tcp://*:7150"  --bind-dealer "tcp://*:7160"  --bind-pull "tcp://*:7170"

然后一起运行三个脚本,就会发现,dealer 能收到 push 的消息,而 subber 能收到 dealer 的消息。

再验证一下前面的 dealer 的特性,同时启动两个 dealer,你会发现同样的消息只会有一个 dealer 接收到。

而启动多个 subber,每个 subber 都会接收到到同样的消息。

如果有兴趣做 STF 的研究,可以看我的资料加我联系方式共同交流😃

共收到 3 条回复 时间 点赞
blueshark 关闭了讨论 05月07日 22:56
blueshark 重新开启了讨论 05月07日 22:57

app triproxy 和 dev triporxy 除了名字不一样,绑定的端口也不一样啊!😄 😄 😄

willys 回复

端口被我无视了,哈哈~~~

[] 是数组,数组有 slice 方法,其实就是写成 arguments.slice() 一样的,返回一个自己的拷贝出去,免得后面修改导致引用的变量被改变

需要 登录 後方可回應,如果你還沒有帳號按這裡 注册