0.配置文件
BROKER_URL = "redis://127.0.0.1:6379/1"
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
1.启动文件
from celery import Celery
from ihome. tasks import config
celery_app = Celery( "ihome" )
celery_app. config_from_object( config)
celery_app. autodiscover_tasks( [ "ihome.tasks.sms" ] )
2.发送短信辅助类
from CCPRestSDK import REST
accountSid = '8aaf0708568d4143015697b0f4960888'
accountToken = '42d3191f0e6745d6a9ddc6c795da0bed'
appId = '8aaf0708568d4143015697b0f56e088f'
serverIP = 'app.cloopen.com'
serverPort = '8883'
softVersion = '2013-12-26' class CCP ( object ) : """自己封装的发送短信的辅助类""" instance = None def __new__ ( cls) : if cls. instance is None : obj = super ( CCP, cls) . __new__( cls) obj. rest = REST( serverIP, serverPort, softVersion) obj. rest. setAccount( accountSid, accountToken) obj. rest. setAppId( appId) cls. instance = objreturn cls. instancedef send_template_sms ( self, to, datas, temp_id) : "" "" "" result = self. rest. sendTemplateSMS( to, datas, temp_id) status_code = result. get( "statusCode" ) if status_code == "000000" : return 0 else : return - 1
3.发送短信后端逻辑
@api. route( "/sms_codes/<re(r'1[34578]\d{9}'):mobile>" )
def get_sms_code ( mobile) : """获取短信验证码""" image_code = request. args. get( "image_code" ) image_code_id = request. args. get( "image_code_id" ) if not all ( [ image_code_id, image_code] ) : return jsonify( errno= RET. PARAMERR, errmsg= "参数不完整" ) try : real_image_code = redis_store. get( "image_code_%s" % image_code_id) except Exception as e: current_app. logger. error( e) return jsonify( errno= RET. DBERR, errmsg= "redis数据库异常" ) if real_image_code is None : return jsonify( errno= RET. NODATA, errmsg= "图片验证码失效" ) try : redis_store. delete( "image_code_%s" % image_code_id) except Exception as e: current_app. logger. error( e) if real_image_code. lower( ) != image_code. lower( ) : return jsonify( errno= RET. DATAERR, errmsg= "图片验证码错误" ) try : send_flag = redis_store. get( "send_sms_code_%s" % mobile) except Exception as e: current_app. logger. error( e) else : if send_flag is not None : return jsonify( errno= RET. REQERR, errmsg= "请求过于频繁,请60秒后重试" ) try : user = User. query. filter_by( mobile= mobile) . first( ) except Exception as e: current_app. logger. error( e) else : if user is not None : return jsonify( errno= RET. DATAEXIST, errmsg= "手机号已存在" ) sms_code = "%06d" % random. randint( 0 , 999999 ) try : redis_store. setex( "sms_code_%s" % mobile, constants. SMS_CODE_REDIS_EXPIRES, sms_code) redis_store. setex( "send_sms_code_%s" % mobile, constants. SEND_SMS_CODE_INTERVAL, 1 ) except Exception as e: current_app. logger. error( e) return jsonify( errno= RET. DBERR, errmsg= "保存短信验证码异常" ) result_obj = send_sms. delay( mobile, [ sms_code, int ( constants. SMS_CODE_REDIS_EXPIRES/ 60 ) ] , 1 ) print ( result_obj. id ) ret = result_obj. get( ) print ( "ret=%s" % ret) return jsonify( errno= RET. OK, errmsg= "发送成功" )
4.client相关代码
from celery import Celery
from ihome. libs. yuntongxun. sms import CCP
celery_app = Celery( "ihome" , broker= "redis://127.0.0.1:6379/1" ) @celery_app. task
def send_sms ( to, datas, temp_id) : """发送短信的异步任务""" ccp = CCP( ) ccp. send_template_sms( to, datas, temp_id)
5.worker相关代码
from ihome. tasks. main import celery_app
from ihome. libs. yuntongxun. sms import CCP
@celery_app. task
def send_sms ( to, datas, temp_id) : """发送短信的异步任务""" ccp = CCP( ) try : result = ccp. send_template_sms( to, datas, temp_id) except Exception as e: result = - 2 return result