Django Channels实现Zabbix实时告警到页面

什么是WebSocket

websocket是HTML5开始提供的一种新协议,用于浏览器和服务器之间实现全双工通讯的技术。本质上是基于tcp协议,先通过HTTP/HTTPS协议发起一条特殊的http请求进行握手后,创建一个用于双向数据交换的tcp连接,此后服务端与客户端通过此连接进行实时通信。在websocket之前实现全双工通讯一般使用轮训、SSE(Server-Sent Event,服务端推送事件)、Comet技术

HTTP与WebSocket的区别

image

  • 由上面的示意图可知,在传统的http1.0,request和response是一对一的,每次都要发送header信息
  • http1.1 默认开启了keeplive也只是复用同一个tcp连接,但是服务器和客户端还要大量交换HTTP header,信息交换效率很低。
  • WebSocket是一种双向通信协议。在建立连接后,WebSocket服务器端和客户端都能主动向对方发送或接收数据,就像Socket一样。从而更好的节省服务器资源和带宽并达到实时通讯的目的
  • WebSocket需要像TCP一样,先建立连接,连接成功后才能相互通信

客户端通过WebSocket与服务端建立通信过程

  1. 在客户端,new WebSocket实例化一个新的WebSocket客户端对象,请求类似 ws://yourdomain:port/path 的服务端WebSocket URL,客户端WebSocket对象会自动解析并识别为WebSocket请求,并连接服务端端口,执行双方握手过程,客户端发送数据格式类似:

    1
    2
    3
    4
    5
    6
    7
    GET /ws/alert/ HTTP/1.1
    Host: 127.0.0.1:8000
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==
    Origin: http://127.0.0.1:8000
    Sec-WebSocket-Version: 13
  2. 可以看到,客户端发起的WebSocket连接报文类似传统HTTP报文,Upgrade:websocket参数值表明这是WebSocket类型请求,Sec-WebSocket-Key是WebSocket客户端发送的一个 base64编码的密文,要求服务端必须返回一个对应加密的Sec-WebSocket-Accept应答,否则客户端会抛出Error during WebSocket handshake错误,并关闭连接。服务端收到报文后返回的数据格式类似:

    1
    2
    3
    4
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=

Sec-WebSocket-Accept的值是服务端采用与客户端一致的密钥计算出来后返回客户端的,HTTP/1.1 101 Switching Protocols表示服务端接受WebSocket协议的客户端连接,经过这样的请求-响应处理后,两端的WebSocket连接握手成功, 后续就可以进行TCP通讯了

image

注释: WebSocket标识符是ws(如果加密,则是wss),如上图所示

WebSocket服务

Django Channel

WSGI/ASGI

WSGI
大家都知道WSGI,即Web Server Gateway Interface,是服务器和客户端交互的接口规范,符合这种借口的application可以在所有符合该接口的server上运行,解耦了server和application;web组件被分成三类:client、server、middleware

image
image

如上图所示

  • Server/Gateway:处理HTTP协议,接受用户HTTP请求,调用application处理逻辑,将response返回给client;比如Apache、Nginx
  • Application:专注业务逻辑的python 应用或者框架,如Django;根据WSGI协议规范,Applicaiton需要定义http://wsgi.tutorial.codepoint.net/application-interface
  • Middleware:位于Server/Gateway 和 Application/Framework 之间,对 Server/Gateway 来说,它相当于 Application/Framework ;对 Application/Framework 来说,它相当于 Server/Gateway。每个 middleware 实现不同的功能,我们通常根据需求选择相应的 middleware 并组合起来,实现所需的功能。比如,可在 middleware 中实现以下功能:
    1. 根据 url 把用户请求调度到不同的 application 中
    2. 负载均衡,转发用户请求
    3. 限制请求速率,设置白名单
      WSGI的middleware体现 unix 的哲学之一:do one thing and do it well

ASGI
由于WSGI协议支持HTTP,ASGI(Asynchronous Server Gateway Interface)在此基础上应运而生,对WSGI协议进行兼容和扩展,能够处理多种通用协议如HTTP、HTTP2、WebSocket,允许这些协议能通过网络或本地socket进行传输,以及让不同的协议被分配到不同的进程中
image
image

ASGI由三个不同的组件组成:协议服务、频道层(Channnel Layer)、应用层;其中Channel Layer是最重要的部分,同时对协议服务和应用提供接口;

  • 频道和消息: ASGI规定所有通信都要通过在频道里发送消息进行,消息是一个dict,为了保证可序列化,只允许以下类型数据
    string/Unicode/int(非long)/list/dict(Key是Unicode)/boolean/None
    频道是一个先进先出队列,队列中的消息最多发送给一个消费者;频道中的消息超过设定时间会被清理,消息大小最大限定为1MB,超过需要分块
  • 群组: 频道中消息只能被传送一次,不能广播;如果向任一组用户发送消息,就要用到群组
Channels

大概了解ASGI规范之后,看下django基于ASGI协议实现HTTP/HTTP2/WebSocket的模块Channels,安装好channels后,django将有原来的request-response模式,转换成worker工作模式;并没有运行单独的wsgi进程,而是分成了三层:

  • interface Server: 负责Django和Client通信,同时适配WSGI和WebSocket Server
  • Channel Layer: 可插拔的Python代码和数据存储,如Redis、或者内存,用于消息的传输
  • Workers: 监听频道,消息抵达时运行消费者代码

下面用例子来看下如何使用Channels: 实现Zabbix报警实时传送到客户端
描述:
image

  • Trigger触发时,根据Action设置通过脚本报警,并将报警信息发布到Redis的ALARM频道
  • Django Commands alert 订阅Redis的ALARM频道
  • 调用channels的send方法,通过websocket实时推送到Client

目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
monitor
├── monitor
│   ├── celery.py
│   ├── consumers.py
│   ├── __init__.py
│   ├── routing.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── commands
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── management
│   │   ├── commands
│   │   │   ├── alert.py
│   │   │   ├── __init__.py
│   │   │   └── __pycache__
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── __pycache__
│   ├── tests.py
│   └── views.py
├── static
├── templates
│   ├── base
| |—— base.html
├── README.md
├── requirements.txt

安装配置Channels
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
pip install channels asgi_redis
settings.py添加app和设置CHANNEL_LAYERS
#commands是后面定义Django命令的app
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'channels',
'commands'
]
#Redis信息
REDIS_OPTIONS = {
'HOST': '127.0.0.1',
'PORT': 6379,
'PASSWD': 'geekwolf',
'DB': 0
}
#可以使用内存存储Channels消息
#CHANNEL_LAYERS = {
# "default": {
# "BACKEND": "asgiref.inmemory.ChannelLayer",
# "ROUTING": "channels_example.routing.channel_routing",
# },
#}
#使用Redis作为消息存储,需安装asgi_redis
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'asgi_redis.RedisChannelLayer',
'CONFIG': {
'hosts': ['redis://:{0}@{1}:{2}/{3}'.format(REDIS_OPTIONS['PASSWD'], REDIS_OPTIONS['HOST'], REDIS_OPTIONS['PORT'], 1)]
},
'ROUTING': 'plonvol.routing.channel_routing'
}
}
#Redis频道和Channels群组名
GROUP_NAME = 'alarm'
添加路由(routing.py)
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
from channels.routing import route
from .consumers import ws_connect, ws_disconnect, ws_receive
channel_routing = [
route('websocket.connect', ws_connect),
route('websocket.disconnect', ws_disconnect),
]
添加consumers文件(类似view)(consumers.py)
1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-
from channels import Group
from channels.handler import AsgiHandler
from django.conf import settings
def ws_connect(message):
message.reply_channel.send({'accept': True})
Group(settings.GROUP_NAME).add(message.reply_channel)
def ws_disconnect(message):
Group(settings.GROUP_NAME).discard(message.reply_channel)
订阅Redis报警消息脚本(commands/management/commands/alert.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -*- coding: utf-8 -*-
import json
import logging
from channels import Group
from django.core.management import BaseCommand
from django.conf import settings
import redis
logger = logging.getLogger(__name__)
class Command(BaseCommand):
"""
Command to start zabbix alert worker from command line.
"""
help = 'Subscribe the zabbix alerts channel'
def handle(self, *args, **options):
rc = redis.Redis(host=settings.REDIS_OPTIONS['HOST'],
password=settings.REDIS_OPTIONS['PASSWD'],
port=settings.REDIS_OPTIONS['PORT'],
db=settings.REDIS_OPTIONS['DB'])
rc.delete(settings.GROUP_NAME)
pubsub = rc.pubsub()
pubsub.subscribe(settings.GROUP_NAME)
for item in pubsub.listen():
if item['type'] == 'message':
Group(settings.GROUP_NAME).send({'text': bytes.decode(item['data'])})
logger.debug('send a message %s ' % item)
前端页面base.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<!-- 报警声音 -->
<audio id="notify"><source src="/static/notify.ogg" type="audio/ogg">></audio>
<script type="application/javascript">
var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
var ws = new WebSocket(ws_scheme + '://' + window.location.host + window.location.pathname);
console.log(ws);
ws.onmessage = function (message) {
var data = JSON.parse(message.data);
if(data['当前状态'] == 'Problem')
{
var subject = '<br>故障!,服务器:' + data['告警主机'] + '发生:' + data['告警信息'] + '故障!<br>';
}
else{
var subject = '<br>恢复!,服务器:' + data['告警主机'] + '发生:' + data['告警信息'] + '已经恢复!<br>';
}
content = new Array();
$.each(data,function(k,v){
content.push("<b>" + k + ':</b> ' + v)
})
var data = subject + content.join("<br>")
$('#notify')[0].play();
notify('warning','glyphicon glyphicon-danger-sign',data);
}
</script>
测试消息,用于发布消息到Redis
1
2
3
4
5
6
7
8
import redis
import json
rc = redis.Redis(host='127.0.0.1', password='geekwolf', port=6379, db=0)
msg = {"告警主机": "web-server-node1", "告警地址": "192.168.1133.11", "告警时间": "2017-11-11 05:05:22", "告警等级": "严重",
"告警信息": "Web端口80监控", "问题详情": "80端口连接失败", "当前状态": "Problem", "事件ID": "12345"}
rc.publish('alarm', json.dumps(msg))
运行服务,测试
1
2
3
4
启动项目(http/websocket):
python manage.py runserver 0.0.0.0:8000
启动监听报警消息进程:
python manage.py alert

访问http://192.168.1.1:8000,运行test.py脚本
image

参考资料

坚持原创分享,您的支持将鼓励我继续创作