app离线消息推送服务器,在线消息推送和离线消息推送(3)

首先介绍一下具体的功能设计。

整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌面端)和后台(用于添加要推送的消息)。

因为浏览器端不容易注意到实时提醒,所以我将实时提醒功能放到pyqt桌面端中实现(如托盘区闪烁等醒目的方式)。

浏览器端中只对消息进行拉取操作(刷新网页时),而不采取实时推的方式。

pyqt桌面端主要接收新消息提示即可,通知用户到网页端查看最新消息。(登录时主动拉取一次消息判断有无新消息,或者服务器主动推送所有新消息)

实时推送消息和主动拉取消息两个功能实际上是完全分离的,可以独立行使各自的职能。

不过下面我就不写浏览器端的代码了, 直接把相关的逻辑也放到pyqt里实现。

然后再来讲一下拉取服务端消息的逻辑。

每个用户拥有自己的消息库。当消息库为空时,一次性拉取服务端所有消息入用户库;当消息库内有消息时,每次拉取只拉取用户库内最新的消息发布时间之后的消息(将未入用户库的消息入库后再从用户库拉取)。

代码共5部分

1.鉴权服务器:用于服务和服务之间对用户提供的token做权限检查

2.消息数据服务器:用于读取和处理用户消息在数据库中的状态。

3.消息在线推送服务器:将管理员发布的消息在线实时推送给所有在线用户(若使用发布器的在线推送功能)

4.管理员消息发布器:分为在线推送和离线入库两个功能,可独立或组合使用。在线推送消息不会入消息库。

5.用户消息监听器:登陆后可接收管理员推送的在线消息,也可主动拉取用户对应消息库中的所有已读、未读消息,并标记消息的已读、未读状态。

下面是代码部分(为了简化代码逻辑,后面代码中的token即用户名。)

1.鉴权服务器

from flask import Flask, request, jsonify

app = Flask(__name__)

class Lib:

@staticmethod

def auth_admin_token(token):

return True

@staticmethod

def auth_user_token(token):

return True

@app.route('/admin', methods = ['POST'])

def admin_auth_token():

data = request.json

if Lib.auth_admin_token(data['token']):

return jsonify({'code': 200, 'msg': '鉴权成功'})

return jsonify({'code': 400, 'msg': '鉴权失败'})

@app.route('/user', methods = ['POST'])

def user_auth_token():

data = request.json

if Lib.auth_user_token(data['token']):

return jsonify({'code': 200, 'msg': '鉴权成功'})

return jsonify({'code':400, 'msg': '鉴权失败'})

if __name__ == '__main__':

app.run(host = '0.0.0.0', port = 5009)

2.消息数据服务器

# coding : utf-8

# author : ['Wang Suyin', ]

# data : 2020/2/21 14:14

# software : PyCharm

# python_version : '3.5.3 64bit'

# file : 2.消息数据服务器.py

"""

说明文档:

"""

import datetime

import requests

import json

from flask import Flask,request,jsonify

app=Flask(__name__)

# 虚拟数据

db = {

'message_table': [

{'id': 1, 'message': '消息1', 'publish_time': (2020, 1, 20, 3, 4, 5)},

{'id': 2, 'message': '消息2', 'publish_time': (2020, 1, 20, 3, 4, 6)},

{'id': 3, 'message': '消息3', 'publish_time': (2020, 1, 20, 3, 4, 7)},

],

'user_to_messages_table': [

{'username': 'tester', 'message_id': 1, 'read': True},

{'username': 'tester', 'message_id': 2, 'read': False},

],

}

class Utils:

@staticmethod

def auth_admin_token(token):

data = {'token': token}

r = requests.post(url = 'http://127.0.0.1:5009/admin', json = data)

r = json.loads(r.text)

if r['code'] == 200:

return True

return False

@staticmethod

def add_message(message):

t = datetime.datetime.now()

publish_time = (t.year, t.month, t.day, t.hour, t.minute, t.second)

db['message_table'].append({'id':len(db['message_table'])+1,'message': message, 'publish_time': publish_time})

@staticmethod

def get_user_messages(username):

print(db['message_table'])

# 拉取新消息入用户库

# 原先考虑的是拉取最新时间后的消息,目前看来求消息ID表的差集更简单些,也更保险些(防止遗漏)。当然,要根据具体场景优化,代码也要更换为数据库代码,这里仅为示意。

for message_id in list(

set([e['id'] for e in db['message_table']])

- set([e['message_id'] for e in [e_ for e_ in db['user_to_messages_table'] if e_['username']==username]])

):

db['user_to_messages_table'].append({'username': username, 'message_id': message_id, 'read': False})

# 在通知量少的情况下,可一次性返回所有内容。

# 在通知量多的情况下,可分页返回,也可只返回未读消息,已读消息由客户端本地保存。

# 简单起见,以下一次性返回所有内容

res = []

for e in db['user_to_messages_table']:

if e['username'] == username:

d = [e_ for e_ in db['message_table'] if e_['id'] == e['message_id']][0]

d.update({'read': e['read']})

res.append(d)

return res

@staticmethod

def mark_as_read(username, message_id,read_status):

#将消息标记为已读、未读状态,比较简单,就不具体谢写了

pass

@app.route('/add_message', methods=['POST'])

def add_message():

data = request.json

token = data['token']

r = requests.post(url='http://127.0.0.1:5009/admin',json={ 'token':token } )

r = json.loads(r.text)

if not r['code'] == 200:

return jsonify({'code':400,'msg':'鉴权失败'})

Utils.add_message(data['message'])

print('新的消息入库成功')

return jsonify({'code': 200, 'msg': '消息发布成功'})

@app.route('/get_user_messages', methods=['POST'])

def get_user_messages():

data = request.json

token = data['token']

if not Utils.auth_admin_token(token):

return jsonify({'code':400,'msg':'鉴权失败'})

username = token

data = Utils.get_user_messages(username)

return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})

@app.route('/mark_as_read', methods=['POST'])

def mark_as_read():

data = request.json

token = data['token']

if not Utils.auth_admin_token(token):

return jsonify({'code':400,'msg':'鉴权失败'})

username = token

message_id = data['message_id']

read_status = data['read_status']

data = Utils.mark_as_read(username, message_id,read_status)

return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})

if __name__ == '__main__':

app.run(host = '0.0.0.0', port = 5008)

3.消息在线推送服务器

# coding : utf-8

# author : ['Wang Suyin', ]

# data : 2020/2/20 16:48

# software : PyCharm

# python_version : '3.5.3 64bit'

# file : 3.消息在线推送服务器.py

"""

说明文档:

"""

import requests

import json

from flask import Flask

from flask_sockets import Sockets

app = Flask(__name__)

sockets = Sockets(app)

ws_pool = [] # 推送目标池

def auth_admin_token(token):

data = { 'token':token }

r = requests.post(url='http://127.0.0.1:5009/admin',json=data )

r = json.loads(r.text)

if r['code'] == 200:

return True

return False

def auth_user_token(token):

data = { 'token':token }

r = requests.post(url='http://127.0.0.1:5009/user',json=data )

r = json.loads(r.text)

if r['code'] == 200:

return True

return False

#新消息的插入可以通过ws也可以通过http

@sockets.route('/admin')

def admin_socket(ws):

print('admin接入')

r_data = ws.receive()

r_data = json.loads(r_data)

token = r_data['data']['token']

if not r_data['type']=='init' or not auth_admin_token(token):

ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))

ws.close()

return

ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))

while not ws.closed:

r_data = ws.receive()

if not r_data:

break

ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))

data = json.loads(r_data)

if data['type'] == 'message':

print('将消息推送给{}'.format(ws_pool))

print(ws_pool)

#推送给在线用户

for e in ws_pool:

try:

e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))

except:

ws_pool.remove(e)

try:

ws.close()

except:

pass

@sockets.route('/listener')

def listener_socket(ws):

print('listener接入')

r_data = ws.receive()

r_data = json.loads(r_data)

token = r_data['data']['token']

if not r_data['type']=='init' or not auth_user_token(token):

ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))

ws.close()

return

ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))

ws_pool.append(ws)

while not ws.closed:

r_data = ws.receive()

if not r_data:

break

#这里阻塞住就可以了,因为消息监听器只接收消息

try:

ws.close()

except:

pass

finally:

ws_pool.remove(ws)

if __name__ == '__main__':

from gevent import pywsgi

from geventwebsocket.handler import WebSocketHandler

from gevent import monkey

monkey.patch_all()

server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)

print('web server start ... ')

server.serve_forever()

4.管理员消息发布器

import json

import requests

import websocket

websocket.enableTrace(True)

# 这里就不写界面了,要推送的消息一并写在on_message里

class OnlinePublisher:

_message = ''

def __init__(self, url = 'ws://127.0.0.1:5003/admin'):

self.__ws = ws = websocket.WebSocketApp(url)

ws.on_open = self.on_open

ws.on_message = self.on_message

ws.on_error = self.on_error

ws.on_close = self.on_close

def publish(self, message):

OnlinePublisher._message = message

self.__ws.run_forever()

@staticmethod

def on_open(ws):

token = 'admin'

data = json.dumps({'type': 'init', 'data': {'token': token}})

ws.send(data)

@staticmethod

def on_message(ws, r_data):

print('接收到消息:{}'.format(r_data))

data = json.loads(r_data)

if data['type'] == 'init_r':

if data['code'] != 200:

ws.close()

raise Exception('鉴权失败')

ws.send(json.dumps({'type': 'message', 'data': {'message': OnlinePublisher._message}}))

elif data['type'] == 'message_r':

if data['code'] == 200:

print('发布成功')

else:

raise Exception('发布失败')

ws.close()

@staticmethod

def on_error(ws):

print('连接异常')

@staticmethod

def on_close(ws):

print('连接关闭')

def online_publish(message):

OnlinePublisher().publish(message)

def offline_publish(message):

data = {'token': 'admin', 'message': message}

r = requests.post(url = 'http://127.0.0.1:5008/add_message', json = data)

print(r.text)

r = json.loads(r.text)

if r['code'] == 400:

raise Exception('鉴权失败')

print('消息入库成功')

if __name__ == '__main__':

message = '测试消息,这是一条公告'

online_publish(message)

offline_publish(message)

5.用户消息监听器

import json

import sys

import datetime

import threading

import requests

from PyQt5.QtWidgets import *

from PyQt5.QtGui import *

from PyQt5.QtCore import *

import websocket

websocket.enableTrace(True)

class ActionSet:

@staticmethod

def on_open(ws):

token = 'user'

data = json.dumps({'type': 'init', 'data': {'token': token}})

ws.send(data)

@staticmethod

def on_message(ws, r_data):

print('接收到消息:{}'.format(r_data))

data = json.loads(r_data)

if data['type'] == 'init_r':

if data['code'] != 200:

ws.close()

print('鉴权失败')

return

print('鉴权成功')

if data['type'] == 'message':

print(data['data']['message'])

MainWindow._instance.new_message_got.emit(data['data']['message'])

@staticmethod

def on_error(ws):

print('连接异常')

@staticmethod

def on_close(ws):

print('连接关闭')

class MainWindow(QWidget):

_instance = None

new_message_got = pyqtSignal(str)

def __init__(self, parent = None):

super().__init__(parent)

MainWindow._instance = self

self.setWindowTitle('消息监听器')

self.resize(800, 600)

self.__btn_online_connect = QPushButton('连接')

self.__lw_online_message = QListWidget()

self.__le_username = QLineEdit()

self.__le_username.setPlaceholderText('要拉取消息的用户名')

self.__btn_pull_all_messages = QPushButton('拉取所有消息/刷新')

self.__lw_all_messages = QListWidget()

main_layout = QVBoxLayout()

main_layout.setSpacing(0)

main_layout.setContentsMargins(0, 0, 0, 0)

self.setLayout(main_layout)

main_layout.addWidget(self.__btn_online_connect)

main_layout.addWidget(self.__lw_online_message)

main_layout.addWidget(self.__le_username)

main_layout.addWidget(self.__btn_pull_all_messages)

main_layout.addWidget(self.__lw_all_messages)

self.__btn_online_connect.clicked.connect(self.__on_btn_online_connect_clicked)

self.new_message_got.connect(self.__show_new_message)

self.__btn_pull_all_messages.clicked.connect(self.__on_btn_pull_all_messages_clicked)

def __on_btn_online_connect_clicked(self):

url = 'ws://127.0.0.1:5003/listener'

ws = websocket.WebSocketApp(url)

ws.on_open = ActionSet.on_open

ws.on_message = ActionSet.on_message

ws.on_error = ActionSet.on_error

ws.on_close = ActionSet.on_close

wst = threading.Thread(target = ws.run_forever)

wst.setDaemon(True)

wst.start()

def __on_btn_pull_all_messages_clicked(self):

data = {'token': self.__le_username.text()}

r = requests.post(url = 'http://127.0.0.1:5008/get_user_messages', json = data)

r = json.loads(r.text)

if r['code'] == 400:

raise Exception('鉴权失败')

print('消息拉取成功')

message_datas = sorted(r['data'][:],key = lambda x:-x['id'])

self.__lw_all_messages.clear()

for d in message_datas:

self.__lw_all_messages.addItem('【{}】{}'.format({True:'已读',False:'未读'}[d['read']],d['message'] ) )

def __show_new_message(self, text):

time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

text = '{} {}'.format(time_, text)

self.__lw_online_message.insertItem(0, text)

if __name__ == '__main__':

app = QApplication(sys.argv)

w = MainWindow()

w.show()

app.exec_()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/488664.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于深层卷积网络的手写数字识别 minist_「Tensorflow」基于CNN的数字OCR识别

导读 对于人类来说,识别手写的数字是一件非常容易的事情。我们甚至不用思考,就可以看出下面的数字分别是1,2,3。那机器如何来识别数字?本期将使用Tensorflow搭建卷积神经网络,进行手写数字的识别。代码可关…

2019年5G创新深度研究报告

来源:中信建投从历史上看,每一轮科技产业创新周期均主要由通信代际升级驱动,历时 5-8 年。我们认为 2017-2019 年为 4G 时代的稳定成熟期,而进入 2020 年,运营商加速投入 5G 网络建设,科技软硬件有望在运营…

鹰眼系统原理_山东首家露天焚烧鹰眼监控系统在我镇投入使用

露天冒烟着火,不用人员到现场,电子围栏就会锁定目标,自动报警,提醒监管人员立即现场处置。日前,莱西市院上镇新安装建设的污染源鹰眼监控系统投入使用,实现了环境监控全方位、自动化。据了解,这…

python requests是什么_python requests 连接池

本文从WordPress迁移而来, 查看全部WordPress迁移文章 背景:要从内部的一个web api拉取数据,要请求大概15万次左右(15万个ID,每个ID请求一次) 特点:一个host(也可以以IP直接请求)&am…

7nmarm微架构鲲鹏服务器芯片,中国电信服务器集采:同方鲲鹏服务器拿下6000万元份额...

原标题:中国电信服务器集采:同方鲲鹏服务器拿下6000万元份额(全球TMT2021年4月9日讯)日前,中国电信(2021年)服务器集采项目招标的GPU型服务器中标候选人公示发布。其中,鲲鹏、海光等国产CPU服务器占比进一步提升,标志着…

机器人的工作原理,这是我见过最详细的解析!

来源:网络很多人一听到“机器人”这三个字脑中就会浮现“外形酷炫”、“功能强大”、“高端”等这些词,认为机器人就和科幻电影里的“终结者”一样高端炫酷。其实不然,在本文中,我们将探讨机器人学的基本概念,并了解机…

linux mint 图标主题_如何在 Linux Mint 中更换主题

一直以来,使用 Cinnamon 桌面环境的 Linux Mint 都是一种卓越的体验。这也是为何我喜爱 Linux Mint的主要原因之一。-- Its Foss(作者)一直以来,使用 Cinnamon 桌面环境的 Linux Mint 都是一种卓越的体验。这也是为何我喜爱 Linux…

服务器物品展示框刷物品,我的世界1period;11period;2展示框刷物品bug | 手游网游页游攻略大全...

发布时间:2017-09-25我的世界惊现全新无限刷物品bug 服主大大都要注意了.那今天给大家分享一个玩家无意间发现的新的无限刷物品bug,而且还是在服务器中哦!那感兴趣的玩家不妨进来看看哦! 在一个rpg服务器玩 开小号召唤boss的时候发现的. ...标签:我的世界…

Nature:揭示人大脑类器官为何缺乏正常人脑特有的细胞亚型和复杂回路

来源:生物谷作为在实验室中通常利用人类干细胞培育出的大脑样组织三维球体,大脑类器官被吹捧为有潜力让科学家们在受控的实验室条件下研究大脑回路的形成。关于大脑类器官的讨论非常热闹,一些科学家认为它们将使得快速开发针对破坏性的脑部疾…

anaconda 怎么安装xlrd_Pyinstaller打包,文件太大了怎么办?

这是一个很长的故事,嫌长的直接看最后的结论事情经过上周接了个需求,写了个小工具给客户,他要求打包成exe文件,这当然不是什么难事。因为除了写Python的,绝大多数人电脑里都没有Python编译器,所以打包成exe…

2017.4.07 js 中的function 实现的方式

函数分为FD (函数定义),FE(函数表达式) ,函数构造器得到的函数(1) FD 的栗子:function getTaste(){ .......}解析器遇到上面的function关键字,会解析上面的代码为函数定义的情况,凡…

android checkbox 选中事件_使用Vue3.0新特性造轮子 WidgetUI3.0 (Checkbox复选框组件)

"title"标题示例代码:data [ { title: 新日小卫士二代, }, { title: 车子质量不合格, }, { title: 我买的骑士1号仪表台进水怎么回事?, }, { title: 风雅欧妮大灯高低调节, }]"title"标题和"desc"描…

服务器装系统用哪个好,服务器系统重装用哪个系统

服务器系统重装用哪个系统 内容精选换一换华为云帮助中心,为用户提供产品简介、价格说明、购买指南、用户指南、API参考、最佳实践、常见问题、视频帮助等技术文档,帮助您快速上手使用华为云服务。重装裸金属服务器的操作系统。快速发放裸金属服务器支持…

2020图机器学习GNN的四大研究趋势

来源:专知【导读】以图神经网络为代表的图机器学习在近两年成为研究热点之一。近日,图机器学习专家 Sergei Ivanov 为我们解读了他总结出来的 2020 年图机器学习的四大热门趋势,包括图神经网络的理论理解、应用普及、应用、图嵌入框架&#x…

c++ 将文件内容输出到word上_原来PDF转Word可以这么简单,只需要一个键!办公起来真方便...

在职场办公中,我们经常接触到PDF文件,如果让你把PDF转换成Word,你还在束手无策吗?今天就来教你3种方法,如何将PDF转换成Word。01.复制粘贴法相信很多新手都只会【CtrlC/V】,虽然PDF文件不能直接被修改&…

python的类和实例_Python使用类和实例

我们可以使用类来模拟现实世界中的很多情景。类编写好后,你的大部分时间都将花在使用根据类创建的实例上。你需要执行的一个重要任务是修改实例的属性。你可以直接修改实例的属性,也可以编写方法以特定的方式进行修改。 Car类 下面来编写一个表示汽车的类…

iApp最新版无服务器多功能软件库源码

无需服务器的多功能软件库源码分享,仅需添加一个后台应用和一个文档即可 使用教程如下: 在浏览器中打开理想后台地址:http://apps.xiaofei.run/user/ 如果没有账号,请注册一个免费账号。 登录账号后,添加一个后台应…

二维数组求和 团队开发

题目:返回一个二维整数数组中最大联通子数组的和。 要求: 输入一个二维整形数组,数组里有正数也有负数。 求所有子数组的和的最大值。要求时间复杂度为O(n)。 开发方式:团队开发 主要思路:二维连通数组求最大子数组,我…

rpc协议微服务器,RPC协议及实现方式(分布式微服务治理的核心)

分布式微服务治理的核心在于: 微服务和分布式(微服务框架)微服务的最优技术实现目前是: SpringBoot(RPC 框架)分布式的最优技术实现目前是: Thrift,Motan,Dubbo,Spring Cloud(Netflix OSS),Finagle,gRPCRPC 是什么RPC 的全称是 Remote Procedure Call ,是一种进程间…

2019年度全球工程前沿研究报告

来源:JAS自动化学报英文版1. 工程研究前沿1.1 Top 10 工程研究前沿发展态势信息与电子工程领域 Top 10 工程研究前沿涉及电子科学与技术、光学工程与技术、仪器科学与技术、信息与通信工程、计算机科学与技术、控制科学与技术等学科方向。其中,“面向光互…