protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化),一般应用于网络传输,可支持多种编程语言。
protobuf怎样使用这里不再介绍,本文主要介绍在MINA、Netty、Twisted中怎样使用protobuf,不了解protobuf的同学能够去參考我的还有一篇博文。
在前面的一篇博文中。有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息切割方式。在这里相同要使用到。
仅仅是当中Body的内容不再是字符串,而是protobuf字节码。
在处理业务逻辑时,肯定不希望还要对数据进行序列化和反序列化。而是希望直接操作一个对象,那么就须要有对应的编码器和解码器。将序列化和反序列化的逻辑写在编码器和解码器中。有关编码器和解码器的实现,上一篇博文中有介绍。
Netty包中已经自带针对protobuf的编码器和解码器。那么就不用再自己去实现了。而MINA、Twisted还须要自己去实现protobuf的编码器和解码器。
这里定义一个protobuf数据结构,用于描写叙述一个学生的信息。保存为StudentMsg.proto文件:
message Student {// IDrequired int32 id = 1; // 姓名required string name = 2;// emailoptional string email = 3;// 朋友repeated string friends = 4;
}
用StudentMsg.proto分别生成Java和Python代码。将代码加入到对应的项目中。
生成的代码就不再贴上来了。
以下分别介绍在Netty、MINA、Twisted怎样使用protobuf来传输Student信息。
Netty:
Netty自带protobuf的编码器和解码器,各自是ProtobufEncoder和ProtobufDecoder。须要注意的是,ProtobufEncoder和ProtobufDecoder仅仅负责protobuf的序列化和反序列化,而处理消息Header前缀和消息切割的还须要LengthFieldBasedFrameDecoder和LengthFieldPrepender。LengthFieldBasedFrameDecoder即用于解析消息Header前缀。依据Header中指定的Body字节数截取Body,LengthFieldPrepender用于在wirte消息时在消息前面加入一个Header前缀来指定Body字节数。
public class TcpServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ChannelPipeline pipeline = ch.pipeline();// 负责通过4字节Header指定的Body长度将消息切割pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));// 负责将frameDecoder处理后的完整的一条消息的protobuf字节码转成Student对象pipeline.addLast("protobufDecoder",new ProtobufDecoder(StudentMsg.Student.getDefaultInstance()));// 负责将写入的字节码加上4字节Header前缀来指定Body长度pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));// 负责将Student对象转成protobuf字节码pipeline.addLast("protobufEncoder", new ProtobufEncoder());pipeline.addLast(new TcpServerHandler());}});ChannelFuture f = b.bind(8080).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
处理事件时,接收和发送的參数直接就是Student对象:
public class TcpServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 读取client传过来的Student对象StudentMsg.Student student = (StudentMsg.Student) msg;System.out.println("ID:" + student.getId());System.out.println("Name:" + student.getName());System.out.println("Email:" + student.getEmail());System.out.println("Friends:");List<String> friends = student.getFriendsList();for(String friend : friends) {System.out.println(friend);}// 新建一个Student对象传到clientStudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();builder.setId(9);builder.setName("server");builder.setEmail("123@abc.com");builder.addFriends("X");builder.addFriends("Y");StudentMsg.Student student2 = builder.build();ctx.writeAndFlush(student2);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}
MINA:
在MINA中没有针对protobuf的编码器和解码器。可是能够自己实现一个功能和Netty一样的编码器和解码器。
编码器:
public class MinaProtobufEncoder extends ProtocolEncoderAdapter {@Overridepublic void encode(IoSession session, Object message,ProtocolEncoderOutput out) throws Exception {StudentMsg.Student student = (StudentMsg.Student) message;byte[] bytes = student.toByteArray(); // Student对象转为protobuf字节码int length = bytes.length;IoBuffer buffer = IoBuffer.allocate(length + 4);buffer.putInt(length); // write headerbuffer.put(bytes); // write bodybuffer.flip();out.write(buffer);}
}
解码器:
public class MinaProtobufDecoder extends CumulativeProtocolDecoder {@Overrideprotected boolean doDecode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {// 假设没有接收完Header部分(4字节)。直接返回falseif (in.remaining() < 4) {return false;} else {// 标记開始位置,假设一条消息没传输完毕则返回到这个位置in.mark();// 读取header部分,获取body长度int bodyLength = in.getInt();// 假设body没有接收完整,直接返回falseif (in.remaining() < bodyLength) {in.reset(); // IoBuffer position回到原来标记的地方return false;} else {byte[] bodyBytes = new byte[bodyLength];in.get(bodyBytes); // 读取body部分StudentMsg.Student student = StudentMsg.Student.parseFrom(bodyBytes); // 将body中protobuf字节码转成Student对象out.write(student); // 解析出一条消息return true;}}}
}
MINAserver加入protobuf的编码器和解码器:
public class TcpServer {public static void main(String[] args) throws IOException {IoAcceptor acceptor = new NioSocketAcceptor();// 指定protobuf的编码器和解码器acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new MinaProtobufEncoder(), new MinaProtobufDecoder()));acceptor.setHandler(new TcpServerHandle());acceptor.bind(new InetSocketAddress(8080));}
}
这样。在处理业务逻辑时,就和Netty一样了:
public class TcpServerHandle extends IoHandlerAdapter {@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {// 读取client传过来的Student对象StudentMsg.Student student = (StudentMsg.Student) message;System.out.println("ID:" + student.getId());System.out.println("Name:" + student.getName());System.out.println("Email:" + student.getEmail());System.out.println("Friends:");List<String> friends = student.getFriendsList();for(String friend : friends) {System.out.println(friend);}// 新建一个Student对象传到clientStudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();builder.setId(9);builder.setName("server");builder.setEmail("123@abc.com");builder.addFriends("X");builder.addFriends("Y");StudentMsg.Student student2 = builder.build();session.write(student2);}
}
Twisted:
在Twisted中。首先定义一个ProtobufProtocol类,继承Protocol类,充当编码器和解码器。处理业务逻辑的TcpServerHandle类再继承ProtobufProtocol类。调用或重写ProtobufProtocol提供的方法。
# -*- coding:utf-8 –*-from struct import pack, unpack
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
import StudentMsg_pb2# protobuf编码、解码器
class ProtobufProtocol(Protocol):# 用于临时存放接收到的数据_buffer = b""def dataReceived(self, data):# 上次未处理的数据加上本次接收到的数据self._buffer = self._buffer + data# 一直循环直到新的消息没有接收完整while True:# 假设header接收完整if len(self._buffer) >= 4:# header部分,按大字节序转int,获取body长度length, = unpack(">I", self._buffer[0:4])# 假设body接收完整if len(self._buffer) >= 4 + length:# body部分,protobuf字节码packet = self._buffer[4:4 + length]# protobuf字节码转成Student对象student = StudentMsg_pb2.Student()student.ParseFromString(packet)# 调用protobufReceived传入Student对象self.protobufReceived(student)# 去掉_buffer中已经处理的消息部分self._buffer = self._buffer[4 + length:]else:break;else:break;def protobufReceived(self, student):raise NotImplementedErrordef sendProtobuf(self, student):# Student对象转为protobuf字节码data = student.SerializeToString()# 加入Header前缀指定protobuf字节码长度self.transport.write(pack(">I", len(data)) + data)# 逻辑代码
class TcpServerHandle(ProtobufProtocol):# 实现ProtobufProtocol提供的protobufReceiveddef protobufReceived(self, student):# 将接收到的Student输出print 'ID:' + str(student.id)print 'Name:' + student.nameprint 'Email:' + student.emailprint 'Friends:'for friend in student.friends:print friend# 创建一个Student并发送给clientstudent2 = StudentMsg_pb2.Student()student2.id = 9student2.name = 'server'.decode('UTF-8') # 中文须要转成UTF-8字符串student2.email = '123@abc.com'student2.friends.append('X')student2.friends.append('Y')self.sendProtobuf(student2)factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
以下是Java编写的一个client測试程序:
public class TcpClient {public static void main(String[] args) throws IOException {Socket socket = null;DataOutputStream out = null;DataInputStream in = null;try {socket = new Socket("localhost", 8080);out = new DataOutputStream(socket.getOutputStream());in = new DataInputStream(socket.getInputStream());// 创建一个Student传给serverStudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();builder.setId(1);builder.setName("client");builder.setEmail("xxg@163.com");builder.addFriends("A");builder.addFriends("B");StudentMsg.Student student = builder.build();byte[] outputBytes = student.toByteArray(); // Student转成字节码out.writeInt(outputBytes.length); // write headerout.write(outputBytes); // write bodyout.flush();// 获取server传过来的Studentint bodyLength = in.readInt(); // read headerbyte[] bodyBytes = new byte[bodyLength];in.readFully(bodyBytes); // read bodyStudentMsg.Student student2 = StudentMsg.Student.parseFrom(bodyBytes); // body字节码解析成StudentSystem.out.println("Header:" + bodyLength);System.out.println("Body:");System.out.println("ID:" + student2.getId());System.out.println("Name:" + student2.getName());System.out.println("Email:" + student2.getEmail());System.out.println("Friends:");List<String> friends = student2.getFriendsList();for(String friend : friends) {System.out.println(friend);}} finally {// 关闭连接in.close();out.close();socket.close();}}
}
用client分别測试上面三个TCPserver:
server输出:
ID:1
Name:client
Email:xxg@163.com
Friends:
A
B
client输出:
Header:32
Body:
ID:9
Name:server
Email:123@abc.com
Friends:
X
Y
作者:叉叉哥 转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/38864961
MINA、Netty、Twisted一起学系列
MINA、Netty、Twisted一起学(一):实现简单的TCPserver
MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行切割消息
MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)
MINA、Netty、Twisted一起学(四):定制自己的协议
MINA、Netty、Twisted一起学(五):整合protobuf
MINA、Netty、Twisted一起学(六):session
MINA、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)
MINA、Netty、Twisted一起学(八):HTTPserver
MINA、Netty、Twisted一起学(九):异步IO和回调函数
MINA、Netty、Twisted一起学(十):线程模型
MINA、Netty、Twisted一起学(十一):SSL/TLS
MINA、Netty、Twisted一起学(十二):HTTPS
源代码
https://github.com/wucao/mina-netty-twisted