STF openstf 模块解读--triproxy

blueshark · May 07, 2017 · Last by wocaishishimao replied at January 22, 2019 · 2123 hits

一、triproxy模块介绍

triproxy其实是一个三端的消息转发器,它包含三个端口,pull、pub和dealer,分别对应zmq的三种工作模式。triproxy模块最大的用途是模块的解耦。

  • pull端用来拉取websocket模块或者provider模块的消息,之所以用pull的模式是因为消息从websocket或者provider到triproxy的消息传递是单向的,而push-pull模式可以保证消息传递的可靠性,就是说,如果pull端断开,push端的消息仍然会被保留而不会丢失,直到pull把消息拉走,例如,provider发现一台手机的在线状态发生了改变,就是通过push的方式推给triproxy。

  • 而pub端则用来广播消息,广播的特点就是一个发送端可以对应多个接收端,这些接收端可以同时接收到消息,例如设备状态的改变需要广播给所有的websocket模块,进而广播给所有的在线用户。当然广播的另外一个特点是如果广播时客户端不在线,那么这条消息就永远收不到了,因此,广播的消息也可以说是不太重要的消息,比如说如果provider的一台手机没有收到用户占用的消息,最多是用户占用失败,而不会造成太大的影响,而provider占用成功的消息如果用广播的方式就麻烦了,用户没收到占用成功的消息,会继续做占用的操作而不会成功,除非手机超时自动取消占用。

  • triproxy和processor的通信是双向,因此用了dealer模式,当然dealer模式还有其他的优势,暂时我还没看出来。dealer模式虽然可以一对多,但是一条消息只能唯一的传给一个接收方,就是说如果做了processor扩展,存在多个processor的情况下,同样的消息只会被一个processor处理,这点儿需要注意。

二、triproxy的启动方式

在STF中启动了两个triproxy:tf-triproxy-app和stf-triproxy-dev,如果仔细看启动脚本,就会发现这两个triproxy除了名字和端口号不一样外其他都一样。

stf triproxy app \
--bind-pub "tcp://*:7150" \
--bind-dealer "tcp://*:7160" \
--bind-pull "tcp://*:7170"

这里感觉没什么好说的,就是指定了三个端口而已。

三、triproxy源码解读

triproxy的源码在lib/units/triproxy/文件夹当中,会发现源码几乎是最少的,事实上一个转发的功能确实不需要太多的代码。

log就不说了,先看proxy方法:

function proxy(to) {
return function() {
to.send([].slice.call(arguments))
}
}

那个to.send也是看得的一脸懵B,至今不知道什么意思,反正就是把一个端口的东西转发到另一个端口,有哪位node大神解读一下这篇文章:对[].slice.call(arguments,1) 的一丢丢见解?

后面两句关键代码就结束了,其他可以无视:

dealer.on('message', proxy(pub))

pull.on('message', proxy(dealer))

意思就是说dealer端收到消息以后转发到pub端,pull端收到消息后转发到dealer端,感觉也没什么意思。

四、triproxy应用实践

triproxy这么好玩的东西当然要实践一下,验证一下前面的功能。

首先建立push.js、subber.js、dealer.js三个文件,代码分别是

// push.js 
var zmq = require('zeromq')
, sock = zmq.socket('push');

sock.connect('tcp://127.0.0.1:7170');
console.log('Push connect to port 7170');

var count = 0
setInterval(function(){
//console.log('push message ' + count++);
sock.send('push message ' + count++);
}, 1000);
// subber.js 
var zmq = require('zeromq')
, sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:7150');
sock.subscribe('');
console.log('Subscriber connected to port 7150');

sock.on('message', function(topic, message) {
console.log('received:', message.toString());
});
// dealer.js 
var zmq = require('zeromq')
, sock = zmq.socket('dealer');

sock.connect('tcp://127.0.0.1:7160');
console.log('Dealer connected to port 7160');

sock.on('message', function(message) {
console.log('received:', message.toString());
});

var count = 0
setInterval(function(){
//console.log('dealer message ' + count++);
sock.send(['1', 'dealer message ' + count++]);
}, 1000);

然后启动一个triproxy

stf triproxy app  --bind-pub "tcp://*:7150"  --bind-dealer "tcp://*:7160"  --bind-pull "tcp://*:7170"

然后一起运行三个脚本,就会发现,dealer能收到push的消息,而subber能收到dealer的消息。

再验证一下前面的dealer的特性,同时启动两个dealer,你会发现同样的消息只会有一个dealer接收到。

而启动多个subber,每个subber都会接收到到同样的消息。

如果有兴趣做STF的研究,可以看我的资料加我联系方式共同交流😃

共收到 3 条回复 时间 点赞
blueshark 关闭了讨论 07 May 22:56
blueshark 重新开启了讨论 07 May 22:57

app triproxy和dev triporxy除了名字不一样,绑定的端口也不一样啊!😄 😄 😄

blueshark #4 · May 08, 2017 作者
willys 回复

端口被我无视了,哈哈~~~

[]是数组,数组有slice方法,其实就是写成arguments.slice()一样的,返回一个自己的拷贝出去,免得后面修改导致引用的变量被改变

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up