Java项目之消息队列(手写java模拟实现mq)【七、⽹络通信协议设计、消息队列服务器端实现、客户端实现】✔ ★

⼗⼀. ⽹络通信协议设计

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

定义 Request / Response

/** 表示一个网络通信中的请求对象. 按照自定义协议的格式来展开的*/
public class Request {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type = type;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload = payload;}
}
/** 这个对象表示一个响应. 也是根据自定义应用层协议来的*/
public class Response {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type = type;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload = payload;}
}

定义参数⽗类

构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再通过继承的⽅式体现

/** 使用这个类表示方法的公共参数/辅助的字段.* 后续每个方法又会有一些不同的参数, 不同的参数再分别使用不同的子类来表示.*/
public class BasicArguments implements Serializable {// 表示一次请求/响应 的身份标识. 可以把请求和响应对上.protected String rid;// 这次通信使用的 channel 的身份标识.protected String channelId;public String getRid() {return rid;}public void setRid(String rid) {this.rid = rid;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}
}

此处的 rid 和 channelId 都是基于 UUID 来⽣成的. rid ⽤来标识⼀个请求-响应. 这⼀点在请求响应⽐较多的时候⾮常重要

定义返回值⽗类

public class BasicAckArguments extends BasicArguments implements Serializable {private String queueName;private String messageId;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}
}

定义其他参数类

针对每个 VirtualHost 提供的⽅法, 都需要有⼀个类表⽰对应的参数

1) ExchangeDeclareArguments

package com.example.mq.common;import com.example.mq.mqserver.core.ExchangeType;import java.io.Serializable;
import java.util.Map;public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String, Object> arguments;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public ExchangeType getExchangeType() {return exchangeType;}public void setExchangeType(ExchangeType exchangeType) {this.exchangeType = exchangeType;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public Map<String, Object> getArguments() {return arguments;}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

⼀个创建交换机的请求, 形如:
• 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构.
• 按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象

2) ExchangeDeleteArguments

public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}
}

3) QueueDeclareArguments

public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private Map<String, Object> arguments;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isExclusive() {return exclusive;}public void setExclusive(boolean exclusive) {this.exclusive = exclusive;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public Map<String, Object> getArguments() {return arguments;}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

4) QueueDeleteArguments

public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}
}

5) QueueBindArguments

public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey = bindingKey;}
}

6) QueueUnbindArguments

public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}
}

7) BasicPublishArguments

public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}
}

8) BasicConsumeArguments

public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;// 这个类对应的 basicConsume 方法中, 还有一个参数, 是回调函数. (如何来处理消息)// 这个回调函数, 是不能通过网络传输的.// 站在 broker server 这边, 针对消息的处理回调, 其实是统一的. (把消息返回给客户端)// 客户端这边收到消息之后, 再在客户端自己这边执行一个用户自定义的回调就行了.// 此时, 客户端也就不需要把自身的回调告诉给服务器了.// 这个类就不需要 consumer 成员了.public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag = consumerTag;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public boolean isAutoAck() {return autoAck;}public void setAutoAck(boolean autoAck) {this.autoAck = autoAck;}
}

9) SubScribeReturns

public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag = consumerTag;}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}
}

⼗⼆. 实现 BrokerServer

创建 BrokerServer 类

• virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.
• sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket.
• serverSocket 是服务器⾃⾝的 socket
• executorService 这个线程池⽤来处理响应.
• runnable 这个标志位⽤来控制服务器的运⾏停⽌.


/** 这个 BrokerServer 就是咱们 消息队列 本体服务器.* 本质上就是一个 TCP 的服务器.*/
public class BrokerServer {private ServerSocket serverSocket = null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtualHost virtualHost = new VirtualHost("default");// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)// 此处的 key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一个线程池, 来处理多个客户端的请求.private ExecutorService executorService = null;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}public void start() throws IOException {System.out.println("[BrokerServer] 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把处理连接的逻辑丢给这个线程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服务器停止运行!");// e.printStackTrace();}}// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.public void stop() throws IOException {runnable = false;// 把线程池中的任务都放弃了. 让线程都销毁.executorService.shutdownNow();serverSocket.close();}// 通过这个方法, 来处理一个客户端的连接.// 在这一个连接中, 可能会涉及到多个请求和响应.private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {// 1. 读取请求并解析.Request request = readRequest(dataInputStream);// 2. 根据请求计算响应Response response = process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[BrokerServer] connection 出现异常!");e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送的消息数据.response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端.//    注意! 此处的 dataOutputStream 这个对象不能 close !!!//    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.//    此时就无法继续往 socket 中写入后续数据了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 调用 basicAck 确认消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!!// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);}}

启动/停⽌服务器

• 这⾥就是⼀个单纯的 TCP 服务器, 没啥特别的.
• 实现停⽌操作, 主要是为了⽅便后续开展单元测试

    public void start() throws IOException {System.out.println("[BrokerServer] 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把处理连接的逻辑丢给这个线程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服务器停止运行!");// e.printStackTrace();}}// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.public void stop() throws IOException {runnable = false;// 把线程池中的任务都放弃了. 让线程都销毁.executorService.shutdownNow();serverSocket.close();}

实现处理连接

• 对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接.
◦ 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException
◦ 如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException

    // 通过这个方法, 来处理一个客户端的连接.// 在这一个连接中, 可能会涉及到多个请求和响应.private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {// 1. 读取请求并解析.Request request = readRequest(dataInputStream);// 2. 根据请求计算响应Response response = process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[BrokerServer] connection 出现异常!");e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}

实现 readRequest

    private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}

实现 writeResponse

    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}

实现处理请求

• 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
• 再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).
• 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.
• 最后构造成统⼀的响应

    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送的消息数据.response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端.//    注意! 此处的 dataOutputStream 这个对象不能 close !!!//    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.//    此时就无法继续往 socket 中写入后续数据了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 调用 basicAck 确认消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}

实现 clearClosedSession

• 如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进⾏清理.
• 注意迭代器失效问题.

    private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!!// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);}

⼗三. 实现客⼾端

创建 ConnectionFactory

⽤来创建连接的⼯⼚类.
• 当前没有实现⽤⼾认证和多虚拟主机, ⽤⼾名密码可以暂时先不要

package com.example.mq.mqclient;import java.io.IOException;public class ConnectionFactory {// broker server 的 ip 地址private String host;// broker server 的端口号private int port;// 访问 broker server 的哪个虚拟主机.// 下列几个属性暂时先都不搞了.
//    private String virtualHostName;
//    private String username;
//    private String password;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}
}

Connection 和 Channel 的定义

⼀个客⼾端可以创建多个 Connection.
⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.
⼀个 Connection 可以包含多个 Channel

1) Connection 的定义

• Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.
• channelMap ⽤来管理该连接中所有的 Channel.
• callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.

public class Connection {private Socket socket = null;// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool = null;public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}public void close() {// 关闭 Connection 释放上述资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}// 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}// 执行该 channel 对象内部的回调.callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());// 把这个结果放到对应的 channel 的 hash 表中.Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}// 发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());}// 读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}// 通过这个方法, 在 Connection 中能够创建出一个 Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!// 把刚才已经加入 hash 表的键值对, 再删了.channelMap.remove(channelId);return null;}return channel;}
}

2) Channel 的定义

• channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.
• Connection 为 channel 对应的连接.
• baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.
• consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.

package com.example.mq.mqclient;import com.example.mq.common.*;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;
import java.lang.ref.ReferenceQueue;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;public class Channel {private String channelId;// 当前这个 channel 属于哪个连接.private Connection connection;// 用来存储后续客户端收到的服务器的响应.private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.// 此处约定一个 Channel 中只能有一个回调.private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments = new BasicArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求了.connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}// 期望使用这个方法来阻塞等待服务器的响应.private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没回来.// 此时就需要阻塞等待.synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}// 读取成功之后, 还需要把这个消息从哈希表中删除掉.basicReturnsMap.remove(rid);return basicReturns;}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应.// 把所有的等待的线程都唤醒.notifyAll();}}private String generateRid() {return "R-" + UUID.randomUUID().toString();}// 关闭 channel, 给服务器发送一个 type = 0x2 的请求public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}// 创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}// 删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}// 删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments = new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments = new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments = new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {// 先设置回调.if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}public Connection getConnection() {return connection;}public void setConnection(Connection connection) {this.connection = connection;}public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {return basicReturnsMap;}public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {this.basicReturnsMap = basicReturnsMap;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer = consumer;}}

Connection

封装请求响应读写操作

在 Connection 中, 实现下列⽅法

    // 发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());}// 读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}

创建 channel

在 Connection 中, 定义下列⽅法来创建⼀个 channel

    // 通过这个方法, 在 Connection 中能够创建出一个 Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!// 把刚才已经加入 hash 表的键值对, 再删了.channelMap.remove(channelId);return null;}return channel;}

Channel

发送请求

通过 Channel 提供请求的发送操作.

1) 创建 channel

    // 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments = new BasicArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求了.connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}

generateRid 的实现

    private String generateRid() {return "R-" + UUID.randomUUID().toString();}

waitResult 的实现
• 由于服务器的响应是异步的. 此处通过 waitResult 实现同步等待的效果

// 期望使用这个方法来阻塞等待服务器的响应.private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没回来.// 此时就需要阻塞等待.synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}// 读取成功之后, 还需要把这个消息从哈希表中删除掉.basicReturnsMap.remove(rid);return basicReturns;}

2) 关闭 channel

    // 关闭 channel, 给服务器发送一个 type = 0x2 的请求public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}

3) 创建交换机

    // 创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}

4) 删除交换机

    // 删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

5) 创建队列

   // 创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}

6) 删除队列

    // 删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments = new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

7) 创建绑定

    // 创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments = new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

8) 删除绑定

    // 解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments = new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

9) 发送消息

    // 发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

10) 订阅消息

// 订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {// 先设置回调.if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

11) 确认消息

    // 确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

⼩结

上述发送请求的操作, 逻辑基本⼀致. 构造参数 + 构造请求 + 发送 + 等待结果

处理响应

1) 创建扫描线程

创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据.
注意: ⼀个 Connection 中可能包含多个 channel, 需要把响应分别放到对应的 channel 中.

    public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}

2) 实现响应的分发

给 Connection 创建 dispatchResponse ⽅法.
• 针对服务器返回的控制响应和消息响应, 分别处理.
◦ 如果是订阅数据, 则调⽤ channel 中的回调.
◦ 如果是控制消息, 直接放到结果集合中.

    // 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}// 执行该 channel 对象内部的回调.callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());// 把这个结果放到对应的 channel 的 hash 表中.Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}

3) 实现 channel.putReturns

把响应放到响应的 hash 表中, 同时唤醒等待响应的线程去消费.

    public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应.// 把所有的等待的线程都唤醒.notifyAll();}}

关闭 Connection

给 Connection 实现 close ⽅法

    public void close() {// 关闭 Connection 释放上述资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

测试客⼾端-服务器

public class MqClientTests {private BrokerServer brokerServer = null;private ConnectionFactory factory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器MqApplication.context = SpringApplication.run(MqApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {// 停止服务器brokerServer.stop();// t.join();MqApplication.context.close();// 删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);// 此处稳妥起见, 把改关闭的要进行关闭.channel.close();connection.close();}@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue", true, false, false, null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false, false, null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false, false, null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
}

⼗四. 案例: 基于 MQ 的⽣产者消费者模型

/** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}
/** 这个类用来表示一个生产者.* 通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

扩展功能

• 虚拟主机管理
• ⽤⼾管理/⽤⼾认证
• 交换机/队列 的独占模式和⾃动删除.
• 发送⽅确认(broker 给⽣产者的确认应答)
• 拒绝应答 (nack)
• 死信队列
• 管理接⼝
• 管理⻚⾯

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/853946.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

简单谈谈云服务器私网IP的存在意义及优势

云服务器是基于虚拟化技术的计算资源&#xff0c;可以在云平台上灵活创建和管理。为了满足不同用户的需求&#xff0c;云服务提供商在云服务器上分配了两种类型的IP地址&#xff1a;公网IP和私网IP。其中&#xff0c;私网IP是指在局域网内使用的内部IP地址&#xff0c;无法通过…

机器学习(V)--无监督学习(二)主成分分析

当数据的维度很高时&#xff0c;很多机器学习问题变得相当困难&#xff0c;这种现象被称为维度灾难&#xff08;curse of dimensionality&#xff09;。 在很多实际的问题中&#xff0c;虽然训练数据是高维的&#xff0c;但是与学习任务相关也许仅仅是其中的一个低维子空间&am…

环形链表2证明

解法 快慢指针相遇后&#xff0c;其中一个指回头部&#xff0c;然后同步前进 代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode(int x) : val(x), next(NULL) {}* };*/ class Solution { public:ListNod…

【gitcode】idea 在本地拉取和push本地代码到gitcode仓库

【首次使用】 1、idea 拉取代码&#xff0c;很容易这里就不记录了。 2、push代码时&#xff0c;总是弹窗登录输入在gitcode.com登录能成功。但是在idea 怎么都不成功。控制台提示 remote: HTTP Basic: Access denied fatal: Authentication failed for ******* 认证失败 3…

3D ToF赋能小米CyberDog 2提升视觉灵敏度

随着科技的进步,智能机器人越来越多地融入我们的日常生活。其中,CyberDog 2作为一款前沿的四足机器人,凭借其出色的视觉灵敏度和多功能技术配备,受到了广泛的关注。本文将重点探讨CyberDog 2的视觉系统,尤其是其四种不同类型的摄像头如何共同提升其视觉灵敏度,以及激光传…

uniapp实现路由拦截——遇到问题(三)

uniapp路由拦截开发过程中遇到问题 文章目录 uniapp路由拦截开发过程中遇到问题App 无法退出应用监听返回数据结构解决方式模拟原生物理返回键提示不提示&#xff0c;直接退出应用 微信小程序 登录成功返回页面报错效果图不同平台来源页面数据结构解决方式 App 无法退出应用 安…

HP惠普暗影精灵10 OMEN Gaming Laptop 16-wf1xxx原厂Win11系统镜像下载

惠普hp暗影精灵10笔记本电脑16-wf1000TX原装出厂Windows11&#xff0c;恢复开箱状态oem预装系统安装包&#xff0c;带恢复重置还原 适用型号:16-wf1xxx 16-wf1000TX,16-wf1023TX,16-wf1024TX,16-wf1025TX, 16-wf1026TX,16-wf1027TX,16-wf1028TX,16-wf1029TX, 16-wf1030TX,16-…

Autosar诊断-FIM模块功能介绍

文章目录 前言一、FIM模块概述二、FID概念介绍Event ID和DTC之间的关系Event ID与FID之间的关系FIM数据结构三、FiM模块与SW-C模块交互关系四、FIM模块函数调用关系FiM功能模块作用过程前言 Autosar诊断的主体为UDS(Unified Diagnostic Services)协议,即统一的诊断服务,是…

python实战根据excel的文件名称这一列的内容,找到电脑D盘的下所对应的文件位置,要求用程序实现

今天客户需要 根据excel的文件名称这一列的内容&#xff0c;找到电脑D盘的下所对应的文件位置&#xff0c;要求用程序实现 数据样例&#xff1a;记录.xlsx 解决代码&#xff1a; 1、安装必要的库&#xff1a; pip install pandas openpyxl2、编写Python脚本&#xff1a; im…

Modbus为何要转成ProfiNET

Modbus与ProfiNET代表了工业通讯不同阶段的发展&#xff0c;各自具有优缺点。Modbus简单易用&#xff0c;适合小型系统&#xff1b;ProfiNET高效稳定&#xff0c;适用于大型复杂网络。转换Modbus为ProfiNET可提高系统性能和扩展性。实际场景下&#xff0c;升级生产线控制器为Pr…

分享一个 .NET Core 使用选项方式读取配置内容的详细例子

前言 在 .NET Core 中&#xff0c;可以使用选项模式&#xff08;Options Pattern&#xff09;来读取和管理应用程序的配置内容。 选项模式通过创建一个 POCO&#xff08;Plain Old CLR Object&#xff09;来表示配置选项&#xff0c;并将其注册到依赖注入容器中&#xff0c;方…

弗洛伊德算法——C语言

弗洛伊德算法&#xff0c;是一种用于解决所有顶点对之间最短路径问题的经典算法&#xff0c;该算法通过动态规划的方法计算出从每个顶点到其他所有顶点的最短路径。 弗洛伊德算法的基本思想是逐步考虑每一个顶点作为中间点&#xff0c;更新所有顶点对之间的最短路径。它通过以…

js 前端 Function.prototype.call.call(0[‘toString‘], *, 16)

这个函数将 数组转任意进制 Function.prototype.call.call(0[toString], *, 16)

Oracle--存储结构

总览 一、逻辑存储结构 二、物理存储结构 1.数据文件 2.控制文件 3.日志文件 4.服务器参数文件 5.密码文件 总览 一、逻辑存储结构 数据块是Oracle逻辑存储结构中的最小的逻辑单位&#xff0c;一个数据库块对应一个或者多个物理块&#xff0c;大小由参数DB_BLOCK_SIZE决…

Java语法和基本结构介绍

Java语法和基本结构是Java编程的基础&#xff0c;它决定了Java代码的书写方式和程序的结构。以下是Java语法和基本结构的一些关键点&#xff1a; 1.标识符和关键字&#xff1a;Java中的标识符是用来标识变量、函数、类或其他用户自定义元素的名称。关键字是预留的标识符&#x…

[C++ STL] vector 详解

标题&#xff1a;[C STL] vector 详解 水墨不写bug 目录 一、背景 二、vector简介 三、vector的接口介绍 &#xff08;1&#xff09;默认成员函数接口 i&#xff0c;构造函数&#xff08;constructor&#xff09; ii&#xff0c;析构函数&#xff08;destructor&#xff0…

Verilog综合出来的图

Verilog写代码时需要清楚自己综合出来的是组合逻辑、锁存器还是寄存器。 甚至&#xff0c;有时写的代码有误&#xff0c;vivado不能识别出来&#xff0c;这时打开综合后的schematic简单查看一下是否综合出想要的结果。 比如&#xff1a;误将一个always模块重复一遍&#xff0c;…

天翼云认证专家解决方案架构师(理论)

1.某大型互联网公司为了提升应用程序和基础设施的稳定性&#xff0c;计划引入自动化监控工具。以下哪些工具可以满足公司的需求? A.Grafana B.Nagios C.Prometheus D.Jenkins 2.天翼智能边缘云ECX是位于网络边缘位置的云&#xff0c;兼具云和CDN的特性&#xff0c;将计算、存…

使用百度的长文本转语音API时无法下载.MP3文件

今天是学生们交作业的时候&#xff0c;结果是我最忙碌的一天&#xff0c;各种改bug。 有个学生来问&#xff1a; 我在百度提供的API代码(长文本转语音)的基础上添加了下载生成的.MP3文件的代码&#xff0c;运行之后成功建成了.MP3文件&#xff0c;但是文件的内容确实以下的报错…

如何通过Outlook大附件插件,加强外发附件的安全性和管控力度?

因邮件的便捷性和普遍性&#xff0c;企业间业务往来通常会采取邮箱业务&#xff0c;沟通使用成本也比较低&#xff0c;但容易出现附件太大无法上传的问题。Outlook大附件插件是为解决邮件系统中附件大小限制问题而开发的一系列工具。 使用邮件发送附件时&#xff0c;可能会遇到…