自动化工具 Django 和 Channels 实现 websocket 式实时通讯 - 生产环境部署

非洲赵子龙 for DOTA · 2018年07月25日 · 2593 次阅读

近两天,Django 发布了 channels 2.1.2, 用于支持多通道的分组广播,笔者试了下,用它来与后台实时交互实现某些服务的状态监控 (当然,这个最典型的应用还是 web 聊天室),以下记录发布到 PRODUCT 环境时的关键配置.

  • 安装

Nginx, redis, python3.6+ 及其依赖包如下版本所示

注意:channels 安装完毕后,Daphne 已经被附带安装成功,但是不能直接执行 daphne,需要自己创建连接.

  • 版本

Django 2.0.7, uWSGI 2.0.17.1,redis 3.2.10, channels 2.1.2, chanels-redis 2.2.1, daphne 2.2.1

  • 前端
var statusSocket = new WebSocket('ws://' + window.location.host + ":8000" + '/ws/status/' + '{{ pro.pk }}' + '/');
statusSocket.onmessage = function (e) {
var data = JSON.parse(e.data);
var message = data['message'];
# 收到消息后的处理逻辑
 console.log(message);
 };
  • 后端

app.routing.py:

from django.conf.urls import url
from . import consumers
websocket_urlpatterns = [
url(r'^ws/status/(?P<project_pk>[^/]+)/$', consumers.ServiceConsumer),
]

app.consumers.py:

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class ServiceConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.project_pk = self.scope['url_route']['kwargs']['project_pk']
        self.room_group_name = 'status_%s' % self.project_pk
        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()


    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'status_message',
                'message': message
            }
        )

    # Receive message from room group
    async def status_message(self, event):
        # todo: get real status from service detection api
        message = event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

project.routing.py:

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import autoplatform.routing
from autoplatform import consumers

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
        URLRouter(
            autoplatform.routing.websocket_urlpatterns
        )
    ),
    "channel": ChannelNameRouter({
        "service-detection": consumers.ServiceConsumer,
    }),
})

project.asgi.py:

import os
import django
from channels.routing import get_default_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django.setup()
application = get_default_application()

project.setting.py:

……
CHANNEL_LAYERS = {
    'default': {
    'BACKEND': 'channels_redis.core.RedisChannelLayer',
    'CONFIG': {"hosts": [('127.0.0.1', 6379)],},},}
    ……
INSTALLED_APPS = [
    ……
    'channels',
    ]
    ……
ASGI_APPLICATION = "project.routing.application"
  • 启动

启动 redis 服务器 (采用默认配置即可):/usr/bin/redis-server 127.0.0.1:6379

启动 uwsgi(uwsgi 配置如下): uwsgi --ini /etc/uwsgi.ini

[uwsgi]
daemonize = /var/log/uwsgi.log
logto = /var/log/uswgi/error.log
socket = :10222
chdir = /root/autoplatform
module = project.wsgi
master = true
process = 4
vacuum = true

注意:上面的 chdir = .../autoplatform 指的是项目的根目录,也即 manage.py 文件所在的目录

启动 nginx: nginx -s reload

启动 daphne: daphne -b 0.0.0.0 -p 8000 project.asgi:application -v2

注意:这里的 8000 端口就指的是 websocket 的通信端口,需要开启防火墙放过,跟前端 JS websocket 一致

启动 worker: python3 manage.py runworker channels service-detection

注意:这里的service-detection指的是project.routing.py里的 ChannelNameRouter 里面的某个 router key,参见 project.routing.py

  • 后台循环发送消息给前端

在以上代码的基础上,我们实现了前端某个 group 下的任何一个 client 发送消息给 server,server 都会推送最新的消息给所有该 group 下所有的 clients,这种说到底还是触发式的。假如我们需要不断的每隔一段时间就主动推一个新的消息给前端呢?这个时候我们就可以用上 BeatServer 了 (心跳,确认对方依然存活)。在以上代码的基础上,consumers.py 里面新增一个 BeatServer 类:

class BeatServer(SyncConsumer):
    def test_print(self, message):
        self.room_group_name = "status_2"
        print(self.room_group_name)
        async_to_sync(self.channel_layer.group_send)(self.room_group_name, {
            "type": "status_message",
            "message": "Update"
        })
        print(message)

project.routing.py的同目录下新增beatconfig.py:

from datetime import timedelta

BEAT_SCHEDULE = {
    'testing-print': {
        'type': 'test.print',
        'message': {'message': 'UUUU'},
        'schedule': timedelta(seconds=5)
    },
}

修改routing.py:

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import autoplatform.routing
from autoplatform import consumers

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
        URLRouter(
            autoplatform.routing.websocket_urlpatterns
        )
    ),
    "channel": ChannelNameRouter({
        "service-detection": consumers.ServiceConsumer,
        # 新增router
        "testing-print": consumers.BeatServer
    }),
})

然后,需要单独启动这个 BeatServer:python3 manage.py beatserver.

参考文献:

  1. Github beatserver
  2. Github channels
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册