消息队列服务器核心功能就是,提供了虚拟主机,交换机, 队列,消息等概念的管理,实现三种典型的消息转发方式,可以实现跨主机/服务器之间的生产者消费模型。
这里,就编写一个demo,实现跨主机的生产者消费者模型。
🍅 完善服务器的启动类
@SpringBootApplication
public class TigerMqApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(TigerMqApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}
}
🍅 创建Demo程序
/*
* 表示一个生产者
* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null );channel.queueDeclare("testQueue",true,false,false,null);// 创建一个消息并且发送byte[] body = "hello,TigerMQ".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成!ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}
/*
* 表示一个消费者
* */
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据]开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0, body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据]结束!");}});// 一直等待消费while (true){Thread.sleep(500);}}
}
🍅 启动服务器和客户端程序
启动服务器
启动生产者和消费者
启动生产者
[Connection] 发送请求! type=1, length=188
[Connection] 收到响应! type=1, length=192
[Connection] 发送请求! type=3, length=512
[Connection] 收到响应! type=3, length=192
[Connection] 发送请求! type=5, length=349
[Connection] 收到响应! type=5, length=192
[Connection] 发送请求! type=9, length=437
[Connection] 收到响应! type=9, length=192
消息投递完成!ok = true
[Connection] 发送请求! type=2, length=188
[Connection] 收到响应! type=2, length=192
[Connection] 连接正常断开!Process finished with exit code 0
启动消费者!
[Connection] 发送请求! type=1, length=188
[Connection] 收到响应! type=1, length=192
[Connection] 发送请求! type=3, length=512
[Connection] 收到响应! type=3, length=192
[Connection] 发送请求! type=5, length=349
[Connection] 收到响应! type=5, length=192
[Connection] 发送请求! type=10, length=315
[Connection] 收到响应! type=10, length=192
[Connection] 收到响应! type=12, length=528
[消费数据]开始!
consumerTag = C-4e9d5324-c197-462a-a0a5-31ffe3bf929a
basicProperties = BasicProperties(messageId=M-69e805c0-8298-4e8f-b737-001c340e18d5, routingKey=testQueue, deliverMode=1)
body = hello,TigerMQ
[消费数据]结束!