1.jforgame-socket开发背景
1.1socket介绍
Socket是用于在计算机网络中实现通信的编程接口。它提供了一种通过网络在不同计算机之间传输数据的方式。不同http基于请求-响应模式,socket是全双工的,允许服务器、客户端同时向另外一端发送数据。由于socket工作在TCP/IP协议中的运输层,而不是像http这种工作在应用层,因此使用socket通信需要建立自己的私有协议栈。通过私有协议栈定义网络传输的字节流的具体意义。
1.2原生netty/mina复杂性与缺陷
Netty/Mina都是用于构建高性能、可扩展网络应用程序的Java框架。它们提供了一套抽象的、事件驱动的异步网络编程模型,使得开发者可以轻松地构建各种网络应用,例如服务器、客户端、代理等。
尽管Netty/Mina是一个强大而灵活的框架,但是对于初学者来说,可能会感到一些复杂性。下面是一些可能与Netty相关的复杂性因素:
-
编解码器和处理器:Netty/Mina提供了一套强大的编解码器和处理器,用于处理不同的协议和数据格式。选择合适的编解码器和处理器,并正确配置它们,可能需要一些学习和实践。特别是处理数据流粘包/拆包问题。
-
业务消息路由器:Netty/Mina只提供关于网络消息IO方面的内容,对于业务消息的路由处理(类似于SpringMVC的DispatchServlet机制),需要项目自行封装。
-
更高层次的统一:当项目引入Netty或者Mina之后,业务代码就与框架强耦合。如果中途需要切换顶层网关,需要修改项目的很多代码。
2.jforgame-socket框架介绍
jforgame-socket是一个通用的网络工具,底层对Netty/Mina进行了封装,屏蔽了私有协议栈定制,消息编解码,消息粘包/拆包问题。jforgame-socket传输层使用了TCP协议。可用于任何需要socket通信的应用。例如游戏服务器,聊天服务器等等。
2.1jforgame-socket结构
jforgame-socket主要有三个组件组成,分别是
- jforgame-socket-api: 底层通用接口,封装了Session接口,服务器/客户端通信接口,消息路由,客户端rpc接口等重要接口。
- jforgame-socket-mina:socket的mina实现,提供了服务器/客户端大部分默认组件,以及私有协议栈设计,提供TcpSocketServerBuilder工具类用于快速启动游戏服务器
- jforgame-socket-netty:socket的netty实现,提供了服务器/客户端大部分默认组件,以及私有协议栈设计,提供TcpSocketServerBuilder工具类用于快速启动游戏服务器
2.2jforgame-socket其他依赖组件
jforgame-socket提供了默认的私有协议栈,但对于一个完整消息的编解码是没有强制绑定的,运行客户端代码自行设计,例如可以使用protobuf,json,xml,protostuf等等。
jforgame项目提供了两种默认编解码工具
- jforgame-codec-protobuf:使用protobuf进行消息编解码,为了省略.protobuf文件的编写,引入JProtobuf基于注解的编解码。
- jforgame-codec-struct:基于jdk反射实现的编解码,对于一个简单的javabean,该工具基于反射解析该类每个字段的类型以及顺序,动态生成消息实体以及注入属性。使用简单方便,推荐!
3.使用案例
3.1添加中央仓库地址(国内其他云镜像仓库,可能还未同步)
<repository><id>repo2</id><name>Mirror from Maven Repo2</name><url>http://repo2.maven.org/maven2/</url>
</repository>
3.2添加socket依赖(以jforgame-socket-netty为例)
<dependency><groupId>io.github.jforgame</groupId><artifactId>jforgame-socket-netty</artifactId><version>1.0.0</version>
</dependency>
3.3添加消息编解码依赖(以jforgame-codec-struct为例)
<dependency><groupId>io.github.jforgame</groupId><artifactId>jforgame-codec-struct</artifactId><version>1.0.0</version>
</dependency>
3.4自行设计线程模型(1.1将提供默认实现)
游戏业务由于类型非常多,像棋牌,MMORPG,策略游戏,h5小游戏等,所用的业务线程模型是不一致的,作为socket框架无法提供通用实现。
public class DispatchThreadModel implements ThreadModel {private static final Logger logger = LoggerFactory.getLogger(DispatchThreadModel.class);private final int CORE_SIZE = Runtime.getRuntime().availableProcessors();/*** task worker pool*/private final Worker[] workerPool = new Worker[CORE_SIZE];private static final AtomicBoolean running = new AtomicBoolean(true);public DispatchThreadModel() {ThreadFactory threadFactory = new NamedThreadFactory("message-business");for (int i = 0; i < CORE_SIZE; i++) {Worker w = new Worker();workerPool[i] = w;threadFactory.newThread(w).start();}}private static class Worker implements Runnable {LinkedBlockingQueue<BaseGameTask> taskQueue = new LinkedBlockingQueue<>();void receive(BaseGameTask task) {taskQueue.add(task);}@Overridepublic void run() {while (running.get()) {try {BaseGameTask task = taskQueue.take();task.run();} catch (InterruptedException e) {// TODO other way?Thread.currentThread().interrupt();} catch (Exception e) {logger.error("", e);}}}}/*** when receiving a task, the executor will calculate the thread index based on the {@link BaseGameTask#getDispatchKey()}* for example, if the executor group has N threads, the task will be dispatched to the thread which index is (dispatchKey() % N)* @param task command task*/@Overridepublic void accept(BaseGameTask task) {if (task == null) {throw new NullPointerException("task is null");}if (!running.get()) {return;}int distributeKey = (int) (task.getDispatchKey() % CORE_SIZE);workerPool[distributeKey].receive(task);}/*** when this executor shuts down, it will no longer accept new task* and the remained tasks will be abandoned either.*/@Overridepublic void shutDown() {running.compareAndSet(true, false);}}
3.5消息分发器
对网络session的创建,摧毁,消息接受等提供一个钩子接口,允许(必须)业务代码自行设计。如此方可适用于所有socket应用。
public class MessageIoDispatcher extends ChainedMessageDispatcher {private MessageHandlerRegister handlerRegister;private MessageParameterConverter msgParameterConverter;private MessageFactory messageFactory = GameMessageFactory.getInstance();public MessageIoDispatcher(String scanPath) {this.msgParameterConverter = new DefaultMessageParameterConverter(messageFactory);this.handlerRegister = new CommonMessageHandlerRegister(scanPath, messageFactory);MessageHandler messageHandler = (session, message) -> {int cmd = GameMessageFactory.getInstance().getMessageId(message.getClass());MessageExecutor cmdExecutor = handlerRegister.getMessageExecutor(cmd);if (cmdExecutor == null) {logger.error("message executor missed, cmd={}", cmd);return true;}Object[] params = msgParameterConverter.convertToMethodParams(session, cmdExecutor.getParams(), message);Object controller = cmdExecutor.getHandler();int sessionId = (int) session.getAttribute(SessionProperties.DISTRIBUTE_KEY);MessageTask task = MessageTask.valueOf(session, sessionId, controller, cmdExecutor.getMethod(), params);task.setRequest(message);// 丢到任务消息队列,不在io线程进行业务处理GameServer.getMonitorGameExecutor().accept(task);return true;};addMessageHandler(messageHandler);}@Overridepublic void onSessionCreated(IdSession session) {session.setAttribute(SessionProperties.DISTRIBUTE_KEY, SessionManager.INSTANCE.getNextSessionId());}@Overridepublic void onSessionClosed(IdSession session) {long playerId = SessionManager.INSTANCE.getPlayerIdBy(session);if (playerId > 0) {logger.info("角色[{}]close session", playerId);PlayerEnt player = GameContext.playerManager.get(playerId);BaseGameTask closeTask = new BaseGameTask() {@Overridepublic void action() {GameContext.playerManager.playerLogout(playerId);}};GameServer.getMonitorGameExecutor().accept(closeTask);}}}
3.6一句话启动服务器
除了自行设计两个无法提供默认实现的组件之外,其他的功能jforgame-socet框架都帮你完成啦。只需引入builder工具类即可。
TcpSocketServerBuilder.newBuilder().bindingPort(HostAndPort.valueOf(ServerConfig.getInstance().getServerPort())).setMessageFactory(GameMessageFactory.getInstance()).setMessageCodec(new StructMessageCodec()).setSocketIoDispatcher(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)).build().start();
好了,开始你的网络通信项目吧。