之前的项目为了实现自动化,写了一个protobuf的解析器,用来生成项目所需的格式。
当然现在通过以下链接的指导,跳过手工分析,直接生成代码了。
https://developers.google.com/protocol-buffers/docs/reference/cpp-generated
这次文档主要是描述如何分析protobuf格式,以及如何收集需要的符号。
使用python 2.7脚本进行文本的处理。
程序分成4个模块:
expression: 格式的解析
symbol:在protobuf中定义的message等对象以及它们的层次结构,在这里已经看不见protobuf的样子了。
typecollection:基础类型定义和收集message等对象。
builder:遍历symbol,根据需要创建适合的输出文件。typecollection起到索引的作用。这次就不演示了。
1 测试用protobuf文件。(来源于google示例)
package tutorial;

message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phone = 4;
}

message AddressBook {
  repeated Person person = 1;
}
2 expression实现---最简单的扫描方法,分析每一个word。
# -*- coding: UTF-8 -*-
# pb_expression.py
import sys
import os
import string
import shutil
import io
import pb_symbol


class StringBuffer(object):
    """Holds the text of one .proto file for the scanner.

    ``src`` is the path given to the constructor.  ``Data`` (the file
    contents) only exists after OpenFile() has been called.
    """

    def __init__(self, src):
        self.src = src

    def __del__(self):
        # Kept for backward compatibility with the original class.
        self.buf = None

    def OpenFile(self):
        """Read the whole file named by ``self.src`` into ``self.Data``."""
        # The context manager closes the handle; the original leaked it.
        with open(self.src) as handle:
            self.Data = handle.read()


class Expression(object):
    """Character-level scanner that turns .proto text into pb_symbol objects.

    The scanner walks ``sBuf.Data`` with a cursor (``index``) and dispatches
    on the keywords listed in ``token_set``.  Parsed messages and enums are
    appended to the ``symbol`` passed to the constructor.
    """

    # Field labels that introduce a message member.
    desc_set = set(['required', 'optional', 'repeated'])

    b_char_set = set(string.ascii_uppercase)
    l_char_set = set(string.ascii_lowercase)
    # Digits are stored as characters so they can match scanned input
    # (the original stored ints, which never compare equal to a char).
    digit_set = set(string.digits)

    equals_char = '='
    space_char = ' '
    openbrace_char = '{'
    closebrace_char = '}'
    semicolon_char = ';'
    tab_char = chr(9)
    newline_char = chr(10)
    return_char = chr(13)
    slash_char = chr(47)

    # Characters that terminate a word.
    ctl_char_set = set([openbrace_char, closebrace_char, semicolon_char,
                        equals_char, newline_char, return_char, tab_char,
                        space_char])
    # Characters skipped by getVisibleChar().
    empty_char_set = set([space_char, tab_char, newline_char, return_char])
    symbol_char_set = b_char_set | l_char_set | digit_set
    all_char_set = symbol_char_set | ctl_char_set

    def __init__(self, sBuf, symbol):
        self.Buf = sBuf
        self.index = 0
        self.count = len(self.Buf.Data)
        self.symbol = symbol

    # ---- cursor primitives -------------------------------------------

    def backup(self):
        """Return the current cursor position."""
        return self.index

    def restore(self, prevIndex):
        """Move the cursor back to a position obtained from backup()."""
        self.index = prevIndex

    def forwardChar(self):
        """Advance the cursor by one, never past the end of the data."""
        if self.index < self.count:
            self.index = self.index + 1

    def backChar(self):
        """Step the cursor back by one, never before the start."""
        if self.index > 0:
            self.index = self.index - 1

    def getchar(self):
        """Return the character under the cursor and advance, or None at EOF."""
        if self.index < self.count:
            char = self.Buf.Data[self.index]
            self.forwardChar()
            return char
        return None

    # ---- lexical helpers ---------------------------------------------

    def skipComment(self):
        """Skip a ``//`` comment if one starts at the cursor.

        The cursor is left on the terminating newline (or at EOF), so the
        newline still acts as a word delimiter for the caller.  When no
        comment starts at the cursor, the cursor is not moved.
        """
        start = self.backup()
        if (self.getchar() != self.slash_char
                or self.getchar() != self.slash_char):
            self.restore(start)
            return
        while True:
            char = self.getchar()
            if char is None:
                # Comment runs to EOF: stay at the end instead of rewinding,
                # otherwise the comment text would be re-read as a word.
                return
            if char == self.newline_char:
                self.backChar()
                return

    def getSpecialChar(self, currentchar):
        """Consume input up to and including ``currentchar``.

        Returns ``currentchar`` when found, or None if EOF is reached first.
        """
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char == currentchar:
                return char

    def getVisibleChar(self):
        """Consume and return the next non-whitespace character (None at EOF)."""
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char not in self.empty_char_set:
                return char

    def getNextword(self):
        """Return the next run of non-control characters, or None at EOF.

        Leading control characters are skipped.  The control character that
        ends the word is left unread.
        """
        word = None
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None:
                break
            is_delim = char in self.ctl_char_set
            if word is None:
                if not is_delim:
                    word = char
            elif is_delim:
                self.backChar()
                break
            else:
                word = word + char
        return word

    def _at_block_end(self):
        """Return True if the next visible char is ``}`` (consumed) or EOF.

        Otherwise the cursor is rewound and False is returned.
        """
        mark = self.backup()
        char = self.getVisibleChar()
        if char is None or char == self.closebrace_char:
            return True
        self.restore(mark)
        return False

    # ---- keyword handlers --------------------------------------------

    def do_enum_item(self, pbEnum):
        """Parse one ``NAME = VALUE;`` entry and append it to ``pbEnum``."""
        memText = self.getNextword()
        self.getSpecialChar(self.equals_char)
        memValue = self.getNextword()
        self.getSpecialChar(self.semicolon_char)
        pbEnum.append_Member(memText, memValue)

    def do_enum_proc(self):
        """Parse ``Name { ... }`` after the ``enum`` keyword."""
        symbol = self.getNextword()
        pbEnum = pb_symbol.PBEnum(symbol)
        # Consume the opening brace explicitly so that an empty body is
        # recognised instead of swallowing whatever follows the enum.
        self.getSpecialChar(self.openbrace_char)
        while not self._at_block_end():
            self.do_enum_item(pbEnum)
        self.symbol.append_enum(pbEnum)

    def do_message_proc(self):
        """Parse ``Name { ... }`` after the ``message`` keyword.

        Nested definitions are parsed by a child Expression into a child
        Symbol whose namespace is this message's name.
        """
        symbol = self.getNextword()
        pbMsg = pb_symbol.PBMessage(symbol)
        # See do_enum_proc: consuming '{' here makes empty bodies work.
        self.getSpecialChar(self.openbrace_char)
        while not self._at_block_end():
            currentIndex = self.backup()
            word = self.getNextword()
            if word is None:
                break
            if word in self.token_set:
                subSymbol = pb_symbol.Symbol(self.symbol.tpDict,
                                             self.symbol.entity_full_path,
                                             False)
                subSymbol.update_namespace(symbol)
                # Rewind so the child scanner sees the keyword itself.
                self.restore(currentIndex)
                subExp = Expression(self.Buf, subSymbol)
                subExp.index = self.index
                subExp.do_expression()
                self.index = subExp.index
                self.symbol.append_symbol(subSymbol)
                pbMsg.enableSymbol = 1
            else:
                if word in self.desc_set:
                    memType = self.getNextword()
                    memText = self.getNextword()
                    pbMsg.append_Member(word, memType, memText)
                # Skip the rest of the statement (tag number, options, or
                # an unrecognised statement) up to its semicolon.
                self.getSpecialChar(self.semicolon_char)
        self.symbol.append_message(pbMsg)

    def do_import_proc(self):
        """Skip an ``import`` statement."""
        self.getSpecialChar(self.semicolon_char)

    def do_package_proc(self):
        """Record the package name as the namespace of the current symbol."""
        word = self.getNextword()
        self.symbol.update_namespace(word)
        self.getSpecialChar(self.semicolon_char)

    # Keyword -> handler.  Handlers are plain functions here, so they are
    # invoked as proc(self).
    token_set = {
        'message': do_message_proc,
        'enum': do_enum_proc,
        'import': do_import_proc,
        'package': do_package_proc,
    }

    def do_expression(self):
        """Parse consecutive top-level statements.

        Stops at EOF or at the first word that is not a key of
        ``token_set``; in the latter case the cursor is rewound to just
        before that word.
        """
        while True:
            current_index = self.backup()
            token = self.getNextword()
            if token is None:
                break
            if token in self.token_set:
                proc = self.token_set[token]
                proc(self)
            else:
                self.restore(current_index)
                break
3 symbol--定义对象类型以及层次
# -*- coding: UTF-8 -*-
# pb_symbol.py
import os
import string
import pb_typecollection


class PBEntity(object):
    """Base class for everything registered in the type dictionary.

    ``entName`` and ``orgName`` both hold the name given at construction;
    ``rtname`` is the second constructor argument.
    """

    def __init__(self, entName, rtname):
        self.entName = entName
        self.orgName = entName
        self.rtname = rtname

    def outputDebug(self, ns=''):
        # ``ns`` is accepted (and ignored) so the signature matches the
        # subclasses, which Symbol.outputDebug calls with a namespace.
        pass

    def create_impl(self, entity_indent, top_ns):
        """Return an empty list; the base entity generates nothing."""
        return list()

    def mem_include(self, entName):
        """Return False; the base entity has no members."""
        return False


class PBMessageMember(object):
    """One field of a message: label (``option``), type and name."""

    def __init__(self, option, memType, memText):
        self.option = option
        self.memType = memType
        self.memText = memText

    def outputDebug(self):
        print(self.option,self.memType,self.memText)

    @property
    def mem_option(self):
        return self.option

    @property
    def mem_type(self):
        return self.memType

    @property
    def mem_text(self):
        return self.memText


class PBMessage(PBEntity):
    """A parsed ``message`` with its ordered list of members."""

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []
        self.enableSymbol = 0
        self.rt_ns = ''
        self.tpDict = None

    @property
    def Members(self):
        return self.members

    def attach_tp_dict(self, tpDict):
        """Remember the type dictionary this message was registered in."""
        self.tpDict = tpDict

    def append_Member(self, option, memType, memText):
        """Append a field built from label, type and name."""
        self.members.append(PBMessageMember(option, memType, memText))

    def enable_Symbol(self, enable):
        self.enableSymbol = enable

    def outputDebug(self, ns):
        print(ns,'message',self.entName)
        for entMsg in self.members:
            entMsg.outputDebug()
        print('')

    def set_rt_ns(self, rt_entity_full_path):
        self.rt_ns = rt_entity_full_path

    def mem_include(self, entName):
        """Return True if any member's type is ``entName``."""
        return any(entName == entMsg.memType for entMsg in self.members)

    def detect_request(self):
        """Return True if the message has at least one member."""
        # The original compared the bound method ``list.count`` with 0,
        # which never reflected the number of members.
        return len(self.members) > 0


class PBEnumMember(object):
    """One ``NAME = VALUE`` entry of an enum."""

    def __init__(self, memText, memValue):
        self.memText = memText
        self.memValue = memValue

    def outputDebug(self):
        print(self.memText,self.memValue)


class PBEnum(PBEntity):
    """A parsed ``enum`` with its ordered list of entries."""

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []

    def append_Member(self, memText, memValue):
        """Append an entry built from name and value."""
        self.members.append(PBEnumMember(memText, memValue))

    def outputDebug(self, ns):
        print(ns,'enum',self.entName)
        for entEnum in self.members:
            entEnum.outputDebug()
        print('')


class Symbol(object):
    """A scope (package or message body) holding entities and child scopes.

    ``entitylist`` receives every message, enum and child Symbol in the
    order they were appended; ``containerlist`` receives only messages
    and child Symbols.
    """

    def __init__(self, tpDict, fullpath, rooted):
        self.namespace = ''
        self.tpDict = tpDict
        self.rooted = rooted
        self.entity_full_path = fullpath
        self.rt_entity_full_path = fullpath
        self.entitylist = []
        self.containerlist = []

    def __del__(self):
        pass

    def update_namespace(self, namespace):
        """Set the scope name and, for non-rooted scopes, extend both paths.

        A rooted scope only records ``namespace``.  Otherwise the name is
        joined to each path with ``_``, or becomes the path if it was empty.
        """
        self.namespace = namespace
        if self.rooted:
            return
        if self.entity_full_path == '':
            self.entity_full_path = namespace
            self.rt_entity_full_path = namespace
        else:
            # Build both values before assigning either one.  The original
            # derived rt_entity_full_path from the already-extended
            # entity_full_path, appending the namespace twice.
            entity_path = '%s_%s' % (self.entity_full_path, namespace)
            rt_path = '%s_%s' % (self.rt_entity_full_path, namespace)
            self.entity_full_path = entity_path
            self.rt_entity_full_path = rt_path

    def append_type_dict(self, entity, isMsg):
        """Register ``entity`` in the type dictionary under its own name.

        Messages are registered with an empty fourth argument; other
        entities pass their (possibly scope-qualified) name there.
        """
        if self.entity_full_path == '':
            rt_type = entity.rtname
            org_type = entity.rtname
        else:
            rt_type = '%s::%s' % (self.rt_entity_full_path, entity.rtname)
            org_type = '%s::%s' % (self.entity_full_path, entity.rtname)
        if isMsg:
            org_type = ''
        self.tpDict.insert_type(entity.entName, rt_type, entity, org_type)

    def append_message(self, msg):
        """Add a message to this scope and register it in the type dictionary."""
        self.entitylist.append(msg)
        self.containerlist.append(msg)
        msg.attach_tp_dict(self.tpDict)
        if self.rt_entity_full_path == '':
            msg.set_rt_ns(self.rt_entity_full_path)
        else:
            msg.set_rt_ns(self.rt_entity_full_path + '_')
        self.append_type_dict(msg, True)

    def append_enum(self, enum):
        """Add an enum to this scope and register it in the type dictionary."""
        self.entitylist.append(enum)
        self.append_type_dict(enum, False)

    def append_symbol(self, symbol):
        """Add a child scope."""
        self.entitylist.append(symbol)
        self.containerlist.append(symbol)

    def outputDebug(self, ns):
        for entity in self.entitylist:
            entity.outputDebug(ns +'::'+self.namespace)

    def query_entitylist(self):
        return self.entitylist

    def query_containerlist(self):
        return self.containerlist

    def query_pb_ns(self):
        return self.namespace

    def mem_include(self, entName):
        """Return True if any entity in this scope reports ``entName``."""
        return any(entity.mem_include(entName) == True
                   for entity in self.entitylist)


class PBProxy(object):
    """Forwarding wrapper: every attribute is read from the wrapped entity.

    Which properties are valid depends on the type of the wrapped object;
    accessing one the entity lacks raises AttributeError from the entity.
    """

    def __init__(self, entity):
        self.entity = entity

    @property
    def enableSymbol(self):
        return self.entity.enableSymbol

    def mem_include(self, entName):
        return self.entity.mem_include(entName)

    def create_impl(self, entity_indent, top_ns):
        return self.entity.create_impl(entity_indent, top_ns)

    @property
    def entName(self):
        return self.entity.entName

    @property
    def rtname(self):
        return self.entity.rtname

    @property
    def orgName(self):
        return self.entity.orgName

    @property
    def members(self):
        return self.entity.members

    @property
    def rt_ns(self):
        return self.entity.rt_ns

    @property
    def namespace(self):
        return self.entity.namespace

    @property
    def rooted(self):
        return self.entity.rooted

    @property
    def entity_full_path(self):
        return self.entity.entity_full_path

    @property
    def rt_entity_full_path(self):
        return self.entity.rt_entity_full_path

    @property
    def entitylist(self):
        return self.entity.entitylist

    @property
    def containerlist(self):
        return self.entity.containerlist

    @property
    def tpDict(self):
        return self.entity.tpDict

    def detect_request(self):
        return self.entity.detect_request()

    @property
    def Members(self):
        return self.entity.members

    @property
    def mem_option(self):
        return self.entity.mem_option

    @property
    def mem_type(self):
        return self.entity.mem_type

    @property
    def mem_text(self):
        return self.entity.mem_text
4 typecollection
# -*- coding: UTF-8 -*-
# pb_typecollection.py
import os
import pb_symbol


class typeDict(object):
    """Maps a protobuf type name to ``(rtType, entity, orgType)``.

    The constructor pre-registers the scalar types listed below; further
    types are added through insert_type().
    """

    op_req_desc = 'required'
    op_opt_desc = 'optional'
    op_rep_desc = 'repeated'

    def __init__(self):
        self.collection = dict()
        # (protobuf scalar name, type string stored as rtType)
        scalar_types = (
            ('int32', '__int32'),
            ('int64', '__int64'),
            ('uint32', 'unsigned int'),
            ('bool', 'bool'),
            ('float', 'float'),
            ('double', 'double'),
            ('string', 'const char*'),
            ('bytes', 'const char*'),
        )
        for pb_name, rt_name in scalar_types:
            placeholder = pb_symbol.PBEntity(pb_name, pb_name)
            self.insert_type(pb_name, rt_name, placeholder, '')

    def insert_type(self, entName, rtType, entity, orgType):
        """Store (or overwrite) the entry for ``entName``."""
        record = (rtType, entity, orgType)
        self.collection[entName] = record

    def output_debug(self):
        """Print a header line followed by every (name, record) pair."""
        print('type collection')
        for pair in self.collection.items():
            print(pair)
5 测试脚本
# -*- coding: UTF-8 -*-
# Driver script: parses google_tutorial.proto and dumps the symbol tree
# followed by the collected type dictionary.
import pb_symbol
import pb_expression
import pb_typecollection

if __name__ == '__main__':
    pb_file = 'google_tutorial.proto'
    sBuf = pb_expression.StringBuffer(pb_file)
    tpDict = pb_typecollection.typeDict()
    symbol = pb_symbol.Symbol(tpDict, '', True)
    try:
        sBuf.OpenFile()
        exp = pb_expression.Expression(sBuf, symbol)
        exp.do_expression()
        symbol.outputDebug('')
        tpDict.output_debug()
    except Exception as exc:
        # Format the message; the original passed the format string and
        # the exception as two separate print arguments.
        print("%s" % exc)
    print("done")
6 输出
命名空间:::tutorial::Person
类型名称:PhoneType
('::tutorial::Person', 'enum', 'PhoneType')
('MOBILE', '0')
('HOME', '1')
('WORK', '2')
('::tutorial::Person', 'message', 'PhoneNumber')
('required', 'string', 'number')
('optional', 'PhoneType', 'type')
('::tutorial', 'message', 'Person')
('required', 'string', 'name')
('required', 'int32', 'id')
('optional', 'string', 'email')
('repeated', 'PhoneNumber', 'phone')
('::tutorial', 'message', 'AddressBook')
('repeated', 'Person', 'person')
type collection
('PhoneNumber', ('Person::PhoneNumber', <pb_symbol.PBMessage object at 0x02B9DED0>, ''))
('int32', ('__int32', <pb_symbol.PBEntity object at 0x02BE3F70>, ''))
('string', ('const char*', <pb_symbol.PBEntity object at 0x02BEE0F0>, ''))
('double', ('double', <pb_symbol.PBEntity object at 0x02BEE0B0>, ''))
('float', ('float', <pb_symbol.PBEntity object at 0x02BEE070>, ''))
('bytes', ('const char*', <pb_symbol.PBEntity object at 0x02BEE130>, ''))
('Person', ('Person', <pb_symbol.PBMessage object at 0x02BEE210>, ''))
('bool', ('bool', <pb_symbol.PBEntity object at 0x02BEE050>, ''))
('PhoneType', ('Person::PhoneType', <pb_symbol.PBEnum object at 0x02BEE450>, 'Person::PhoneType'))
('int64', ('__int64', <pb_symbol.PBEntity object at 0x02BE3FB0>, ''))
('uint32', ('unsigned int', <pb_symbol.PBEntity object at 0x02BE3FF0>, ''))
('AddressBook', ('AddressBook', <pb_symbol.PBMessage object at 0x02BEE7B0>, ''))
参考
protobuf的git地址:https://github.com/google/protobuf