文章目录:模拟实现

启动结果如下:

![启动结果截图 1](https://img-blog.csdnimg.cn/direct/71841546ad8043f1bd51e4408df791de.png)

![启动结果截图 2](https://img-blog.csdnimg.cn/direct/f6e3e72ff9a4483c978ec48e24f075c2.png)
模拟实现
模拟消费者
package com. example. demo. demo ; import com. example. demo. common. Consumer ;
import com. example. demo. common. MqException ;
import com. example. demo. mqclient. Channel ;
import com. example. demo. mqclient. Connection ;
import com. example. demo. mqclient. ConnectionFactory ;
import com. example. demo. mqsever. core. BasicProperties ;
import com. example. demo. mqsever. core. ExchangeType ; import java. io. IOException ;
public class DemoConsumer { public static void main ( String [ ] args) throws IOException , MqException , InterruptedException { System . out. println ( "启动消费者!" ) ; ConnectionFactory factory = new ConnectionFactory ( ) ; factory. setHost ( "127.0.0.1" ) ; factory. setPort ( 9090 ) ; Connection connection = factory. newConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "testExchange" , ExchangeType . DIRECT , true , false , null ) ; channel. queueDeclare ( "testQueue" , true , false , false , null ) ; channel. basicConsume ( "testQueue" , true , new Consumer ( ) { @Override public void handleDelivery ( String consumerTag, BasicProperties basicProperties, byte [ ] body) throws MqException , IOException { System . out. println ( "[消费数据] 开始!" ) ; System . out. println ( "consumerTag=" + consumerTag) ; System . out. println ( "basicProperties=" + basicProperties) ; String bodyString = new String ( body, 0 , body. length) ; System . out. println ( "body=" + bodyString) ; System . out. println ( "[消费数据] 结束!" ) ; } } ) ; while ( true ) { Thread . sleep ( 500 ) ; } }
}
模拟生产者
package com. example. demo. demo ; import com. example. demo. mqclient. Channel ;
import com. example. demo. mqclient. Connection ;
import com. example. demo. mqclient. ConnectionFactory ;
import com. example. demo. mqsever. core. ExchangeType ; import java. io. IOException ;
/**
 * Demo producer for the message-queue client.
 *
 * <p>Connects to the server at 127.0.0.1:9090, declares the {@code testExchange}
 * exchange and the {@code testQueue} queue, publishes a single {@code "hello"}
 * message, prints the publish result, and then closes the channel and connection.
 */
public class DemoProducer {

    /**
     * Entry point: publishes one message and shuts the connection down.
     *
     * @param args command-line arguments (unused)
     * @throws IOException          if a client call fails with an I/O error
     * @throws InterruptedException if the pre-close sleep is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // NOTE(review): the boolean/null arguments presumably mean durable / autoDelete /
                // arguments (and exclusive for the queue) -- confirm against Channel's declarations.
                channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
                channel.queueDeclare("testQueue", true, false, false, null);

                // Encode with an explicit charset so the payload bytes do not depend on the
                // platform default encoding.
                byte[] body = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
                boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
                System.out.println("消息投递完成! ok=" + ok);

                // NOTE(review): presumably gives the server time to finish processing before
                // the channel is closed -- confirm whether this pause is still required.
                Thread.sleep(500);
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Always release the underlying connection, including on failure paths.
            connection.close();
        }
    }
}
效果展示
启动结果如下: