编程目标
通过使用Flask和Celery,实现一个简单的Web应用程序,能够接收HTTP POST请求,并异步发送电子邮件。
说明
- 使用Flask创建一个简单的Web应用程序,包含一个HTTP POST路由,用于接收发送电子邮件的请求。
- 使用Celery实现一个异步任务,用于发送电子邮件。
- 发送电子邮件的请求应包含以下信息:
- 收件人地址
- 邮件主题
- 邮件内容
- 邮件发送成功后,返回响应表示成功发送。
技术栈
- Python
- Flask
- Celery
- Redis
接口设计
1. 发送邮件接口
- URL: /send-email
- 方法: POST
- 请求参数:
- recipient (string): 收件人地址
- subject (string): 邮件主题
- body (string): 邮件内容
- 成功响应:
- 状态码: 202 Accepted
- 响应体: {“message”: “邮件发送任务已启动”}
app.py
代码:
from flask import Flask, request, jsonify
from tasks import send_email_asyncapp = Flask(__name__)
app.json.ensure_ascii = False # 解决中文乱码问题
@app.route('/send-email', methods=['POST'])
def email_sender():post_form_data = request.jsonprint(post_form_data)# 调用异步发送邮件任务email_data = {'sender_email': 'csdn_代码写注释@163.com','sender_password': 'csdn_代码写注释','recipient': post_form_data['recipient'],'subject': post_form_data['subject'],'body': post_form_data['body']}send_email_async.delay(email_data)return jsonify({"message": "邮件发送任务已启动"}), 202if __name__ == "__main__":app.run(debug=True, host='0.0.0.0', port=5000)
tasks.py
代码:
from celery import Celery
import smtplib
from email.mime.text import MIMEText
from email.header import Headerapp = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')@app.task
def send_email_async(email_data):content = email_data.get('body')msg_from = email_data.get('sender_email')password = email_data.get('sender_password')msg_to = email_data.get('recipient')subject = email_data.get('subject')msg = MIMEText(content)msg['Subject'] = subjectmsg['From'] = msg_frommsg['To'] = msg_totry:s = smtplib.SMTP_SSL("smtp.163.com", 465)s.login(msg_from, password)s.sendmail(msg_from, msg_to, msg.as_string())s.quit()print('邮件发送成功!')return "邮件发送成功!"except Exception as e:print(f"邮件发送失败: {e}")return "邮件发送失败"
test_send_email.py
import requestssubject = "药价监督流水报告" # 主题
content = """
尊敬的xxx客户,随函附上本季度药价监督的流水报告。以下是本季度药价监管的关键要点:1. 监督范围:全国23个省市的主要药品批发市场及在线药品交易平台。
2. 检查次数:共计1,536次现场检查和3,245次在线监控。
3. 发现问题:在检查中发现15起价格违规行为,涉及7种药品。
4. 违规处理:所有违规行为均已记录在案,并对相关企业进行了警告及罚款处理。
5. 价格波动:本季度药品平均价格波动率为3.5%,与上季度相比下降了1.2个百分点。2024年05月09日
"""
def send_email():url = 'http://localhost:5000/send-email' # Flask 应用的 URLdata = {'recipient': 'csdn_代码写注释@qq.com', # 收件人地址'subject': subject, # 邮件主题'body': content # 邮件内容}# 发送 POST 请求response = requests.post(url, json=data)# 输出响应内容print('响应状态码是:', response.status_code)print('响应内容是:', response.text)send_email()
实现效果:
requirements.txt
:
amqp==5.2.0
async-timeout==4.0.3
billiard==4.2.0
blinker==1.8.2
celery==5.4.0
certifi==2024.2.2
charset-normalizer==3.3.2
click==8.1.7
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
colorama==0.4.6
dnspython==2.6.1
eventlet==0.36.1
Flask==3.0.3
Flask-Mail==0.9.1
greenlet==3.0.3
idna==3.7
itsdangerous==2.2.0
Jinja2==3.1.4
kombu==5.3.7
MarkupSafe==2.1.5
prompt-toolkit==3.0.43
python-dateutil==2.9.0.post0
redis==5.0.4
requests==2.31.0
six==1.16.0
tzdata==2024.1
urllib3==2.2.1
vine==5.1.0
wcwidth==0.2.13
Werkzeug==3.0.3
如何启动程序?
# step:0
新建虚拟环境
# step:1
pip install -r requirements.txt
# step:2
启动Redis服务
# 见第一张图片,分别打开三个窗口
窗口1:app
在这个窗口里运行命令:
python app.py窗口2:tests
在这个窗口里运行命令:
python test_send_email.py窗口3:Celery
在这个窗口里运行命令:
celery -A tasks worker --pool=solo --loglevel=info
然后你把代码里的发送方邮箱、发送方密码(注意是授权码)、接收方邮箱这三个配置参数改成真实的,即可运行本程序实现Celery完成异步发送邮件了。
愿你的努力,被世界看到。