Python 全栈系列254 异步服务与并发调用

说明

发现对于异步(IO)还是太陌生了,熟悉一下。

内容

今天搞了一整天,感觉有一个long story to tell,但是不知道从何说起,哈哈。

异步(协程)需要保证链路上的所有环节都是异步(协程)的,任何一个环节没这么做都会导致整体的异步失效,退化为同步

以上大概是我能总结出的最核心的内容了。古早的时候(我还只是建模),我用python写了一个简单的规则引擎。这个引擎需要从各个库表(都是mysql)中读取一个客户的n个维度信息,然后用规则引擎给出判断。最初是用多个(大概20+)同步查询来逐个的完成数据获取,然后就必然的越来越慢,一个客户要一分钟才能完成数据准备。后来和当时的架构小哥商量了下,决定用asyncio + aiomysql来做,结果还是不错的,大概2~3秒不到就搞好了。而且机器使用资源没有多,反而是更少了,当时真是觉得惊艳。

在开发过程中,很多时候就是因为一连串的async def ,然后有任何一个环节没有async就会失败,所以很头痛,这也是印象深刻的地方。

总体上,IO并发用好了还是非常惊人的。究其原因,同步方式是比较自然的程序书写方式,有很多包已经为CPU密集型应用做了超多的应用。

客户端并发

首先我还是想排除掉多线程的方法,按道理讲协程的效率一定会高很多。虽然多线程用法看起来更容易理解,但我想还是用协程吧。

下面的几个函数分为三层:最底下的是json_query_worker,它是真正发起并发请求的异步函数,上面一层是json_player,它负责把多个异步任务进行封装和发布,然后回收任务结果。json_player的定义本身更像是place holder,他们只有被asyncio.run触发时(或者用run_until_complete)才会真正执行。make_json_task_list是更上一层,将用户端的请求,按照格式进行准备。

import asyncio
import aiohttp
import json 
async def json_query_worker(task_id = None , url = None , json_params = None, time_out = 60, semaphore = None):try:async with semaphore:async with aiohttp.ClientSession() as session:async with session.post(url, json = json_params, timeout=aiohttp.ClientTimeout(total=time_out)) as response:res = await response.text()return {task_id: json.loads(res)}except Exception as e:# Return a dictionary with the error messageprint(e)return {task_id:'error'}
async def json_player(task_list , concurrent = 3):semaphore = asyncio.Semaphore(concurrent) # 并发限制tasks = [asyncio.ensure_future(json_query_worker(**x,  semaphore = semaphore)) for x in task_list]return await asyncio.gather(*tasks)# 根据 url 和 params 构造任务请求列表 - tuple_list = [(url,params)]
# 模式A: 面向相同的url, 参数列表需要分为批次
def make_json_task_list(url = None, params_list = None):task_list = []for i in range(len(params_list)):tem_dict = {}tem_dict['task_id'] = 'task_' + str(i).zfill(5)tem_dict['url'] = urltem_dict['json_params'] = params_list[i]task_list.append(tem_dict)return task_list

例如,假设要发起的请求是一个url+一个param(dict)来完成的,构造参数列表来模拟需要执行的多个请求,然后输入请求的url和param列表,然后通过make_json_task_list生成任务列表,最后通过asyncio.run来发起真正的并发。

params_list = [{f't{i}': i} for i in range(30)]
import time 
tick1 = time.time()
some_task_list = make_json_task_list('http://localhost:8000/async_thread_test/', params_list =params_list)
# 使用 asyncio.run() 运行协程
res = asyncio.run(json_player(some_task_list, concurrent=30))
tick2 = time.time()
print(tick2-tick1)

服务端并发

因为我构造了大量的微服务,一些微服务之间是有依赖的。例如本次的实验,我需要向deepseek进行请求,我先构造了服务A,该服务是以单核方式启动的,做了一层封装,以便使用标准的url+param方式提交请求。

from openai import OpenAI
DeepSeekHandler_path = r'/deepseek/'
class DeepSeekHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)api_key = some_dict['api_key']model = some_dict['model']messages = some_dict['messages']client = OpenAI(api_key=api_key, base_url="https://api.deepseek.com/v1")response = client.chat.completions.create(model=model,messages=messages)res_content = response.choices[0].message.contentmsg_dict = {}msg_dict['content'] = res_contentself.write(json.dumps(msg_dict))DeepSeekHandler_tuple = (DeepSeekHandler_path,DeepSeekHandler)
app_list.append(DeepSeekHandler_tuple)

然后在这里,我对服务进行调用。在客户端以asyncrun方式并发请求,然后我发现结果是串行的。原因有两个:一方面是接口本身是以同步方式创建的,另一方面服务器启动的时候只有单核。

结论: 在请求-返回的数据链中,只要有一个是同步的(deepseek),那么整个请求就是同步的。

我相信如果服务器开多核了,那么速度会按n被增长,但是这样是极其不经济的(但以前有一些服务还真可能就是这么干的)。同时也说明了使用异步,在IO方面可以给系统带来多大的提升。

所以接下来,在保持单核启动不变的情况(因为该服务就是进行api转发,根本不应该消耗cpu),修改服务为异步形式

import time
from concurrent.futures.thread import ThreadPoolExecutor
from tornado import gen, web, ioloop
from tornado.concurrent import run_on_executor
# 在http的层面异步
ADeepSeekHandler_path = r'/a_deepseek/'
class ADeepSeekHandler(web.RequestHandler):executor = ThreadPoolExecutor(max_workers=1000)@run_on_executordef make_a_deepseek_request(self, url = None, api_key = None,model = None,  messages = None):print("adeepseek...start")# for json -返回的就是json,不解开client = OpenAI(api_key=api_key, base_url=url)response = client.chat.completions.create(model=model,messages=messages)res_content = response.choices[0].message.contentprint("adeepseek...end")return res_content@gen.coroutinedef post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)# print(some_dict)url = some_dict['url']api_key = some_dict['api_key']model = some_dict['model']messages = some_dict['messages']res = yield self.make_a_deepseek_request(url = url, api_key = api_key, model=model, messages = messages)msg_dict = {}msg_dict['content'] = resself.write(json.dumps(msg_dict))ADeepSee_tuple = (ADeepSeekHandler_path,ADeepSeekHandler)
app_list.append(ADeepSee_tuple)

这里我有点每太明白的是为啥还要引入ThreadPoolExecutor,没有再深入去研究,但是看起来是在引用应用协程。我试着把max_workers设置到了1000,后来实测的时候,发现其实毫无压力。可能在第一次会耗费一点cpu,可能就10%左右,不到5秒,后面的cpu耗用几乎都为0.
在这里插入图片描述
然后下面是一个真实测试,花了我大约4毛钱调接口。
单个接口的调用:问一个问题,这个问题的回复通常在13秒±3秒。可以看到输入的token大约20,回复的大约200的样子。

import requests as req 
some_dict = {}
some_dict['api_key'] = 'sk-xxx'
some_dict['model'] = "deepseek-chat"
some_dict['messages']  = [{'role':'user','content':'介绍三个北京必去的旅游景点。'}]
# 容器内
req.post('http://ty.orbitx.cn:24097/deepseek/',json=some_dict).json()
{'content': '北京作为中国的首都,拥有丰富的历史文化遗产和现代化的城市风貌,以下是三个必去的旅游景点:\n\n1. 故宫(The Forbidden City):故宫是明清两代的皇宫,也是世界上现存规模最大、保存最完整的木质结构古建筑群。它位于北京市中心,天安门广场北侧,是北京旅游的标志性景点之一。游客可以在这里欣赏到精美的古建筑艺术,了解中国古代皇室的生活和文化。\n\n2. 长城(The Great Wall):长城是中国古代伟大的军事防御工程,也是世界文化遗产之一。北京附近有多段长城可以游览,其中最著名的是八达岭长城和慕田峪长城。长城蜿蜒起伏,气势磅礴,是体验中国古代军事文化和壮丽自然风光的绝佳地点。\n\n3. 颐和园(The Summer Palace):颐和园是中国古典园林的代表之一,也是世界文化遗产。它位于北京西郊,是一座集山水园林和宫殿建筑于一体的大型皇家园林。颐和园以其精美的园林景观、丰富的历史文化内涵和宁静的自然环境吸引着众多游客。\n\n这三个景点不仅代表了北京的历史和文化,也是中国乃至世界的重要文化遗产,非常值得一游。'}

单个的异步请求,14.3秒

resp = req.post('http://xxx/a_deepseek/' , json = single_param).json()
{'content': '北京作为中国的首都,拥有丰富的历史文化遗产和现代化的城市风貌,以下是三个必去的旅游景点:\n\n1. 故宫(The Forbidden City):故宫是明清两代的皇宫,也是世界上现存规模最大、保存最完整的木质结构古建筑群。它位于北京市中心,占地约72万平方米,拥有超过9000间房间。故宫不仅是了解中国古代皇家生活和艺术的绝佳地点,也是联合国教科文组织认定的世界文化遗产。\n\n2. 长城(The Great Wall):长城是中国古代的军事防御工程,全长超过2万公里,其中最著名的部分位于北京附近,如八达岭长城、慕田峪长城等。长城蜿蜒起伏,雄伟壮观,是中国古代劳动人民智慧和勇气的象征,也是世界新七大奇迹之一。\n\n3. 颐和园(The Summer Palace):颐和园是中国清朝时期的皇家园林,以昆明湖和万寿山为基础,以杭州西湖为蓝本,汲取江南园林的设计手法而建成。颐和园集中国园林艺术之大成,是保存最完整的皇家行宫御苑,也是联合国教科文组织认定的世界文化遗产。\n\n这三个景点不仅代表了北京的历史和文化,也是中国乃至世界的重要文化遗产,是每一位来北京的游客都不容错过的。'}

直接从客户端发起并发请求,3条请求一共花了14.89s,所以是并发的。

param_list = [param1, param2, param3]
task_list = make_json_task_list('http://xxx/a_deepseek/', params_list=param_list)
res = asyncio.run(json_player(task_list, concurrent=30))

在这里插入图片描述
然后我之前还构造了另一个服务B,目的是屏蔽掉用户端对异步的感知,就像在调用一个接口。

对这个服务构造并发请求,外层的url是向tornado服务发起的并发,里面的url是每个请求需要执行的任务(url+param)。参数的构造多加了一层,最外层的url是本来客户端要请求的地址,里面的params对应请求需要的参数。

some_dict = {}
some_dict['url'] = 'http://xxx/a_deepseek/'
some_dict['params'] = {}
some_dict['params']['url'] = 'https://api.deepseek.com/v1'
some_dict['params']['api_key'] = 'xxx'
some_dict['params']['model'] = "deepseek-chat"
some_dict['params']['messages']  = [{'role':'user','content':'介绍三个北京必去的旅游景点。'}]some_dict1 = {}
some_dict1['url'] = 'http://xxx/a_deepseek/'
some_dict1['params'] = {}
some_dict1['params']['url'] = 'https://api.deepseek.com/v1'
some_dict1['params']['api_key'] = 'xxx'
some_dict1['params']['model'] = "deepseek-chat"
some_dict1['params']['messages']  = [{'role':'user','content':"介绍三个成都最有名的美食。"}]some_dict2 = {}
some_dict2['url'] = 'http://xxx/a_deepseek/'
some_dict2['params'] = {}
some_dict2['params']['url'] = 'https://api.deepseek.com/v1'
some_dict2['params']['api_key'] = 'xxx'
some_dict2['params']['model'] = "deepseek-chat"
some_dict2['params']['messages']  = [{'role':'user','content':"介绍三首泰勒斯威夫特好听的歌曲"}]
llm_param_list = [some_dict,some_dict1,some_dict2]

服务对应的部分如下,按照标准的url+params方式发起异步请求

# AsyncThreadMakeARequest -- ATMAR for json
ATMARHandler_path = r'/atmar/'
class ATMARHandler(web.RequestHandler):executor = ThreadPoolExecutor(max_workers=1000)@run_on_executordef make_a_request(self, url = None, params = None ):print("atmar...start")# for json -返回的就是json,不解开the_res = req.post(url, json = params).json()print("atmar...end")return the_res@gen.coroutinedef post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)url  = some_dict['url']params = some_dict['params']res = yield self.make_a_request(url = url, params = params)self.write(json.dumps(res))ATMAR_tuple = (ATMARHandler_path,ATMARHandler)
app_list.append(ATMAR_tuple)

发起调用

tick1 = time.time()
some_task_list_1 = make_json_task_list('http://localhost:8000/atmar/', params_list =llm_param_list)
# 使用 asyncio.run() 运行协程
res = asyncio.run(json_player(some_task_list_1, concurrent=30))
tick2 = time.time()
print(tick2-tick1)
15.933489322662354

在这里插入图片描述

这说明了,只要链路上保证全部为异步,继续叠加异步调用,结果还是异步。当然传的次数越多,损耗会越大。主要的损耗来自序列化/反序列化的过程。

以并发300的速度发出300个请求,服务器的速度明显下降了,大约是原来的1/3~1/6, 所以估计每个账号至少可以并发30,最多并发50.
在这里插入图片描述

另两种tornado异步服务的搭建方式

反正看起来都是多线程,又有点协程的样子,不多研究了,体会一下就好

## 2 方式二:采用多线程+修饰器模式 
import time
from concurrent.futures.thread import ThreadPoolExecutorfrom tornado import gen, web, ioloop
from tornado.concurrent import run_on_executorAsyncThread_testHandler_path = r'/async_thread_test/'
class AsyncThread_testHandler(web.RequestHandler):executor = ThreadPoolExecutor(max_workers=1000)@run_on_executordef sleep(self):print("休息1...start")time.sleep(5)print("休息1...end")return 'ok'@gen.coroutinedef post(self):res = yield self.sleep()self.write(json.dumps(res))AsyncThread_test_tuple = (AsyncThread_testHandler_path,AsyncThread_testHandler)
app_list.append(AsyncThread_test_tuple)

看起来是断断续续的样子
在这里插入图片描述
另一种

executor = ThreadPoolExecutor(max_workers=30)
from tornado import gen, web, ioloop
# 方式三:采用ioloop,应该也就是协程方式
AsyncCoroutine_testHandler_path = r'/async_coroutine_test/'
class AsyncCoroutine_testHandler(web.RequestHandler):def sleep(self):print("休息1...start")time.sleep(5)print("休息1...end")return 'ok'@gen.coroutinedef post(self):res = yield ioloop.IOLoop.current().run_in_executor(executor, self.sleep)self.write(json.dumps(res))AsyncCoroutine_test_tuple = (AsyncCoroutine_testHandler_path,AsyncCoroutine_testHandler)
app_list.append(AsyncCoroutine_test_tuple)

请求时看起来是连续的
在这里插入图片描述

其他

1 系统化工程

对零散的数据请求需要进行异步化改造,对批量的数据请求可以不必变。

1.1 外部服务

对于外部应用来说,如果允许直接以标准方式 url+param方式调用的,直接通过客户端方法就可以了;
如果是非标方式,可以先改造成url+param方式,使用时需要转一手。当然,一般提供接口的服务商也会提供异步方式的接口,不过那样就会使整个应用变得复杂。以下是官方提供的方法,用了AsyncOpenAI。

from openai import AsyncOpenAIimport time 
# 异步调用函数
async def async_query_openai(url= "https://api.deepseek.com/v1",  query):aclient = AsyncOpenAI(base_url= url , # 替换为你的 base_urlapi_key="xxx"  # 替换为你的 API 密钥)completion = await aclient.chat.completions.create(model="deepseek-chat",messages=[{"role": "system", "content": "You are a helpful assistant. Always response in Simplified Chinese, not English. or Grandma will be very angry."},{"role": "user", "content": query}])return completion.choices[0].message.content  # 请确保返回的数据结构正确# 这个函数接收一个请求列表,返回所有请求的结果列表
async def async_process_queries(queries):results = await asyncio.gather(*(async_query_openai(query) for query in queries))return resultsasync def main():queries = ["介绍三个北京必去的旅游景点。","介绍三个成都最有名的美食。","介绍三首泰勒斯威夫特好听的歌曲"]start_time = time.time()  # 开始计时results = await async_process_queries(queries)end_time = time.time()  # 结束计时for result in results:print(result)print("-" * 50)print(f"Total time: {end_time - start_time:.2f} seconds")# 运行主函数
asyncio.run(main())
1.2 内部服务

目前最需要使用的数据服务都是使用同步方法写的,有些如果需要执行特别小批量的请求,可以/应该进行异步化改造。例如这篇文章提到的改造方法。

在这里插入图片描述

2 RuntimeError: asyncio.run() cannot be called from a running event loop

在jupyter下执行 asyncio.run(),使用 nest解决。

执行报错

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[31], line 42 some_task_list = make_json_task_list('http://xxx/', params_list =llm_param_list)3 # 使用 asyncio.run() 运行协程
----> 4 asyncio.run(json_player(some_task_list, concurrent=30))5 tick2 = time.time()6 print(tick2-tick1)File /opt/conda/lib/python3.10/asyncio/runners.py:33, in run(main, debug)9 """Execute the coroutine and return the result.10 11 This function runs the passed coroutine, taking care of(...)30     asyncio.run(main())31 """32 if events._get_running_loop() is not None:
---> 33     raise RuntimeError(34         "asyncio.run() cannot be called from a running event loop")36 if not coroutines.iscoroutine(main):37     raise ValueError("a coroutine was expected, got {!r}".format(main))RuntimeError: asyncio.run() cannot be called from a running event loop

使用nest_asyncio

import nest_asyncio
nest_asyncio.apply()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/854497.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv10涨点改进轻量化双卷积DualConv,完成涨点且计算量和参数量显著下降

本文独家改进:双卷积由组卷积和异构卷积组成,执行3x3 和 1x1 卷积运算Q代替其他卷积核仅执行 1x1 卷积。 DualIConv 显着降低了深度神经网络的计算成本和参数数量,同时在某些情况下令人惊讶地实现了比原始模型略高的精度。 我们使用 DualConv 将轻量级 MobileNetV2 的参数数量…

Linux驱动面试题

1.导出符号表的原理? 2.字符设备驱动的框架流程 open read wirte close 是系统调用(从用户空间进入内核空间的唯一的方法)会产生swi软中断《也会存在软中断号》(从User模式切换到SVC(管理模式)下因为在…

你还不会选ProfiNET和EtherCAT网线?

在现代工业自动化领域,ProfiNET和EtherCAT是两种非常流行的通信协议。选择合适的网线对于确保通信的稳定性和效率至关重要。 ProfiNET是什么? ProfiNET是一种基于以太网的通信协议,由德国西门子公司开发。它支持实时通信,广泛应用…

商超智能守护:AI监控技术在零售安全中的应用

结合思通数科大模型的图像处理、图像识别、目标检测和知识图谱技术,以下是详细的商超合规监测应用场景描述: 1. 员工仪容仪表监测: 利用图像识别技术,系统可以自动检测员工是否按照规范整理妆容、穿着工作服,以及是否…

大学课设项目,Windows端基于UDP的网络聊天程序的服务端和客户端

文章目录 前言项目需求介绍一、服务端1.对Udp套接字进行一个封装2. UdpServer的编写3. Task.h4.protocol.h的编写5.线程池的编写6.main.cc 二、客户端1. Socket.h2.protocol.h3.UdpClient4.menu.h5.main.cpp 三、运行图 前言 本次项目可以作为之前内容的一个扩展,学…

003 gitee怎样将默认的私有仓库变成公开仓库

先点击“管理”, 再点击“基本信息” 在“是否开源”里, 选择:开源

python文件操作、文件操作、读写文件、写模式

with读取文件数据内容 with open(filepath,mode,encoding) as file:#具体操作,例如:print(file.read())#查看文件所有的内容。 with:Python中的一个上下文管理器,用于简化资源的管理和释放。它可以用于任意需要进行资源分配和释放的情境…

使用 Azure AI Document Intelligence 创建智能文档处理

简介 借助 Azure AI 文档智能中的预生成模型,无需自行训练模型,即可从常见表单和文档中提取数据。 公司中,表单种类繁多,如发票、收据、调查表等。你可能想知道,从这些文档中提取姓名、地址、金额等信息需要多少工作…

APaaS:智能制造助手

资金不足、IT基础架构薄弱...... 车间业务需求不断地在增加...... 都在说数字化,都在说转型...... 随着企业竞争的日益激烈和市场环境的快速变化,企业需要一个灵活、高效、快速响应市场变化的新平台。在这样的背景下,APaaS应运而生&#x…

RocketMQ快速入门:集成java客户端实现各类消息发送|异步、同步、顺序、单向、延迟、事务(五)附带源码

0. 引言 前面的章节中,我们已经针对rocketmq的基本概念和消息发送、消费流程进行了讲解,但实际在开发中如何实现rocketmq的接入、实现消息发送、消费还没有落实,那么今天,我们继续来学习如何基于java client集成rocketMQ 1. 集成…

火车头采集中英文翻译教程

火车头采集怎么实现数据中文翻译成英文,或英文翻译成中文? 火车头采集没有自带的翻译功能,但可以使用插件功能来实现:导入翻译插件(例如谷歌翻译插件,百度翻译插件等),然后在火车头…

视频媒介VS文字媒介

看到一篇蛮有思考意义的文章就摘录下来了,也引起了反思 目录 一、视频的定义 二、”视频媒介“与”文字媒介”作对比 1.形象 VS 抽象 2.被动 VS 主动 三、视频的缺点-【更少】的思考 1.看视频为啥会导致【更少的思考】 2.内容的【浅薄化】 3.内容的【娱乐化…

【尚庭公寓SpringBoot + Vue 项目实战】用户管理(十五)

【尚庭公寓SpringBoot Vue 项目实战】用户管理(十五) 文章目录 【尚庭公寓SpringBoot Vue 项目实战】用户管理(十五)1、业务介绍2、接口实现2.1、根据条件分页查询用户列表2.2、根据ID更新用户状态 1、业务介绍 用户管理共包含两…

关于生成式人工智能的发展

近年来,人工智能的发展引起了广泛关注,尤其是在深度学习领域,以深度神经网络为代表的人工智能技术已经取得了重大突破。然而,深度神经网络也有其局限性。深度学习技术在处理一些复杂问题时表现良好,但在解决更广泛的任…

PS系统教学20

油漆桶工具 作用:上色工具的一种 上一些纯色、图案 纯色 新建图层填充区域的源设置为前景色设置前景色为想要的颜色左键单机填充(altdelete)如果要添加背景色(Ctrldelete)也可以与选区工具结合使用 可以与快速选择…

【React】如何使用npm run start命令运行两个服务

我们开发前端项目时,有时候需要本地 mock 数据,这样就需要启动两个服务,一个是接口服务,一个是前端项目。可以安装一个插件来帮助我们通过一个命令启动两个服务。 方法一 添加& npm run server 注意:Windows系统…

牛客热题:最长回文子串

📟作者主页:慢热的陕西人 🌴专栏链接:力扣刷题日记 📣欢迎各位大佬👍点赞🔥关注🚓收藏,🍉留言 文章目录 牛客热题:最长回文子串题目链接方法一&am…

Linux之网络编程

Linux之网络编程 TCP协议 TCP(Transmission ControlProtocol) : 传输控制协议,是一个 面向连接的、可靠的、基于字节流的传输层的协议。TCP 协议建立的是一种点到点的,一对一的可靠连接协议 特点: 数据无丢失数据无失序数据无错误数据无重…

振动分析-3-基于Python的FFT幅值修正与能量修正

幅值修正与能量修正过程(更正) 参考什么是泄漏? 参考什么是窗函数? 参考使用python实现快速傅里叶变换(FFT) 参考频谱泄露和窗函数以及加窗后幅度修正和python代码实现 1 快速傅里叶变换(FFT) 离散傅里叶变换(discr…

84. 柱状图中最大的矩形(hard)

单调栈&#xff1a; 就是说&#xff1a;固定高度&#xff0c;寻找最长宽度&#xff0c;如何找最长宽度&#xff0c;需要从heights[i] 这一个元素开始向左向右两边寻找heights[j] <heights[i]的j元素&#xff0c;也就是找两边第一小于heights[i]的元素。此过程中就是利用到单…