实现netty-socketio集群的方式
代码实例
@PostConstructpublic void subscribe() {pubSubStore.subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {@Overridepublic void onMessage(DispatchMessage message) {log.debug("subscribe: {}" ,message);Collection<SocketIOClient> clients = null;String room = message.getRoom();String namespace = message.getNamespace();Packet packet = message.getPacket();if (!ObjectUtils.isEmpty(namespace)&&!ObjectUtils.isEmpty(room)){SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);if (socketIONamespace != null){clients = socketIONamespace.getRoomOperations(room).getClients();}}else{clients = socketIOServer.getBroadcastOperations().getClients();}if(!CollectionUtils.isEmpty(clients)){clients.parallelStream().forEach(ioClient -> {ioClient.sendEvent(Event.Event,packet.getData());});}}}, DispatchMessage.class);}
}
@Component
@Slf4j
public class ClientUtil {@Resourceprivate SocketIOServer socketIOServer;public void sendEvent(SocketIOClient client, MessageModel model) {log.debug("sendEvent--------------------------:{}", model);client.sendEvent(Event.Event, model);}public void sendNamespaceRoomEventExcluded(SocketIOClient client, MessageModel model) {log.debug("sendNamespaceRoomEventExcluded:{}", model);socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, client, model);}public void sendNamespaceRoomEvent(MessageModel model) {log.debug("sendNamespaceRoomEvent:{}", model);socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);}public void sendNamespaceEvent(MessageModel model) {log.debug("sendNamespaceRoomEvent:{}", model);socketIOServer.getNamespace(model.getNamespace()).getAllClients().parallelStream().forEach(client -> {client.sendEvent(Event.Event, model);});}public void sendRoomEvent(MessageModel model) {log.debug("sendNamespaceRoomEvent:{}", model);socketIOServer.getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);}
}
源码
查看源码可发现sendEvent方法最终还是调用了dispatch方法,由此可见我们只需要实现订阅就行。
/*** broadcast interface**/
public interface BroadcastOperations extends ClientOperations {Collection<SocketIOClient> getClients();<T> void send(Packet packet, BroadcastAckCallback<T> ackCallback);void sendEvent(String name, SocketIOClient excludedClient, Object... data);void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data);<T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback);<T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback);<T> void sendEvent(String name, Object data, Predicate<SocketIOClient> excludePredicate, BroadcastAckCallback<T> ackCallback);}
@Overridepublic void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data) {Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN);packet.setSubType(PacketType.EVENT);packet.setName(name);packet.setData(Arrays.asList(data));for (SocketIOClient client : clients) {packet.setEngineIOVersion(client.getEngineIOVersion());if (excludePredicate.test(client)) {continue;}client.send(packet);}dispatch(packet);}
private void dispatch(Packet packet) {this.storeFactory.pubSubStore().publish(PubSubType.DISPATCH,new DispatchMessage(this.room, packet, this.namespace));}