在现代 Web 开发中,异步任务处理和用户通知是两个重要的功能。由于老旧测试平台【测试用例生成平台,源码分享】进行智能化升级后,未采用异步任务处理,大模型推理时间较长,导致任务阻塞,无法处理其他任务,体验较差。本文将以 Django 框架为基础,结合 Celery 和 Redis,完成一个完整的异步任务系统,并实现用户通知功能。我们还会对接阿里云百炼 DeepSeek-R1 模型,通过大模型生成测试用例,并提供数据的增删改查功能。以下是详细的实现步骤和完整代码示例。
一、需求分析与功能拆解
1.1 功能需求
本项目的主要需求如下:
- 用户通过 API 输入任务名称和字段信息,系统为任务生成唯一的 UUID,并提示用户任务已创建。
- 异步任务系统调用阿里云百炼 DeepSeek-R1 模型,传递字段信息生成 JSON 格式的测试用例。
- 将模型生成的测试用例存储到数据库的测试用例预览表中。
- 通知用户任务完成,支持通过 WebSocket 或邮件通知。
- 提供一个页面对生成的测试用例数据进行增删改查操作。
1.2 技术选型
- Django:作为后端框架,负责处理 API 请求、任务状态管理和数据库操作。
- Celery:用于实现任务的异步处理。
- Redis:作为 Celery 的消息队列。
- 阿里云百炼 DeepSeek-R1:调用大模型接口生成测试用例。
- Django Channels:实现 WebSocket 通知功能。
- 前端页面:用于展示和管理测试用例数据。
二、项目创建与环境配置
2.1 安装依赖
在开始开发之前,需要安装以下依赖工具和库:
pip install django celery redis requests channels
2.2 项目目录结构
项目的目录结构如下:
your_project_name/
├── your_project_name/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ ├── asgi.py
│ ├── celery.py
├── your_app_name/
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ ├── tasks.py
│ ├── signals.py
│ ├── urls.py
├── manage.py
三、系统实现
3.1 配置 Celery 和 Redis
3.1.1 在 settings.py
中配置 Celery
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'# Django Channels 配置
ASGI_APPLICATION = 'your_project_name.asgi.application'
CHANNEL_LAYERS = {'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer','CONFIG': {'hosts': [('127.0.0.1', 6379)],},},
}
3.1.2 创建 celery.py
在项目根目录下创建 celery.py
文件,用于初始化 Celery:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 设置 Django 默认的 settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')app = Celery('your_project_name')# 从 Django 的 settings.py 加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')# 自动发现任务模块
app.autodiscover_tasks()
3.1.3 配置 __init__.py
在项目目录下的 __init__.py
文件中添加以下内容:
from __future__ import absolute_import, unicode_literals# Celery 应用
from .celery import app as celery_app__all__ = ('celery_app',)
3.1.4 配置 asgi.py
在项目根目录下的 asgi.py
文件中配置 WebSocket 支持:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStackos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')application = ProtocolTypeRouter({'http': get_asgi_application(),'websocket': AuthMiddlewareStack(URLRouter([# WebSocket 路由])),
})
3.2 数据模型设计
在 models.py
中定义两张表:任务表 和 测试用例预览表。
from django.db import models
import uuid# 测试用例预览表
class TestCasePreview(models.Model):id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)field_name = models.CharField(max_length=255)field_type = models.CharField(max_length=255)field_value = models.TextField()generated_test_case = models.JSONField() # 存储生成的 JSON 测试用例created_at = models.DateTimeField(auto_now_add=True)# 异步任务表
class AsyncTask(models.Model):TASK_STATUS = (('PENDING', 'Pending'),('PROCESSING', 'Processing'),('COMPLETED', 'Completed'),('FAILED', 'Failed'),)id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)name = models.CharField(max_length=255, unique=True)status = models.CharField(max_length=20, choices=TASK_STATUS, default='PENDING')created_at = models.DateTimeField(auto_now_add=True)updated_at = models.DateTimeField(auto_now=True)result_message = models.TextField(null=True, blank=True)
3.3 异步任务实现
在 tasks.py
中实现异步任务逻辑:
import requests
from celery import shared_task
from .models import AsyncTask, TestCasePreview
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync@shared_task
def generate_test_cases(task_id, field_data):try:# 获取任务task = AsyncTask.objects.get(id=task_id)task.status = 'PROCESSING'task.save()# 调用阿里云百炼 DeepSeek-R1 APIclient = OpenAI(# 如果没有配置环境变量,请用百炼API Key替换:api_key="sk-xxx"# api_key='sk-xxx',api_key='sk-712a634dbaa7444d838d20b25eb938xx', # todo 此处需更换base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")reasoning_content = "" # 定义完整思考过程answer_content = "" # 定义完整回复is_answering = False # 判断是否结束思考过程并开始回复# 创建聊天完成请求completion = client.chat.completions.create(model="deepseek-r1", # 此处以 deepseek-r1 为例,可按需更换模型名称messages=[{'role': 'user','content': prompt_param}],stream=True,# 解除以下注释会在最后一个chunk返回Token使用量# stream_options={# "include_usage": True# })print("\n" + "=" * 20 + "思考过程" + "=" * 20 + "\n")for chunk in completion:# 如果chunk.choices为空,则打印usageif not chunk.choices:print("\nUsage:")print(chunk.usage)else:delta = chunk.choices[0].delta# 打印思考过程if hasattr(delta, 'reasoning_content') and delta.reasoning_content != None:print(delta.reasoning_content, end='', flush=True)reasoning_content += delta.reasoning_contentelse:# 开始回复if delta.content != "" and not is_answering:print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")is_answering = True# 打印回复过程print(delta.content, end='', flush=True)answer_content += delta.content# 存储测试用例到数据库for field in field_data:TestCasePreview.objects.create(field_name=field["field_name"],field_type=field["field_type"],field_value=field["field_value"],generated_test_case=extract_json_objects(answer_content))# 更新任务状态task.status = 'COMPLETED'task.result_message = "测试用例生成成功!"task.save()# 通过 WebSocket 通知用户channel_layer = get_channel_layer()async_to_sync(channel_layer.group_send)(f"task_{task_id}",{"type": "task_status", "message": "任务已完成"})except Exception as e:task.status = 'FAILED'task.result_message = str(e)task.save()
3.4 用户接口
在 views.py
中提供创建任务的接口:
from django.http import JsonResponse
from .models import AsyncTask
from .tasks import generate_test_casesdef create_task(request):if request.method == 'POST':task_name = request.POST.get('task_name')field_data = request.POST.get('field_data') # JSON 格式的字段列表# 检查任务名称是否唯一if AsyncTask.objects.filter(name=task_name).exists():return JsonResponse({"error": "任务名称已存在"}, status=400)# 创建任务task = AsyncTask.objects.create(name=task_name)# 调用异步任务generate_test_cases.delay(task.id, field_data)return JsonResponse({"message": "任务已创建", "task_id": task.id})
3.5 WebSocket 通知
WebSocket 消费者 consumers.py
:
from channels.generic.websocket import AsyncWebsocketConsumer
import jsonclass TaskConsumer(AsyncWebsocketConsumer):async def connect(self):self.task_id = self.scope['url_route']['kwargs']['task_id']self.task_group_name = f"task_{self.task_id}"await self.channel_layer.group_add(self.task_group_name,self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.task_group_name,self.channel_name)async def task_status(self, event):message = event['message']await self.send(text_data=json.dumps({"message": message}))
3.6 前端页面展示
创建一个简单的页面用于展示测试用例:
<!DOCTYPE html>
<html lang="en">
<head><title>测试用例预览</title>
</head>
<body><h1>测试用例预览</h1><table id="test-cases"><thead><tr><th>字段名称</th><th>字段类型</th><th>字段值</th><th>测试用例</th></tr></thead><tbody><!-- 数据通过 Ajax 加载 --></tbody></table>
</body>
</html>
四、总结
本项目从需求分析出发,采用 Django 框架结合 Celery、Redis 和 Django Channels,完整实现了一个基于异步任务的测试用例生成系统。系统支持任务状态管理、用户通知,以及前端数据的增删改查功能,逻辑清晰,功能完善,非常适合在实际项目中推广应用。在老旧测试平台智能化中,异步是不可或缺的整改步骤,调试好异步任务后,后续优化将进入快速迭代阶段。加油吧~