项目实战 — 消息队列(8){网络通信设计②}

目录

 一、客户端设计

🍅 1、设计三个核心类

 🍅 2、完善Connection类

        🎄 读取请求和响应、创建channel

         🎄 添加扫描线程

        🎄 处理不同的响应

        🎄 关闭连接

🍅 3、完善Channel类

🎄 编写createChannel()

🎄 编写waitResult()和putRetuens()方法

🎄 编写其他核心API

        🎊 交换机

        🎊 队列

         🎊 绑定

         🎊发布消息

        🎊 订阅消息

        🎊 确认消息

二、客户端测试

🍅 1、准备工作和收尾工作

 🍅 2、测试connection

🍅 3、测试channnel的创建

🍅 4、测试交换机

 🍅 5、测试队列

🍅 6、测试绑定


 一、客户端设计

🍅 1、设计三个核心类

三个核心类:

(1)ConnectoonFactory连接工厂:这个类,持有服务器的地址,主要功能是创建出连接Connectiond对象

(2)Connection:表示一个TCP连接,持有Socket对象,写入请求/读取响应,管理多个Channel对象

(3)Channel:表示一个逻辑上的连接。当前设定的交互模型,一个TCP连接是可以进行复用的,一个客户端可以有多个模块,每个模块都可以和brokerServer之间建立“逻辑上的连接”(channel),但是这几个模块的channel之间是互相不影响的。同时还需要提供一系列的方法,与服务器提供的核心API进行对应。

 先创建这三个核心的类

在包mqclient中创建这三个类。

@Data
public class ConnectionFactory {
//    BrokerServer的ip地址private String host;
//    BrokerServer端口号private int port;public Connection newConnection(){Connection connection = new Connection(host,port);return connection;}
}
@Data
public class Connection {private Socket socket = null;//    需要管理多个channel,使用哈希表把若干个channel组织起来private ConcurrentHashMap<String ,Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool = null;public Connection(String host,int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);}//    使用该方法分别处理,当前响应是一个针对控制请求的响应,还是服务器推送的响应private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {//TODO}//    发送请求public void writeRequest(Request request) throws IOException {
//        TODO}//    读取响应public Response readResponse() throws IOException {
//        TODO}//    通过这个方法,再connection中创建出一个channelpublic Channel createChannel(){// TODOreturn channel;}
//    关闭connectionpublic void close() {//TODO}
}
@Data
public class Channel {private String channelId;
//    当前channel属于哪个连接private Connection connection;
//    用来存储后续客户端收到的服务器的响应private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
//    如果当前的Channel订阅了某个队列,此处就需要记录对应的回调是什么
//    当该队列的消息返回回来的时候,就调用回调
//    约定一个channel值能有一个回调private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}//    这个方法主要和服务器进行交互,
//    目的是为了告知服务器,此处客户端创建了新的channelpublic boolean createChannel() {
//        TODOreturn true;}//    使用该方法阻塞等待服务器的响应private BasicReturns waitResult(String rid) {return null;}//    关闭channel,给服务器发送一个type == 0x2的请求public boolean close() throws IOException {//TODOreturn null;}//创建核心的API方法
}


 🍅 2、完善Connection类

这里完成发送请求、读取响应、创建channel、处理响应、关闭连接

        🎄 读取请求和响应、创建channel

//    发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());}//    读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}//    通过这个方法,再connection中创建出一个channel
//    此处的createChannel()方法在后面channel类中编写以后,会抛异常,这里大家写完channel之后回过来手动抛一下public Channel createChannel() {    //throws IOExceptionString channelId = "C-" + UUID.randomUUID();Channel channel = new Channel(channelId,this);
//        把channel对象放到Connection管理的channel的哈希表中channelMap.put(channelId,channel);
//        同时也需要把“创建channel”的这个消息也告诉服务器boolean ok = channel.createChannel();if (!ok){
//            创建channel失败
//            删除hash表中的键值对channelMap.remove(channelId);return null;}return channel;}

         🎄 添加扫描线程

在构造方法中,添加一个扫描线程,使用该线程,不停的从socket中读取响应,再将这个响应交给对应的channnel。

   public Connection(String host,int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//        创建一个扫描线程
//        该线程负责不停的从socket中读取响应数据,然后把这个响应数据再交给对应的channel负责处理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}

        🎄 处理不同的响应

 使用该方法分别处理两种不同的响应,当前响应是一个针对控制请求的响应,还是服务器推送的响应。

private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {
//            服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//            根据channelId找到对用的channel对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId =" + channel.getChannelId());}callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {
//            当前响应是针对控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
//            把这个结果放到对应的channel的hash表中Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId =" + channel.getChannelId());}channel.putReturns(basicReturns);}}

        🎄 关闭连接

public void close() {//        关闭connection,释放持有的资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

🍅 3、完善Channel类

🎄 编写createChannel()

//    这个方法主要和服务器进行交互,
//    目的是为了告知服务器,此处客户端创建了新的channel

//    这个方法主要和服务器进行交互,
//    目的是为了告知服务器,此处客户端创建了新的channelpublic boolean createChannel() throws IOException {
//        对于创建channel来说,payload就是一个basicArgumentsBasicArguments basicArguments=  new BasicArguments();basicArguments.setChannelId(channelId);
//        rid表示这次请求的idbasicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//       发送构造出的请求connection.writeRequest(request);
//        等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}private String generateRid(){return "R-" + UUID.randomUUID().toString();}

🎄 编写waitResult()和putRetuens()方法

putRetuents()是为了将返回的响应放到对用的哈希表中

    public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){
//            当前不知道有多少个线程在等待上面的响应
//            把多有的等待的线程都唤醒notifyAll();}}

waitResult()方法的作用是为了阻塞等待服务器的响应。以下举例说明:  

  如下,假如有3个channel,按照123的顺序发送了请求,所以应该是请求1先等待响应1,然后再是2和3。

但是,现在有一个情况,服务器这边是多线程并发处理请求,服务器处理每个请求的时间不一样,返回响应的顺序也就不一样。

如下图,channel1等待的是响应1,但是先返回的响应却是2和3。响应1还没来,请求1就一直等。请求1,没等到,后面的2和3也就拿不到响应

所以,为了解决这个问题,就创建了basicReturnsMap,将socket中收到的所有响应数据放到这个在前面创建的basicReturnsMap哈希表中。客户端的的请求,就可以不断的从这个哈希表中寻找是否存在和自己匹配的响应。

如果存在,就把相应取走;不存在,就继续等待。

这个waitResult()方法就是,当请求对应的响应不再哈希表中时,就阻塞等待。

//    使用该方法阻塞等待服务器的响应private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null){
//            如果查询结果为null,说明响应没来
//            此时就需要阻塞等待
//            此处加锁是为了保证wait/notify的是同一个对象synchronized (this){try {wait();}catch (InterruptedException e){e.printStackTrace();}}}
//        读取成功之后,把这个响应从哈希表中删除掉basicReturnsMap.remove(rid);return basicReturns;}


🎄 编写其他核心API

        🎊 交换机

//    创建交换机public  boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}//    删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

        🎊 队列

 public boolean queueDeclare(String queueName,boolean durable) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(queueDeleteArguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());return basicReturns.isOk();}

         🎊 绑定

//    创建绑定public  boolean queueBind(String  queueName,String exchangeName,String bindingKey) throws IOException {QueueBindArguments queueBindArguments = new QueueBindArguments();queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setQueueName(queueName);queueBindArguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(queueBindArguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueBindArguments.getRid());return basicReturns.isOk();}//    删除绑定public boolean queueUnbind(String queueName,String exchangeName) throws IOException {QueueUnbindArguments queueUnbindArguments = new QueueUnbindArguments();queueUnbindArguments.setRid(generateRid());queueUnbindArguments.setChannelId(channelId);queueUnbindArguments.setQueueName(queueName);queueUnbindArguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(queueUnbindArguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueUnbindArguments.getRid());return basicReturns.isOk();}

         🎊发布消息

//  发布消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

        🎊 订阅消息

//    订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
//        先设置回调.if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);
//        此处 consumerTag 使用 channelId 来表示arguments.setConsumerTag(channelId);   arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

        🎊 确认消息

//    确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

二、客户端测试

🍅 1、准备工作和收尾工作

private BrokerServer brokerServer = null;private ConnectionFactory factory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器TigerMqApplication.context = SpringApplication.run(TigerMqApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {// 停止服务器brokerServer.stop();// t.join();TigerMqApplication.context.close();// 删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}
}

 🍅 2、测试connection

@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}
打印日志:
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[BrokerServer] 服务器停止运行!

🍅 3、测试channnel的创建

 @Testpublic void testChannel() throws IOException{Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}

🍅 4、测试交换机

@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求!type = 1,length = 188
[Request] rid=R-ff8a7be0-8138-4334-b496-647b472349fa, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] rid=R-ff8a7be0-8138-4334-b496-647b472349fa, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=1, length=192
[Connection]收到响应!type = 1,length = 192
[connection]发送请求!type = 3,length = 412
[Request] rid=R-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=3, length=412
[MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange
[Response] rid=R-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=3, length=192
[Connection]收到响应!type = 3,length = 192
[connection]发送请求!type = 4,length = 288
[Request] rid=R-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=4, length=288
[MemoryDataCenter]交换机删除成功! exchangeName = defaulttestExchange
[VirtualHost] 交换机删除成功!exchangeName = defaulttestExchange
[Response] rid=R-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=4, length=192
[Connection]收到响应!type = 4,length = 192
[connection]发送请求!type = 2,length = 188
[Request] rid=R-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] rid=R-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=2, length=192
[Connection]收到响应!type = 2,length = 192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:54675
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服务器停止运行!
2023-08-13 17:10:28.619  INFO 38940 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:10:28.704  INFO 38940 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:10:28.727  INFO 38940 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

 🍅 5、测试队列

@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue", true);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求!type = 1,length = 188
[Request] rid=R-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] rid=R-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=1, length=192
[Connection]收到响应!type = 1,length = 192
[connection]发送请求!type = 5,length = 349
[Request] rid=R-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=5, length=349
[MemoryDataCenter]队列删除成功!queueName = defaulttestQueue
[VirtualHost]队列创建成功!queueName = defaulttestQueue
[Response] rid=R-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=5, length=192
[Connection]收到响应!type = 5,length = 192
[connection]发送请求!type = 6,length = 279
[Request] rid=R-057a79a8-20db-408f-82db-eb9e9145a48b, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=6, length=279
[MemoryDataCenter]删除队列成功!queueName = defaulttestQueue
[VirtualHost]删除队列成功!queueName = defaulttestQueue
[Response] rid=R-057a79a8-20db-408f-82db-eb9e9145a48b, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=6, length=192
[Connection]收到响应!type = 6,length = 192
[connection]发送请求!type = 2,length = 188
[Request] rid=R-73a48380-78e9-448f-9566-4cdc7da17bda, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] rid=R-73a48380-78e9-448f-9566-4cdc7da17bda, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=2, length=192
[Connection]收到响应!type = 2,length = 192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:54945
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服务器停止运行!
2023-08-13 17:16:54.304  INFO 72124 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:16:54.333  INFO 72124 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:16:54.363  INFO 72124 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

🍅 6、测试绑定

 @Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求!type = 1,length = 188
[Request] rid=R-071477e3-7115-42e7-9370-7995fa36daab, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] rid=R-071477e3-7115-42e7-9370-7995fa36daab, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=1, length=192
[Connection]收到响应!type = 1,length = 192
[connection]发送请求!type = 3,length = 412
[Request] rid=R-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=3, length=412
[MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange
[Response] rid=R-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=3, length=192
[Connection]收到响应!type = 3,length = 192
[connection]发送请求!type = 5,length = 349
[Request] rid=R-478d13e5-b999-4ac4-96cf-e0c8df829152, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=5, length=349
[MemoryDataCenter]队列删除成功!queueName = defaulttestQueue
[VirtualHost]队列创建成功!queueName = defaulttestQueue
[Response] rid=R-478d13e5-b999-4ac4-96cf-e0c8df829152, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=5, length=192
[Connection]收到响应!type = 5,length = 192
[connection]发送请求!type = 7,length = 347
[Request] rid=R-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=7, length=347
[MemoryDataCenter]新绑定添加成功!exchangeName = defaulttestQueue,queueName = defaulttestQueue
[VirtualHost]绑定创建成功! exchangeName = defaulttestExchangequeueName = defaulttestQueue
[Response] rid=R-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=7, length=192
[Connection]收到响应!type = 7,length = 192
[connection]发送请求!type = 8,length = 314
[Request] rid=R-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=8, length=314
[MemoryDataCenter]绑定删除成功!exchangeName = defaulttestQueue,queueName = defaulttestQueue
[VirtualHost]删除绑定成功
[Response] rid=R-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=8, length=192
[Connection]收到响应!type = 8,length = 192
[Request] rid=R-c3335cd6-d02e-440e-9aea-5275bf445412, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] rid=R-c3335cd6-d02e-440e-9aea-5275bf445412, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=2, length=192
[Connection]收到响应!type = 2,length = 192
[connection]发送请求!type = 2,length = 188
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:55256
[BrokerServer]清理session完成~ 被清理的channeId = []
[Connection] 连接正常断开!
[BrokerServer] 服务器停止运行!
2023-08-13 17:22:34.611  INFO 74604 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:22:34.646  INFO 74604 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:22:34.691  INFO 74604 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

🍅 7、测试消息的相关操作

 @Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[Connection] 发送请求! type=1, length=188
[Request] rid=R-143ae2ea-f258-4874-bff4-be3f719d44ed, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-0977501d-4608-4428-ae5c-db2738c02068
[Response] rid=R-143ae2ea-f258-4874-bff4-be3f719d44ed, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=1, length=192
[Connection] 收到响应! type=1, length=192
[Connection] 发送请求! type=3, length=512
[Request] rid=R-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=3, length=512
[MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange
[Response] rid=R-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=3, length=192
[Connection] 收到响应! type=3, length=192
[Connection] 发送请求! type=5, length=349
[Request] rid=R-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=5, length=349
[MemoryDataCenter]队列删除成功!queueName = defaulttestQueue
[VirtualHost]队列创建成功!queueName = defaulttestQueue
[Response] rid=R-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=5, length=192
[Connection] 收到响应! type=5, length=192
[Connection] 发送请求! type=9, length=429
[Request] rid=R-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=9, length=429
[MemoryDataCenter]新消息添加成功!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被投递到到队列中! messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Response] rid=R-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=9, length=192
[Connection] 收到响应! type=9, length=192
[Connection] 发送请求! type=10, length=315
[Request] rid=R-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=10, length=315
[MemoryDataCenter]消息从队列中取出!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[VirtualHost]basicConsume成功! queueName = defaulttestQueue
[Response] rid=R-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=10, length=192
[Connection] 收到响应! type=10, length=192
[MemoryDataCenter]消息进入待确认队列!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Connection] 收到响应! type=12, length=520
[MemoryDataCenter]消息从待确认队列删除!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被移除!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[ConsumerManager]消费被成功消费!queueName = defaulttestQueue
[消费数据] 开始!
consumerTag=C-0977501d-4608-4428-ae5c-db2738c02068
basicProperties=BasicProperties(messageId=M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b, routingKey=testQueue, deliverMode=1)
[消费数据] 结束!
[Connection] 发送请求! type=2, length=188
[Request] rid=R-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-0977501d-4608-4428-ae5c-db2738c02068
[Response] rid=R-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=2, length=192
[Connection] 收到响应! type=2, length=192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:58925
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服务器停止运行!
2023-08-13 18:19:24.906  INFO 75700 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 18:19:24.929  INFO 75700 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 18:19:24.935  INFO 75700 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/35510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

广州华锐互动:VR3D课程在线教育平台为职业院校提供沉浸式的虚拟现实学习体验

随着科技的飞速发展&#xff0c;虚拟现实(VR)和增强现实(AR)技术已经逐渐渗透到我们生活的各个领域。其中&#xff0c;VR3D课程在线教育平台作为一种新兴的教育方式&#xff0c;正在逐渐改变我们的学习方式和体验。本文将详细介绍VR3D课程在线教育平台的应用前景及特点。 VR3D课…

【设计模式】工厂模式

工厂模式 工厂模式&#xff08;Factory Pattern&#xff09;是 Java 中最常用的设计模式之一。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式。 工厂模式提供了一种将对象的实例化过程封装在工厂类中的方式。通过使用工厂模式&#xff0c;可以…

uniapp软键盘谈起遮住输入框和头部被顶起的问题解决

推荐&#xff1a; pages.json中配置如下可解决头部被顶起和表单被遮住的问题。 { "path": "pages/debug/protocol/tagWord", "style": { "app-plus": { "soft…

Java实战:高效提取PDF文件指定坐标的文本内容

前言 临时接到一个紧急需要处理的事项。业务侧一个同事有几千个PDF文件需要整理&#xff1a;需要从文件中的指定位置获取对应的编号和地址。 要的急&#xff0c;工作量大。所以就问到技术部有没有好的解决方案。 问技术的话就只能写个demo跑下了。 解决办法 1. 研究下PDF文档…

案例15 Spring Boot入门案例

1. 选择Spring Initializr快速构建项目 ​ 2. 设置项目信息 ​ 3. 选择依赖 ​ 4. 设置项目名称 ​ 5. 项目结构 ​ 6. 项目依赖 自动配置了Spring MVC、内置了Tomcat、配置了Logback(日志)、配置了JSON。 ​ 7. 创建HelloController类 com.wfit.boot.hello目录下创建HelloCo…

开发过程中遇到的问题以及解决方法

巩固基础&#xff0c;砥砺前行 。 只有不断重复&#xff0c;才能做到超越自己。 能坚持把简单的事情做到极致&#xff0c;也是不容易的。 开发过程中遇到的问题以及解决方法 简单易用的git命令 git命令&#xff1a; 查看有几个分支&#xff1a;git branch -a 切换分支&#…

Azure创建第一个虚拟机

首先&#xff0c;登录到 Azure 门户 (https://portal.azure.com/)。在 Azure 门户右上角&#xff0c;点击“虚拟机”按钮&#xff0c;并点击创建&#xff0c;创建Azure虚拟机。 在虚拟机创建页面中&#xff0c;选择所需的基本配置&#xff0c;包括虚拟机名称、操作系统类型和版…

【JVM】JVM 调优的参数都有哪些?

文章目录 1. 设置堆空间大小2. 虚拟机栈的设置3. 年轻代中Eden区和两个Survivor区的大小比例4. 年轻代晋升老年代阈值5. 设置垃圾回收收集器 1. 设置堆空间大小 设置堆的初始大小和最大大小&#xff0c;为了防止垃圾收集器在初始大小、最大大小之间收缩堆而产生额外的时间&…

python编程小游戏简单的,python小游戏编程100例

大家好&#xff0c;给大家分享一下python编程小游戏简单的&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 不会python就不能用python开发入门级的小游戏&#xff1f; 当然不是&#xff0c;我收集了十个python入门小游戏的源码和教程&#…

分支语句和循环语句(1)

这篇文章我们详细的把分支语句和循环语句给大家进行讲解。 分支语句&#xff1a; if switch 循环语句&#xff1a; while for do while goto语句&#xff1a; 1.什么是语句&#xff1f; C语句可分为以下五类&#xff1a; 1. 表达式语句 2. 函数调用语句 3. 控制…

ORCA优化器浅析——CDXLOperator Base class for operators in a DXL tree

如上图所示&#xff0c;CDXLOperator作为Base class for operators in a DXL tree&#xff0c;其子类CDXLLogical、CDXLScalar、CDXLPhysical作为逻辑节点、物理节点和Scalar节点的DXL表示类&#xff0c;因此其包含了这些类的共同部分特性&#xff0c;比如获取其DXL节点表示的函…

Qt 文件对话框使用 Deepin风格

当你在Deepin或UOS 上开发 Qt 程序时&#xff0c;如果涉及到文件对话框功能&#xff0c;那么就会遇到调用原生窗口的问题。 如果你使用的是官方的Qt版本&#xff0c;那么在Deepin或者UOS系统上&#xff0c;弹出的文件对话框会是如下这样&#xff1a; 而Deepin或UOS系统提供的默…

可视化高级绘图技巧100篇-总论

前言 优秀的数据可视化作品可以用三个关键词概括&#xff1a;准确、清晰、优雅。 准确&#xff1a;精准地反馈数据的特征信息&#xff08;既不遗漏也不冗余&#xff0c;不造成读者疏漏&误读细节&#xff09; 清晰&#xff1a;获取图表特征信息的时间越短越好 优雅&…

Gitlab CI/CD笔记-第二天-主机套接字进行构建并push镜像。

一、安装gitlab-runner 1.可以是linux也可以是docker的 2.本文说的是docker安装部署的。 二、直接上.gitlab-ci.yml stages: # List of stages for jobs, and their order of execution - build-image build-image-job: stage: build-image image: harbor.com:543/docke…

企业计算机服务器中了360后缀勒索病毒怎么办,勒索病毒解密数据恢复

随着计算机技术的不断发展&#xff0c;企业的办公系统得到了很大提升&#xff0c;但是随之而来的网络安全威胁也不断增加&#xff0c;勒索病毒的攻击事件时有发生。近期&#xff0c;我们收到某地连锁超市的求助&#xff0c;企业的计算机服务器遭到了360后缀勒索病毒攻击&#x…

html练习

html练习 工具代码运行结果 工具 HBuilder X 代码 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>图灵之家</title></head><body><h1>图灵之家</h1><br><br><h2>我的…

使用几何和线性代数从单个图像进行 3D 重建

使用几何和线性代数从单个图像进行 3D 重建 萨蒂亚 一、说明 3D重构是一个挑战性题目&#xff0c;而且这个新颖的题目正处于启发和膨胀阶段&#xff1b;因此&#xff0c;各种各样的尝试层出不穷&#xff0c;本篇说明尝试的一种&#xff0c;至于其它更多的尝试&#xff0c;我们在…

RTT(RT-Thread)IO设备模型

目录 IO设备模型 模型框架原理 IO设备类型 创建和注册IO设备 RTT设备管理程序实现原理 访问IO设备 查找设备 初始化设备 打开设备 关闭设备 控制设备 读写设备 数据收发回调 数据接收回调 数据发送回调 设备模型实例 IO设备模型 RT-Thread 提供了一套简单的 I/O …

网络编程(TFTP协议实验)

#include <stdio.h> #include <string.h> #include <stdlib.h> #include <head.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h>#define PORT 69 //端口号&#xf…

网络安全威胁与防御策略

第一章&#xff1a;引言 随着数字化时代的快速发展&#xff0c;网络已经成为人们生活和工作中不可或缺的一部分。然而&#xff0c;网络的广泛应用也引发了一系列严峻的网络安全威胁。恶意软件、网络攻击、数据泄露等问题层出不穷&#xff0c;给个人和企业带来了巨大的风险。本文…