【最后203篇系列】004 -Smarklink

说明

这个用来替代nginx。

最初是希望用nginx进行故障检测和负载均衡,花了很多时间,大致的结论是:nginx可以实现,但是是在商业版里。非得要找替代肯定可以搞出来,但是太麻烦了(即使是nginx本身的配置也很烦)。更重要的是,我突然意识到其实我对这方面的需求大幅提升了,nginx无法实现我最核心的功能。

于是我打算自己做一个网关。以下是两者的差别:
在这里插入图片描述

我并不是要做一个新的产品来替代nginx,只是需要对我的应用服务进行非常精细的控制,然后我问了下大模型,是否可行:

使用 Tornado 构建自定义应用网关 是一个非常有挑战性但也非常灵活的选择。通过 Tornado 的异步特性和强大的编程能力,你可以实现非常精细的控制,满足特定的业务需求。以下是实现这一目标的思路和建议:


1. Tornado 作为应用网关的核心功能

你可以利用 Tornado 实现以下网关功能:

反向代理
  • 使用 Tornado 的 AsyncHTTPClienthttpclient 模块,将客户端请求转发到后端服务。
  • 示例:
    from tornado.httpclient import AsyncHTTPClientclass ProxyHandler(tornado.web.RequestHandler):async def get(self):http_client = AsyncHTTPClient()response = await http_client.fetch("http://backend-service" + self.request.uri)self.write(response.body)
    
负载均衡
  • 实现自定义的负载均衡算法(如轮询、加权轮询、最小连接数等)。
  • 示例:
    backend_servers = ["http://backend1", "http://backend2", "http://backend3"]
    current_server = 0class LoadBalancerHandler(tornado.web.RequestHandler):async def get(self):global current_serverbackend_url = backend_servers[current_server]current_server = (current_server + 1) % len(backend_servers)http_client = AsyncHTTPClient()response = await http_client.fetch(backend_url + self.request.uri)self.write(response.body)
    
动态路由
  • 根据请求的路径、头部或其他条件,将请求路由到不同的后端服务。
  • 示例:
    class RouterHandler(tornado.web.RequestHandler):async def get(self):if self.request.path.startswith("/api/v1"):backend_url = "http://api-v1-service"elif self.request.path.startswith("/api/v2"):backend_url = "http://api-v2-service"else:backend_url = "http://default-service"http_client = AsyncHTTPClient()response = await http_client.fetch(backend_url + self.request.uri)self.write(response.body)
    
限流和限速
  • 使用 Tornado 的异步特性实现请求速率限制。
  • 示例:
    from tornado.ioloop import IOLoop
    from tornado.web import RequestHandler
    from tornado.queues import Queuerequest_queue = Queue(maxsize=100)  # 限制并发请求数class RateLimitHandler(RequestHandler):async def get(self):await request_queue.put(None)try:# 处理请求self.write("Hello, world!")finally:request_queue.get_nowait()
    
身份验证和授权
  • 在网关层实现统一的身份验证(如 JWT 验证、OAuth 等)。
  • 示例:
    import jwtclass AuthHandler(tornado.web.RequestHandler):def prepare(self):token = self.request.headers.get("Authorization")if not token:self.set_status(401)self.finish("Unauthorized")try:payload = jwt.decode(token, "secret_key", algorithms=["HS256"])self.current_user = payload["user"]except jwt.InvalidTokenError:self.set_status(401)self.finish("Invalid token")
    
日志和监控
  • 在网关层记录请求日志,并集成监控工具(如 Prometheus)。
  • 示例:
    class LoggingHandler(tornado.web.RequestHandler):def on_finish(self):print(f"Request: {self.request.method} {self.request.uri} - Status: {self.get_status()}")
    

2. Tornado 网关的优势

  • 高度定制化:你可以完全控制网关的行为,实现特定的业务逻辑。
  • 灵活性:Tornado 的异步特性使其能够高效处理高并发请求。
  • 与 Python 生态无缝集成:可以轻松集成其他 Python 库(如数据库驱动、消息队列等)。

3. Tornado 网关的挑战

  • 开发成本高:需要从头实现许多功能(如负载均衡、健康检查、限流等)。
  • 性能瓶颈:Tornado 是 Python 编写的,性能可能不如 Nginx 这样的 C 语言实现。
  • 维护复杂性:自定义网关需要更多的测试和维护工作。

4. 建议的架构

如果你决定使用 Tornado 构建网关,建议采用以下架构:

  1. Nginx 作为前置网关

    • 处理 SSL/TLS 终止、静态文件缓存、DDoS 防护等。
    • 将动态请求转发到 Tornado 网关。
  2. Tornado 作为应用网关

    • 实现精细的流量控制、动态路由、身份验证等功能。
    • 将请求转发到后端的微服务或应用服务器。
  3. 监控和日志

    • 集成 Prometheus、Grafana 等工具,实时监控网关性能。
    • 使用 ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志分析。

5. 示例代码

以下是一个简单的 Tornado 网关示例:

import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClientclass ProxyHandler(tornado.web.RequestHandler):async def get(self):backend_url = "http://backend-service" + self.request.urihttp_client = AsyncHTTPClient()response = await http_client.fetch(backend_url)self.write(response.body)def make_app():return tornado.web.Application([(r"/.*", ProxyHandler),])if __name__ == "__main__":app = make_app()app.listen(8888)tornado.ioloop.IOLoop.current().start()

总结

使用 Tornado 构建自定义应用网关是可行的,尤其适合需要高度定制化的场景。你可以实现反向代理、负载均衡、动态路由、限流、身份验证等功能,但需要注意开发成本和性能瓶颈。如果可能,建议将 Nginx 作为前置网关,Tornado 作为应用网关,结合两者的优势,构建高性能、高可用的系统。

看看性能是不是ok:
在这里插入图片描述

在这里插入图片描述

所以,总体上tornado在性能上可能会比nginx低一个量级:

  • 1 简单的基础服务可以继续走nginx,或者直连,这部分假设无问题。
  • 2 复杂的、核心的服务走smartlink。这部分的处理本身就非常“昂贵”,比如用大模型处理数据,天然的TPS就比较低,smartlink额外的开销比起来不算啥,是可行的(或者说现在的计算资源太多了,费点没啥)。即使是较低配的生产机也应该有8个核,花1个核做网关在我看来是值得的。

smartlink 可以带来什么:

  • 1 权限控制。过去我一般不去设置权限,这样开发和使用比较方便,但这一步肯定是要做的。通过smartlink,仅开放一个对外端口,访问其他服务时必须提供对应口令,在smartlink完成鉴权。
  • 2 流量及质量。现在有一类服务要求比较高,按照固定周期统计可能会有些滞后。通过smartlink(结合redis), 可以实时的对流量进行控制,获取请求的响应头,将成功(2xx)、失败(5xx)等不同类型的错误进行实时统计和通知。另外可以统计实时的影响速度,由别的算法应用读取,来进行分发优化。
  • 3 优化与强化。这个问题是最为核心且重要的,当我们花费成千上万的人民币在进行数据的处理时,我们知道做的不够好。通常来说,算法很难在第一版达到完美。然而当服务进入生产状态时,我们又很难去停下来再次改进;即使我们离线改好了新版算法,又担心上线时出bug。这样的最终结果是:不出问题不改进。smartlink在增加cpu开销时,也给了足够强大的控制逻辑,可以解决这些问题。 这里不讨论细节,只看机制上的可行性。当请求到达时,smarklink会按照实时加载的策略(通过redis)决定请求是否可以多发。比如除了将请求实时转向对应的稳定版服务,也发到kafka。收到请求的处理结果后,除了返回用户,也发一份到kafka。这个过程会增加2次写kafka的操作,对于秒级级的请求来说是可以接受的(假设请求30秒超时)。数据到了kafka之后,可以有多个消费者准实时的执行新版本服务的请求,然后和当前服务的结果一并进行效果评估与比对。首先会排除的问题是bug引起的请求失败,然后新版本服务可以准实时的进行评估、打标(RLHF)。评估结束后,可以采用手动/自动的方法,修改redis中的数据,从而使请求平滑切换到新的服务。这个服务是经过了效果评估和上线测试的,不会有低级错误。

内容

本次的目标先实现几个基本功能

功能:

  • 1 转发
  • 2 health-check
  • 3 负载均衡

1 测试样例

  • 1 转发

在一台主机上,启动一个微服务(实体识别),通过smartlink转发服务到此位置

  • 2 health-check

当请求报连接错误时,smartlink会将对应服务记为不可用;当服务恢复时,会将其标记为可用。

  • 3 负载均衡(least time)

再启动一个服务,加入并在服务中进行时间扰动。要求smartlink在将服务转发时会依据其响应时间动态分配。

2 实验

我发现今天可能写不完全部了,内容有点多,我先把完成的有部分整理上。

2.1 基础配置部分 base_config.py

这里先手动配置了一个测试服务。假设每个服务的可用资源是一个字典, 并且认为一定有一个默认的服务版本(base)

...
base_config.server_dict = {}
base_config.server_dict['test_server'] = {}
base_config.server_dict['test_server']['base'] = "http://IP:PORT/get_lng_lat/" 

2.2 数据模型 data_model.py

约定(建议性质)了模型的输入和输出。本来是想强制的,但那样难度太高了,而且作为一个网关,目前的主要任务还是能够进行有效转发。未来可以尽量往这个标准靠。

# 服务输入
class StandardInput(BaseModel):server_name : str  # 要请求的服务名params : dict = {} #  请求参数data : list = []# 请求数据 - 一般以lod的方式输入# 服务输出
class StandardOutput(BaseModel):server_name: str = 'default'status : bool  = Truemsg : str  = 'ok'data : list = [] # 返回数据 - 以lod的方式返回duration : int = 0 # ms 

2.3 服务 server.py

2.3.1 基本部分导入部分

import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径
import tornado.httpclient  
from Basefuncs import * 
from base_config import base_config
from data_model import * 
# 配置日志
logger = get_base_logger()import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):def default(self, obj):if isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):return float(obj)elif isinstance(obj, np.ndarray):return obj.tolist()if isinstance(obj, datetime):return obj.__str__()if isinstance(obj, dd.timedelta):return obj.__str__()else:return super(MyEncoder, self).default(obj)

2.3.2 基础转发

# 基础转发处理器
class BaseForwardHandler(tornado.web.RequestHandler):def set_default_headers(self):self.set_header("Access-Control-Allow-Origin", "*")self.set_header("Access-Control-Allow-Headers", "Content-Type")self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")def options(self):self.set_status(204)self.finish()# 被转发的请求async def forward_request(self, target_url, body=None, headers=None):# 获取原始请求的 body 和 headers# 如果没有传递 body 和 headers,则使用原始请求的 body 和 headersif body is None:body = self.request.bodyif headers is None:headers = self.request.headers# 使用 Tornado 的异步 HTTP 客户端转发请求http_client = tornado.httpclient.AsyncHTTPClient()try:logger.info(f"Forwarding request to {target_url}")response = await http_client.fetch(target_url,method="POST",headers=headers,body=body)self.write(response.body)except tornado.httpclient.HTTPError as e:logger.error(f"HTTP Error: {e.code} - {str(e)}")self.set_status(e.code)self.write(json.dumps({"error": str(e)}))# 触发告警通知await self.trigger_alert(e)except Exception as e:logger.error(f"Unexpected error: {str(e)}")self.set_status(500)self.write(json.dumps({"error": "Internal Server Error"}))# 触发告警通知await self.trigger_alert(e)async def trigger_alert(self, error):"""触发告警通知的逻辑"""alert_message = f"Service Error: {str(error)}"logger.error(f"Sending alert: {alert_message}")# 示例:发送告警到内部系统(如邮件、Slack、Webhook 等)try:# 假设我们有一个发送告警的函数await self.send_alert_to_internal_system(alert_message)except Exception as e:logger.error(f"Failed to send alert: {str(e)}")async def send_alert_to_internal_system(self, message):"""发送告警到内部系统的逻辑"""# 示例:使用 HTTP 客户端发送告警到 Webhookhttp_client = tornado.httpclient.AsyncHTTPClient()alert_url = "http://internal-alert-system/endpoint"body = json.dumps({"message": message})try:await http_client.fetch(alert_url,method="POST",headers={"Content-Type": "application/json"},body=body)logger.info("Alert sent successfully")except Exception as e:logger.error(f"Failed to send alert: {str(e)}")raise e

2.3.3 应用的转发逻辑

import time 
class AIServer(BaseForwardHandler):print('AIServer Working ', get_time_str1())async def post(self):standard_output = StandardOutput()print('解析请求体')# 1. 解析请求体try:request_data = json.loads(self.request.body)print("Parsed request data:", request_data)except json.JSONDecodeError:self.set_status(400)standard_output.status = False standard_output.msg  = "Invalid JSON format"self.write(json.dumps(standard_output.dict()))return# 2. 验证请求格式try:standard_input = StandardInput(**request_data)except (TypeError, ValueError) as e:# 如果 request_data 不符合 StandardInput 的要求,捕获 TypeError 或 ValueErrorself.set_status(400)standard_output.status = False standard_output.msg  = f"Invalid input data: {str(e)}"self.write(json.dumps(standard_output.dict()))return# 3.转发请求target_server_dict =  base_config.server_dict.get(standard_input.server_name)if target_server_dict is None:self.set_status(400)standard_output.status = False standard_output.msg  = f"Invalid input data: No Such Service {standard_input.server_name}"self.write(json.dumps(standard_output.dict()))return# 这里控制版本await self.forward_request(target_server_dict['base'])

然后进行调用

请求端:

import httpx
data_dict =   {'server_name':'test_server', 'address': '上海九六广场'}
resp = httpx.post('http://localhost:9001/ai_server/' ,json = data_dict)
In [31]: resp.json()
Out[31]:
{'name': 'get_lng_lat','status': True,'msg': 'ok','duration': 86,'data': {'status': 0,'lng': 121.5312514401146,'lat': 31.23340608690243,'precise': 1,'confidence': 75,'comprehension': 100,'level': '购物'}}

服务端:

...
解析请求体
Parsed request data: {'server_name': 'test_server', 'address': '上海九六广场'}
2025-02-02 01:09:04 - my_logger - INFO - Forwarding request to http://IP/get_lng_lat/
2025-02-02 01:09:04 - tornado.access - INFO - 200 POST /ai_server/ (127.0.0.1) 158.67ms
...

可以看到转发成功了,原始的请求大约80ms,因为转发带来的额外延时大约也是80ms。所以如果对于实时性要求特别高的可能还需要注意。后续再加上其他的额外处理,单个请求最终的延时可能到达200ms,甚至500ms。对于类似大模型之类的调用来说,一般会花费数秒到数十秒来处理一个请求,这种开销就可以接受。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/894432.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu 22.04系统安装部署Kubernetes v1.29.13集群

Ubuntu 22.04系统安装部署Kubernetes v1.29.13集群 简介Kubernetes 的工作流程概述Kubernetes v1.29.13 版本Ubuntu 22.04 系统安装部署 Kubernetes v1.29.13 集群 1 环境准备1.1 集群IP规划1.2 初始化步骤(各个节点都需执行)1.2.1 主机名与IP地址解析1.…

基于SpringBoot的新闻资讯系统的设计与实现(源码+SQL脚本+LW+部署讲解等)

专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…

【Linux系统】计算机世界的基石:冯诺依曼架构与操作系统设计

文章目录 一.冯诺依曼体系结构1.1 为什么体系结构中要存在内存?1.2 冯诺依曼瓶颈 二.操作系统2.1 设计目的2.2 系统调用与库函数 一.冯诺依曼体系结构 冯诺依曼体系结构(Von Neumann Architecture)是计算机的基本设计理念之一,由…

消息队列应用示例MessageQueues-STM32CubeMX-FreeRTOS《嵌入式系统设计》P343-P347

消息队列 使用信号量、事件标志组和线标志进行任务同步时,只能提供同步的时刻信息,无法在任务之间进行数据传输。要实现任务间的数据传输,一般使用两种方式: 1. 全局变量 在 RTOS 中使用全局变量时,必须保证每个任务…

【NLP251】Transformer精讲 残差链接与层归一化

精讲部分,主要是对Transformer的深度理解方便日后从底层逻辑进行创新,对于仅应用需求的小伙伴可以跳过这一部分,不影响正常学习。 1. 残差模块 何凯明在2015年提出的残差网络(ResNet),Transformer在2016年…

Android学习制作app(ESP8266-01S连接-简单制作)

一、理论 部分理论见arduino学习-CSDN博客和Android Studio安装配置_android studio gradle 配置-CSDN博客 以下直接上代码和效果视频,esp01S的收发硬件代码目前没有分享,但是可以通过另一个手机网络调试助手进行模拟。也可以直接根据我的代码进行改动…

DeepSeek Janus-Pro:多模态AI模型的突破与创新

近年来,人工智能领域取得了显著的进展,尤其是在多模态模型(Multimodal Models)方面。多模态模型能够同时处理和理解文本、图像等多种类型的数据,极大地扩展了AI的应用场景。DeepSeek(DeepSeek-V3 深度剖析:…

OpenGL学习笔记(五):Textures 纹理

文章目录 纹理坐标纹理环绕方式纹理过滤——处理纹理分辨率低的情况多级渐远纹理Mipmap——处理纹理分辨率高的情况加载与创建纹理 &#xff08; <stb_image.h> &#xff09;生成纹理应用纹理纹理单元练习1练习2练习3练习4 通过上一篇着色部分的学习&#xff0c;我们可以…

代理模式——C++实现

目录 1. 代理模式简介 2. 代码示例 1. 代理模式简介 代理模式是一种行为型模式。 代理模式的定义&#xff1a;由于某些原因需要给某对象提供一个代理以控制该对象的访问。这时&#xff0c;访问对象不适合或者不能直接访问引用目标对象&#xff0c;代理对象作为访问对象和目标…

攻防世界 fileclude

代码审计 WRONG WAY! <?php include("flag.php"); highlight_file(__FILE__);//高亮显示文件的源代码 if(isset($_GET["file1"]) && isset($_GET["file2"]))//检查file1和file2参数是否存在 {$file1 $_GET["file1"];$fi…

Vue 响应式渲染 - 列表布局和v-html

Vue 渐进式JavaScript 框架 基于Vue2的学习笔记 - Vue 响应式渲染 - 列表布局和v-html 目录 列表布局 简单渲染列表 显示索引值 点击变色 V-html 作用 注意 采用策略 应用 总结 列表布局 简单渲染列表 Data中设置状态&#xff0c;是一个数组格式的默认信息。 然后…

LLMs之OpenAI o系列:OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略

LLMs之OpenAI o系列&#xff1a;OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之o3&#xff1a;《Deliberative Alignment: Reasoning Enables Safer Language Models》翻译与解读 LLMs之OpenAI o系列&#xff1a;OpenAI o3-mini的简介、安…

AI开发学习之——PyTorch框架

PyTorch 简介 PyTorch &#xff08;Python torch&#xff09;是由 Facebook AI 研究团队开发的开源机器学习库&#xff0c;广泛应用于深度学习研究和生产。它以动态计算图和易用性著称&#xff0c;支持 GPU 加速计算&#xff0c;并提供丰富的工具和模块。 PyTorch的主要特点 …

Python安居客二手小区数据爬取(2025年)

目录 2025年安居客二手小区数据爬取观察目标网页观察详情页数据准备工作&#xff1a;安装装备就像打游戏代码详解&#xff1a;每行代码都是你的小兵完整代码大放送爬取结果 2025年安居客二手小区数据爬取 这段时间需要爬取安居客二手小区数据&#xff0c;看了一下相关教程基本…

OpenCV:开运算

目录 1. 简述 2. 用腐蚀和膨胀实现开运算 2.1 代码示例 2.2 运行结果 3. 开运算接口 3.1 参数详解 3.2 代码示例 3.3 运行结果 4. 开运算应用场景 5. 注意事项 6. 总结 相关阅读 OpenCV&#xff1a;图像的腐蚀与膨胀-CSDN博客 OpenCV&#xff1a;闭运算-CSDN博客 …

JavaWeb入门-请求响应(Day3)

(一)请求响应概述 请求(HttpServletRequest):获取请求数据 响应(HttpServletResponse):设置响应数据 BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器就可访问,应用程序的逻辑和数据都存储在服务端(维护方便,响应速度一般) CS架构:Client/ser…

【SLAM】于AutoDL云上GPU运行GCNv2_SLAM的记录

配置GCNv2_SLAM所需环境并实现AutoDL云端运行项目的全过程记录。 本文首发于❄慕雪的寒舍 1. 引子 前几天写了一篇在本地虚拟机里面CPU运行GCNv2_SLAM项目的博客&#xff1a;链接&#xff0c;关于GCNv2_SLAM项目相关的介绍请移步此文章&#xff0c;本文不再重复说明。 GCNv2:…

罗格斯大学:通过输入嵌入对齐选择agent

&#x1f4d6;标题&#xff1a;AgentRec: Agent Recommendation Using Sentence Embeddings Aligned to Human Feedback &#x1f310;来源&#xff1a;arXiv, 2501.13333 &#x1f31f;摘要 &#x1f538;多代理系统必须决定哪个代理最适合给定的任务。我们提出了一种新的架…

团体程序设计天梯赛-练习集——L1-025 正整数A+B

一年之际在于春&#xff0c;新年的第一天&#xff0c;大家敲代码了吗&#xff1f;哈哈 前言 这道题分值是15分&#xff0c;值这个分&#xff0c;有一小点运算&#xff0c;难度不大&#xff0c;虽然说做出来了&#xff0c;但是有两个小疑点。 L1-025 正整数AB 题的目标很简单…

Leetcode:598

1&#xff0c;题目 2&#xff0c;思路 脑筋急转弯&#xff0c;看题目一时半会还没搞懂意思。 其实不然就是说ops是个矩阵集合&#xff0c;集合的每个矩阵有俩个元素理解为行列边距 m和n是理解为一个主矩阵&#xff0c;计算ops的每个小矩阵还有这个主矩阵的交集返回面积 3&…