package com.java1234.producer.config; import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { /*** direct交换机名称*/public static final String DIRECT_EXCHANGE = "directExchange" ; /*** direct交换机名称1*/public static final String DIRECT_EXCHANGE1 = "directExchange1" ; /*** fanout交换机名称*/public static final String FANOUT_EXCHANGE = "fanoutExchange" ; /*** direct队列名称*/public static final String DIRECT_QUEUE = "directQueue" ; /*** direct1队列名称*/public static final String DIRECT_QUEUE1 = "directQueue1" ; /*** direct2队列名称*/public static final String DIRECT_QUEUE2 = "directQueue2" ; /*** 订阅队列1名称*/public static final String SUB_QUEUE1 = "subQueue1" ; /*** 订阅队列2名称*/public static final String SUB_QUEUE2 = "subQueue2" ; /*** direct路由Key*/public static final String DIRECT_ROUTINGKEY = "directRoutingKey" ; /*** 定义一个direct交换机* @return*/@Beanpublic DirectExchange directExchange ( ) { return new DirectExchange( DIRECT_EXCHANGE) ; } /*** 定义一个direct交换机1* @return*/@Beanpublic DirectExchange directExchange1 ( ) { return new DirectExchange( DIRECT_EXCHANGE1) ; } /*** 定义一个direct交换机* @return*/@Beanpublic FanoutExchange fanoutExchange ( ) { return new FanoutExchange( FANOUT_EXCHANGE) ; } /*** 定义一个direct队列* @return*/@Beanpublic Queue directQueue ( ) { return new Queue( DIRECT_QUEUE) ; } /*** 定义一个direct1队列* @return*/@Beanpublic Queue directQueue1 ( ) { return new Queue( DIRECT_QUEUE1) ; } /*** 定义一个direct2队列* @return*/@Beanpublic Queue directQueue2 ( ) { return new Queue( DIRECT_QUEUE2) ; } /*** 定义一个订阅队列1* @return*/@Beanpublic Queue subQueue1 ( ) { return new Queue( SUB_QUEUE1) ; } /*** 定义一个订阅队列2* @return*/@Beanpublic Queue subQueue2 ( ) { return new Queue( SUB_QUEUE2) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding ( ) { return BindingBuilder.bind( directQueue( )) .to( directExchange( )) .with( DIRECT_ROUTINGKEY) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding fanoutBinding1 ( ) { return BindingBuilder.bind( subQueue1( )) .to( fanoutExchange( )) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding fanoutBinding2 ( ) { return BindingBuilder.bind( subQueue2( )) .to( fanoutExchange( )) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding1 ( ) { return BindingBuilder.bind( directQueue1( )) .to( directExchange1( )) .with( "error" ) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding2 ( ) { return BindingBuilder.bind( directQueue2( )) .to( directExchange1( )) .with( "info" ) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding3 ( ) { return BindingBuilder.bind( directQueue2( )) .to( directExchange1( )) .with( "error" ) ; } /*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding4 ( ) { return BindingBuilder.bind( directQueue2( )) .to( directExchange1( )) .with( "warning" ) ; } }
package com.java1234.consumer.service.impl; import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; @Service( "rabbitmqService" )
public class RabbitMqServiceImpl implements RabbitMqService { @Autowiredprivate AmqpTemplate amqpTemplate; @Overridepublic void receiveMessage ( ) { String message = ( String) amqpTemplate.receiveAndConvert( RabbitMQConfig.DIRECT_QUEUE) ; System.out.println( "接受到的mq消息:" +message) ; } @Override@RabbitListener( queues = { RabbitMQConfig.DIRECT_QUEUE1} ) public void receiveMessage2( String message) { // System.out.println( "消费者1:接收到的mq消息:" +message) ; System.out.println( "队列1接收日志消息:" +message) ; } @Override@RabbitListener( queues = { RabbitMQConfig.DIRECT_QUEUE2} ) public void receiveMessage3( String message) {
// System.out.println( "消费者2:接收到的mq消息:" +message) ; System.out.println( "队列2接收日志消息:" +message) ; } @Override@RabbitListener( queues = { RabbitMQConfig.SUB_QUEUE1} ) public void receiveSubMessage1( String message) { System.out.println( "订阅者1:接收到的mq消息:" +message) ; } @Override@RabbitListener( queues = { RabbitMQConfig.SUB_QUEUE2} ) public void receiveSubMessage2( String message) { System.out.println( "订阅者2:接收到的mq消息:" +message) ; } }
package com.java1234.consumer.service; public interface RabbitMqService { /*** 接受消息*/public void receiveMessage( ) ; /*** 接受消息*/public void receiveMessage2( String message) ; /*** 接受消息*/public void receiveMessage3( String message) ; /*** 接受订阅消息1*/public void receiveSubMessage1( String message) ; /*** 接受订阅消息2*/public void receiveSubMessage2( String message) ;
}
package com.java1234.consumer; import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext; @SpringBootApplication
public class ConsumerApplication { public static void main( String[ ] args) { ApplicationContext ac = SpringApplication.run( ConsumerApplication.class,args) ;
// RabbitMqService rabbitMqService = ( RabbitMqService) ac.getBean( "rabbitmqService" ) ;
// rabbitMqService.receiveMessage( ) ; }
}
package com.java1234.producer.service.impl; import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; @Service( "rabbitmqService" )
public class RabbitMqServiceImpl implements RabbitMqService { @Autowiredprivate AmqpTemplate amqpTemplate; /*** String exchange 交换机名称* String routingKey 路由Key* Object object 具体发送的消息* @param message*/@Overridepublic void sendMessage( String message) { amqpTemplate.convertAndSend( RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message) ; } @Overridepublic void sendFanoutMessage( String message) { amqpTemplate.convertAndSend( RabbitMQConfig.FANOUT_EXCHANGE,"" ,message) ; } @Overridepublic void sendRoutingMessage ( ) { amqpTemplate.convertAndSend( RabbitMQConfig.DIRECT_EXCHANGE1,"warning" ,"发送warning级别的消息" ) ; }
}
package com.java1234.producer.service; public interface RabbitMqService { /*** 发送消息* @param message*/public void sendMessage( String message) ; /*** 发送消息* @param message*/public void sendFanoutMessage( String message) ; /*** 发送路由模式消息*/public void sendRoutingMessage( ) ;
}
package com.java1234.producer; import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication
public class ProducerApplication { public static void main( String[ ] args) { ApplicationContext ac = SpringApplication.run( ProducerApplication.class, args) ; RabbitMqService rabbitMqService = ( RabbitMqService) ac.getBean( "rabbitmqService" ) ; rabbitMqService.sendRoutingMessage( ) ; // for( int i = 0 ; i < 10 ; i++) { rabbitMqService.sendMessage( "RabbitMQ大爷你好!!!" +i) ;
// rabbitMqService.sendFanoutMessage( i+"用户欠费了" ) ;
// } }
}