自动化工具 Django 和 Channels 实现 websocket 式实时通讯 - 生产环境部署

非洲赵子龙 for DOTA · July 25, 2018 · 9428 hits

近两天,Django发布了channels 2.1.2, 用于支持多通道的分组广播,笔者试了下,用它来与后台实时交互实现某些服务的状态监控(当然,这个最典型的应用还是web聊天室),以下记录发布到PRODUCT环境时的关键配置.

  • 安装

Nginx, redis, python3.6+及其依赖包如下版本所示

注意:channels安装完毕后,Daphne已经被附带安装成功,但是不能直接执行daphne,需要自己创建连接.

  • 版本

Django 2.0.7, uWSGI 2.0.17.1,redis 3.2.10, channels 2.1.2, chanels-redis 2.2.1, daphne 2.2.1

  • 前端
var statusSocket = new WebSocket('ws://' + window.location.host + ":8000" + '/ws/status/' + '{{ pro.pk }}' + '/');
statusSocket.onmessage = function (e) {
var data = JSON.parse(e.data);
var message = data['message'];
# 收到消息后的处理逻辑
console.log(message);
};
  • 后端

app.routing.py:

from django.conf.urls import url
from . import consumers
websocket_urlpatterns = [
url(r'^ws/status/(?P<project_pk>[^/]+)/$', consumers.ServiceConsumer),
]

app.consumers.py:

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class ServiceConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.project_pk = self.scope['url_route']['kwargs']['project_pk']
self.room_group_name = 'status_%s' % self.project_pk
# Join room group
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()


async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)

# Receive message from WebSocket
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']

# Send message to room group
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'status_message',
'message': message
}
)

# Receive message from room group
async def status_message(self, event):
# todo: get real status from service detection api
message = event['message']

# Send message to WebSocket
await self.send(text_data=json.dumps({
'message': message
}))

project.routing.py:

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import autoplatform.routing
from autoplatform import consumers

application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
autoplatform.routing.websocket_urlpatterns
)
),
"channel": ChannelNameRouter({
"service-detection": consumers.ServiceConsumer,
}),
})

project.asgi.py:

import os
import django
from channels.routing import get_default_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django.setup()
application = get_default_application()

project.setting.py:

……
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {"hosts": [('127.0.0.1', 6379)],},},}
……
INSTALLED_APPS = [
……
'channels',
]
……
ASGI_APPLICATION = "project.routing.application"
  • 启动

启动redis服务器(采用默认配置即可):/usr/bin/redis-server 127.0.0.1:6379

启动uwsgi(uwsgi配置如下): uwsgi --ini /etc/uwsgi.ini

[uwsgi]
daemonize = /var/log/uwsgi.log
logto = /var/log/uswgi/error.log
socket = :10222
chdir = /root/autoplatform
module = project.wsgi
master = true
process = 4
vacuum = true

注意:上面的chdir = .../autoplatform指的是项目的根目录,也即manage.py文件所在的目录

启动nginx: nginx -s reload

启动daphne: daphne -b 0.0.0.0 -p 8000 project.asgi:application -v2

注意:这里的8000端口就指的是websocket的通信端口,需要开启防火墙放过,跟前端JS websocket一致

启动worker: python3 manage.py runworker channels service-detection

注意:这里的service-detection指的是project.routing.py里的ChannelNameRouter里面的某个router key,参见project.routing.py

  • 后台循环发送消息给前端

在以上代码的基础上,我们实现了前端某个group下的任何一个client发送消息给server,server都会推送最新的消息给所有该group下所有的clients,这种说到底还是触发式的。假如我们需要不断的每隔一段时间就主动推一个新的消息给前端呢?这个时候我们就可以用上BeatServer了(心跳,确认对方依然存活)。在以上代码的基础上,consumers.py里面新增一个BeatServer类:

class BeatServer(SyncConsumer):
def test_print(self, message):
self.room_group_name = "status_2"
print(self.room_group_name)
async_to_sync(self.channel_layer.group_send)(self.room_group_name, {
"type": "status_message",
"message": "Update"
})
print(message)

project.routing.py的同目录下新增beatconfig.py:

from datetime import timedelta

BEAT_SCHEDULE = {
'testing-print': {
'type': 'test.print',
'message': {'message': 'UUUU'},
'schedule': timedelta(seconds=5)
},
}

修改routing.py:

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import autoplatform.routing
from autoplatform import consumers

application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
autoplatform.routing.websocket_urlpatterns
)
),
"channel": ChannelNameRouter({
"service-detection": consumers.ServiceConsumer,
# 新增router
"testing-print": consumers.BeatServer
}),
})

然后,需要单独启动这个BeatServer:python3 manage.py beatserver.

参考文献:

  1. Github beatserver
  2. Github channels
No Reply at the moment.
需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up