近两天,Django 发布了 channels 2.1.2, 用于支持多通道的分组广播,笔者试了下,用它来与后台实时交互实现某些服务的状态监控 (当然,这个最典型的应用还是 web 聊天室),以下记录发布到 PRODUCT 环境时的关键配置.
Nginx, redis, python3.6+ 及其依赖包如下版本所示
注意:channels 安装完毕后,Daphne 已经被附带安装成功,但是不能直接执行 daphne,需要自己创建连接.
Django 2.0.7, uWSGI 2.0.17.1,redis 3.2.10, channels 2.1.2, chanels-redis 2.2.1, daphne 2.2.1
var statusSocket = new WebSocket('ws://' + window.location.host + ":8000" + '/ws/status/' + '{{ pro.pk }}' + '/');
statusSocket.onmessage = function (e) {
var data = JSON.parse(e.data);
var message = data['message'];
# 收到消息后的处理逻辑
console.log(message);
};
app.routing.py:
from django.conf.urls import url
from . import consumers
websocket_urlpatterns = [
url(r'^ws/status/(?P<project_pk>[^/]+)/$', consumers.ServiceConsumer),
]
app.consumers.py:
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class ServiceConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.project_pk = self.scope['url_route']['kwargs']['project_pk']
self.room_group_name = 'status_%s' % self.project_pk
# Join room group
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
# Receive message from WebSocket
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
# Send message to room group
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'status_message',
'message': message
}
)
# Receive message from room group
async def status_message(self, event):
# todo: get real status from service detection api
message = event['message']
# Send message to WebSocket
await self.send(text_data=json.dumps({
'message': message
}))
project.routing.py:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import autoplatform.routing
from autoplatform import consumers
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
autoplatform.routing.websocket_urlpatterns
)
),
"channel": ChannelNameRouter({
"service-detection": consumers.ServiceConsumer,
}),
})
project.asgi.py:
import os
import django
from channels.routing import get_default_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django.setup()
application = get_default_application()
project.setting.py:
……
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {"hosts": [('127.0.0.1', 6379)],},},}
……
INSTALLED_APPS = [
……
'channels',
]
……
ASGI_APPLICATION = "project.routing.application"
启动 redis 服务器 (采用默认配置即可):/usr/bin/redis-server 127.0.0.1:6379
启动 uwsgi(uwsgi 配置如下): uwsgi --ini /etc/uwsgi.ini
[uwsgi]
daemonize = /var/log/uwsgi.log
logto = /var/log/uswgi/error.log
socket = :10222
chdir = /root/autoplatform
module = project.wsgi
master = true
process = 4
vacuum = true
注意:上面的 chdir = .../autoplatform 指的是项目的根目录,也即 manage.py 文件所在的目录
启动 nginx: nginx -s reload
启动 daphne: daphne -b 0.0.0.0 -p 8000 project.asgi:application -v2
注意:这里的 8000 端口就指的是 websocket 的通信端口,需要开启防火墙放过,跟前端 JS websocket 一致
启动 worker: python3 manage.py runworker channels service-detection
注意:这里的service-detection
指的是project.routing.py
里的 ChannelNameRouter 里面的某个 router key,参见 project.routing.py
在以上代码的基础上,我们实现了前端某个 group 下的任何一个 client 发送消息给 server,server 都会推送最新的消息给所有该 group 下所有的 clients,这种说到底还是触发式的。假如我们需要不断的每隔一段时间就主动推一个新的消息给前端呢?这个时候我们就可以用上 BeatServer 了 (心跳,确认对方依然存活)。在以上代码的基础上,consumers.py 里面新增一个 BeatServer 类:
class BeatServer(SyncConsumer):
def test_print(self, message):
self.room_group_name = "status_2"
print(self.room_group_name)
async_to_sync(self.channel_layer.group_send)(self.room_group_name, {
"type": "status_message",
"message": "Update"
})
print(message)
在project.routing.py
的同目录下新增beatconfig.py
:
from datetime import timedelta
BEAT_SCHEDULE = {
'testing-print': {
'type': 'test.print',
'message': {'message': 'UUUU'},
'schedule': timedelta(seconds=5)
},
}
修改routing.py
:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import autoplatform.routing
from autoplatform import consumers
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
autoplatform.routing.websocket_urlpatterns
)
),
"channel": ChannelNameRouter({
"service-detection": consumers.ServiceConsumer,
# 新增router
"testing-print": consumers.BeatServer
}),
})
然后,需要单独启动这个 BeatServer:python3 manage.py beatserver
.
参考文献: