引言&协议概述
(CMPP)是中国移动为实现短信业务而制定的一种通信协议,用于在客户端(SP,Service Provider)和中国移动短信网关之间传输短消息,有时也叫做移动梦网短信业务。CMPP3.0是该协议的第三个版本,相比于前两个版本,它增加了对长短信的支持、优化了数据结构等。本文对CMPP协议进行介绍,并给出Python实现CMPP协议栈的思路。
Python的asyncio模块提供了一套简洁的异步IO编程模型,非常适合用于实现协议栈。
CMPP协议基于客户端/服务端模型工作。由客户端(短信应用,如手机,应用程序等)先和ISMG(Internet Short Message Gateway 互联网短信网关)建立起TCP长连接,并使用CMPP命令与ISMG进行交互,实现短信的发送和接收。在CMPP协议中,无需同步等待响应就可以发送下一个指令,实现者可以根据自己的需要,实现同步、异步两种消息传输模式,满足不同场景下的性能要求。
连接成功,发送短信并查询短信发送成功
连接成功,从ISMG接收到短信
协议帧介绍
在CMPP协议中,每个PDU都包含两个部分:CMPP Header和CMPP Body。
CMPP Header
Header包含以下字段,大小长度都是4字节
- Total Length:整个PDU的长度,包括Header和Body。
- Command ID:用于标识PDU的类型(例如,Connect、Submit等)。
- Sequence Id:序列号,用来匹配请求和响应。
用Python Asyncio实现CMPP协议栈里的建立连接
可以以本文的代码作为基础,很容易地在上面扩展。
代码结构组织如下:
.
├── LICENSE
├── README.md
├── cmpp
│ ├── __init__.py
│ ├── client.py
│ ├── protocol.py
│ └── utils.py
├── requirements.txt
├── setup.cfg
└── setup.py
- cmpp/protocol.py:定义不同 CMPP 协议数据单元 (PDU) 的数据类,包括 CmppHeader、CmppConnect、CmppConnectResp、CmppSubmit 和 CmppSubmitResp
- cmpp/client.py:该类处理与 ISMG(互联网短消息网关)的连接以及发送/接收 PDU。 主要 asyncio 进行异步 I/O 操作
- cmpp/utils.py:定义 BoundAtomic 类,它是一种线程安全的方式来管理具有最小值和最大值的序列号。保证CMPP序列号在一定的范围内
- setup.py:配置要分发的包,指定包名称、版本、作者和依赖项等元数据。
利用Python锁实现sequence_id
sequence_id是从1到0x7FFFFFFF的值
import threadingclass BoundAtomic:def __init__(self, min_val: int, max_val: int):assert min_val <= max_val, "min must be less than or equal to max"self.min = min_valself.max = max_valself.value = min_valself.lock = threading.Lock()def next_val(self) -> int:with self.lock:if self.value >= self.max:self.value = self.minelse:self.value += 1return self.value
在Python中定义CMPP PDU,篇幅有限,仅定义数个PDU
from dataclasses import dataclass
from typing import Union, List@dataclass
class CmppHeader:total_length: intcommand_id: intsequence_id: int@dataclass
class CmppConnect:source_addr: strauthenticator_source: bytesversion: inttimestamp: int@dataclass
class CmppConnectResp:status: intauthenticator_ismg: strversion: int@dataclass
class CmppSubmit:msg_id: intpk_total: intpk_number: intregistered_delivery: intmsg_level: intservice_id: strfee_user_type: intfee_terminal_id: strfee_terminal_type: inttp_pid: inttp_udhi: intmsg_fmt: intmsg_src: strfee_type: strfee_code: strvalid_time: strat_time: strsrc_id: strdest_usr_tl: intdest_terminal_id: List[str]dest_terminal_type: intmsg_length: intmsg_content: byteslink_id: str@dataclass
class CmppSubmitResp:msg_id: intresult: int@dataclass
class CmppPdu:header: CmppHeaderbody: Union[CmppHeader, CmppConnectResp, CmppSubmit, CmppSubmitResp]
实现编解码方法
@dataclass
class CmppConnect:source_addr: strauthenticator_source: bytesversion: int# MMDDHHMMSS formattimestamp: intdef encode(self) -> bytes:source_addr_bytes = self.source_addr.encode('utf-8').ljust(6, b'\x00')version_byte = self.version.to_bytes(1, 'big')timestamp_bytes = self.timestamp.to_bytes(4, 'big')return source_addr_bytes + self.authenticator_source + version_byte + timestamp_bytes@dataclass
class CmppConnectResp:status: intauthenticator_ismg: strversion: int@staticmethoddef decode(data: bytes) -> 'CmppConnectResp':status = int.from_bytes(data[0:4], 'big')authenticator_ismg = data[4:20].rstrip(b'\x00').decode('utf-8')version = data[20]return CmppConnectResp(status=status, authenticator_ismg=authenticator_ismg, version=version)@dataclass
class CmppPdu:header: CmppHeaderbody: Union[CmppConnect, CmppConnectResp, CmppSubmit, CmppSubmitResp]def encode(self) -> bytes:body_bytes = self.body.encode()self.header.total_length = len(body_bytes) + 12header_bytes = (self.header.total_length.to_bytes(4, 'big') +self.header.command_id.to_bytes(4, 'big') +self.header.sequence_id.to_bytes(4, 'big'))return header_bytes + body_bytes@staticmethoddef decode(data: bytes) -> 'CmppPdu':header = CmppHeader(total_length=int.from_bytes(data[0:4], 'big'),command_id=int.from_bytes(data[4:8], 'big'),sequence_id=int.from_bytes(data[8:12], 'big'))body_data = data[12:header.total_length]if header.command_id == CONNECT_RESP_ID:body = CmppConnectResp.decode(body_data)else:raise NotImplementedError("not implemented yet.")return CmppPdu(header=header, body=body)
asyncio tcp流相关代码
class CmppClient:def __init__(self, host: str, port: int):self.host = hostself.port = portself.sequence_id = BoundAtomic(1, 0x7FFFFFFF)self.reader = Noneself.writer = Noneasync def connect(self):self.reader, self.writer = await asyncio.open_connection(self.host, self.port)async def close(self):if self.writer:self.writer.close()
实现同步的connect_ismg方法
async def connect_ismg(self, request: CmppConnect):if self.writer is None or self.reader is None:raise ConnectionError("Client is not connected")sequence_id = self.sequence_id.next_val()header = CmppHeader(0, command_id=CONNECT_ID, sequence_id=sequence_id)pdu: CmppPdu = CmppPdu(header=header, body=request)self.writer.write(pdu.encode())await self.writer.drain()length_bytes = await self.reader.readexactly(4)response_length = int.from_bytes(length_bytes)response_data = await self.reader.readexactly(response_length)return CmppPdu.decode(response_data)
运行example,验证连接成功
async def main():client = CmppClient(host='localhost', port=7890)await client.connect()print("Connected to ISMG")connect_request = CmppConnect(source_addr='source_addr',authenticator_source=b'authenticator_source',version=0,timestamp=1122334455,)connect_response = await client.connect_ismg(connect_request)print(f"Connect response: {connect_response}")await client.close()print("Connection closed")asyncio.run(main())
总结
本文简单对CMPP协议进行了介绍,并尝试用python实现协议栈,但实际商用发送短信往往更加复杂,面临诸如流控、运营商对接、传输层安全等问题,可以选择华为云消息&短信(Message & SMS)服务。