triproxy 其实是一个三端的消息转发器,它包含三个端口,pull、pub 和 dealer,分别对应 zmq 的三种工作模式。triproxy 模块最大的用途是模块的解耦。
pull 端用来拉取 websocket 模块或者 provider 模块的消息,之所以用 pull 的模式是因为消息从 websocket 或者 provider 到 triproxy 的消息传递是单向的,而 push-pull 模式可以保证消息传递的可靠性,就是说,如果 pull 端断开,push 端的消息仍然会被保留而不会丢失,直到 pull 把消息拉走,例如,provider 发现一台手机的在线状态发生了改变,就是通过 push 的方式推给 triproxy。
而 pub 端则用来广播消息,广播的特点就是一个发送端可以对应多个接收端,这些接收端可以同时接收到消息,例如设备状态的改变需要广播给所有的 websocket 模块,进而广播给所有的在线用户。当然广播的另外一个特点是如果广播时客户端不在线,那么这条消息就永远收不到了,因此,广播的消息也可以说是不太重要的消息,比如说如果 provider 的一台手机没有收到用户占用的消息,最多是用户占用失败,而不会造成太大的影响,而 provider 占用成功的消息如果用广播的方式就麻烦了,用户没收到占用成功的消息,会继续做占用的操作而不会成功,除非手机超时自动取消占用。
triproxy 和 processor 的通信是双向,因此用了 dealer 模式,当然 dealer 模式还有其他的优势,暂时我还没看出来。dealer 模式虽然可以一对多,但是一条消息只能唯一的传给一个接收方,就是说如果做了 processor 扩展,存在多个 processor 的情况下,同样的消息只会被一个 processor 处理,这点儿需要注意。
在 STF 中启动了两个 triproxy:tf-triproxy-app 和 stf-triproxy-dev,如果仔细看启动脚本,就会发现这两个 triproxy 除了名字和端口号不一样外其他都一样。
stf triproxy app \
--bind-pub "tcp://*:7150" \
--bind-dealer "tcp://*:7160" \
--bind-pull "tcp://*:7170"
这里感觉没什么好说的,就是指定了三个端口而已。
triproxy 的源码在 lib/units/triproxy/文件夹当中,会发现源码几乎是最少的,事实上一个转发的功能确实不需要太多的代码。
log 就不说了,先看 proxy 方法:
function proxy(to) {
return function() {
to.send([].slice.call(arguments))
}
}
那个 to.send 也是看得的一脸懵 B,至今不知道什么意思,反正就是把一个端口的东西转发到另一个端口,有哪位 node 大神解读一下这篇文章:对 [].slice.call(arguments,1) 的一丢丢见解?
后面两句关键代码就结束了,其他可以无视:
dealer.on('message', proxy(pub))
pull.on('message', proxy(dealer))
意思就是说 dealer 端收到消息以后转发到 pub 端,pull 端收到消息后转发到 dealer 端,感觉也没什么意思。
triproxy 这么好玩的东西当然要实践一下,验证一下前面的功能。
首先建立 push.js、subber.js、dealer.js 三个文件,代码分别是
// push.js
var zmq = require('zeromq')
, sock = zmq.socket('push');
sock.connect('tcp://127.0.0.1:7170');
console.log('Push connect to port 7170');
var count = 0
setInterval(function(){
//console.log('push message ' + count++);
sock.send('push message ' + count++);
}, 1000);
// subber.js
var zmq = require('zeromq')
, sock = zmq.socket('sub');
sock.connect('tcp://127.0.0.1:7150');
sock.subscribe('');
console.log('Subscriber connected to port 7150');
sock.on('message', function(topic, message) {
console.log('received:', message.toString());
});
// dealer.js
var zmq = require('zeromq')
, sock = zmq.socket('dealer');
sock.connect('tcp://127.0.0.1:7160');
console.log('Dealer connected to port 7160');
sock.on('message', function(message) {
console.log('received:', message.toString());
});
var count = 0
setInterval(function(){
//console.log('dealer message ' + count++);
sock.send(['1', 'dealer message ' + count++]);
}, 1000);
然后启动一个 triproxy
stf triproxy app --bind-pub "tcp://*:7150" --bind-dealer "tcp://*:7160" --bind-pull "tcp://*:7170"
然后一起运行三个脚本,就会发现,dealer 能收到 push 的消息,而 subber 能收到 dealer 的消息。
再验证一下前面的 dealer 的特性,同时启动两个 dealer,你会发现同样的消息只会有一个 dealer 接收到。
而启动多个 subber,每个 subber 都会接收到到同样的消息。
如果有兴趣做 STF 的研究,可以看我的资料加我联系方式共同交流