【spring boot结合rabbit mq 到点执行,可精确到秒】 创建队列枚举 创建自定义的队列消息pojo 创建队列和延迟队列 发送mq 消息 接收mq 消息 DateTimeUtil 测试 注意点
创建队列枚举
public enum QueueEnum { TEST ( 1 , "test" , "队列频道" ) , DELAY_TEST ( 2 , "delay_test" , "延迟延迟频道" ) , ; private Integer code; private String channel; private String desc; QueueEnum ( Integer code, String channel, String desc) { this . code = code; this . channel = channel; this . desc = desc; } public Integer getCode ( ) { return code; } public void setCode ( Integer code) { this . code = code; } public String getChannel ( ) { return channel; } public void setChannel ( String channel) { this . channel = channel; } public String getDesc ( ) { return desc; } public void setDesc ( String desc) { this . desc = desc; } public static String findChannelByCode ( Integer code) { QueueEnum [ ] queueEnums = QueueEnum . values ( ) ; for ( QueueEnum queueEnum : queueEnums) { if ( code == queueEnum. getCode ( ) ) { return queueEnum. getChannel ( ) ; } } return "" ; }
}
创建自定义的队列消息pojo
import java. io. Serializable ;
import java. time. LocalDate ;
public class QueueMessage implements Serializable { private static final long serialVersionUID = 1L ; private QueueEnum queueEnum; private String activityId; private String taskDate; private String msgId; public String getActivityId ( ) { return activityId; } public String getTaskDate ( ) { return taskDate== null ? LocalDate . now ( ) . toString ( ) : taskDate; } public void setQueueEnum ( QueueEnum queueEnum) { this . queueEnum = queueEnum; } public void setActivityId ( String activityId) { this . activityId = activityId; } public void setTaskDate ( String taskDate) { this . taskDate = taskDate; } public String getMsgId ( ) { return msgId; } public void setMsgId ( String msgId) { this . msgId = msgId; } public QueueEnum getQueueEnum ( ) { return queueEnum; } public QueueMessage ( ) { } public QueueMessage ( QueueEnum queueEnum, String activityId) { this . queueEnum = queueEnum; this . activityId = activityId; } public QueueMessage ( QueueEnum queueEnum, String activityId, String msgId) { this . queueEnum = queueEnum; this . activityId = activityId; this . msgId= msgId; } @Override public String toString ( ) { final StringBuilder sb = new StringBuilder ( "QueueMessage{" ) ; sb. append ( "queueEnum=" ) . append ( queueEnum) ; sb. append ( ", activityId='" ) . append ( activityId) . append ( '\'' ) ; sb. append ( ", taskDate='" ) . append ( taskDate) . append ( '\'' ) ; sb. append ( ", mgsId='" ) . append ( msgId) . append ( '\'' ) ; sb. append ( '}' ) ; return sb. toString ( ) ; }
创建队列和延迟队列
import org. springframework. amqp. core. Binding ;
import org. springframework. amqp. core. BindingBuilder ;
import org. springframework. amqp. core. DirectExchange ;
import org. springframework. amqp. core. Queue ;
import org. springframework. beans. factory. annotation. Qualifier ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. core. annotation. Order ; import java. util. HashMap ; @Configuration
@Order ( 1 )
public class TestRabbitConfig { @Bean ( "testQueue" ) public Queue testQueue ( ) { return new Queue ( QueueEnum . TEST . getChannel ( ) ) ; } @Bean ( "testExchange" ) public DirectExchange testExchange ( ) { return new DirectExchange ( QueueEnum . TEST . getChannel ( ) ) ; } @Bean Binding bindingtestQueueToExchange ( @Qualifier ( "testQueue" ) Queue testQueue, @Qualifier ( "testExchange" ) DirectExchange testExchange) { return BindingBuilder . bind ( testQueue) . to ( testExchange) . with ( QueueEnum . TEST . getChannel ( ) ) ; } @Bean ( "delayTestQueue" ) public Queue delayTestQueue ( ) { HashMap < String , Object > arguments = new HashMap < > ( 4 ) ; arguments. put ( "x-max-length" , 500000 ) ; arguments. put ( "x-dead-letter-exchange" , QueueEnum . TEST . getChannel ( ) ) ; arguments. put ( "x-dead-letter-routing-key" , QueueEnum . TEST . getChannel ( ) ) ; return new Queue ( QueueEnum . DELAY_TEST . getChannel ( ) , true , false , false , arguments) ; } @Bean ( "delayTestExchange" ) public DirectExchange delayTestExchange ( ) { return new DirectExchange ( QueueEnum . DELAY_TEST . getChannel ( ) ) ; } @Bean Binding bindingDelayTestQueueToExchange ( @Qualifier ( "delayTestQueue" ) Queue delayTestQueue, @Qualifier ( "delayTestExchange" ) DirectExchange delayTestExchange) { return BindingBuilder . bind ( delayTestQueue) . to ( delayTestExchange) . with ( QueueEnum . DELAY_TEST . getChannel ( ) ) ; }
发送mq 消息
import com. alibaba. fastjson. JSON ;
import com. project. utils. StringUtil ;
import lombok. extern. slf4j. Slf4j ;
import org. springframework. amqp. AmqpException ;
import org. springframework. amqp. core. Message ;
import org. springframework. amqp. core. MessagePostProcessor ;
import org. springframework. amqp. rabbit. core. RabbitTemplate ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. stereotype. Component ; import java. time. Duration ;
import java. time. LocalDateTime ;
@Component
@Slf4j ( topic = "sendMqTask" )
public class SendMqMessage { @Autowired RabbitTemplate rabbitTemplate; public void sendTestMessage ( QueueMessage queueMessage) { String messageId = StringUtil . getUniqueId ( "mq-" ) ; queueMessage. setMsgId ( messageId) ; rabbitTemplate. convertAndSend ( queueMessage. getQueueEnum ( ) . getChannel ( ) , queueMessage. getQueueEnum ( ) . getChannel ( ) , queueMessage, new MessagePostProcessor ( ) { @Override public Message postProcessMessage ( Message message) throws AmqpException { long delayInMs = Duration . between ( LocalDateTime . now ( ) , DateTimeUtil . fromString2LocalDateTime ( queueMessage. getTaskDate ( ) ) ) . toMillis ( ) ; message. getMessageProperties ( ) . setExpiration ( delayInMs+ "" ) ; return message; } } ) ; }
}
接收mq 消息
import com. rabbitmq. client. Channel ;
import lombok. extern. slf4j. Slf4j ;
import org. apache. ibatis. exceptions. PersistenceException ;
import org. mybatis. spring. MyBatisSystemException ;
import org. springframework. amqp. core. Message ;
import org. springframework. amqp. rabbit. annotation. RabbitHandler ;
import org. springframework. amqp. rabbit. annotation. RabbitListener ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. beans. factory. annotation. Value ;
import org. springframework. core. annotation. Order ;
import org. springframework. jdbc. CannotGetJdbcConnectionException ;
import org. springframework. stereotype. Component ; import java. io. IOException ;
import java. time. LocalDateTime ;
import java. util. Arrays ;
import java. util. concurrent. TimeUnit ;
@Component
@Order ( 2 )
@Slf4j ( topic = "receiveMqTask" )
public class ReceiveMqMessage { @Value ( "${spring.profiles.active}" ) private String active; private boolean isProdEnv ( ) { return "prod" . equals ( active) ; } private boolean isTestEnv ( ) { return "test" . equals ( active) ; } @RabbitListener ( queues = ApiConstants . TEST ) @RabbitHandler public void test ( QueueMessage queueMessage, Message message, Channel channel) { String env= isProdEnv ( ) ? "正式" : isTestEnv ( ) ? "测试" : active; log. info ( "====={}== test Mq Message={}" , env, queueMessage) ; long deliveryTag = message. getMessageProperties ( ) . getDeliveryTag ( ) ; try { System . out. println ( "发送时间是:" + queueMessage. getTaskDate ( ) ) ; System . out. println ( "当前时间是:" + LocalDateTime . now ( ) . toLocalDate ( ) + " " + LocalDateTime . now ( ) . toLocalTime ( ) ) ; try { channel. basicAck ( deliveryTag, false ) ; } catch ( IOException e) { log. error ( "MQ手动ACK错误: " , e) ; } } catch ( Exception e) { log. error ( "test queue 失败" ) ; } }
}
DateTimeUtil
public class DateTimeUtil { public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss" ; public static String getLocalDateTime ( LocalDateTime localDateTime) { DateTimeFormatter df = DateTimeFormatter . ofPattern ( DateTimeUtil . FORMAT_DATETIME ) ; if ( localDateTime != null ) { String localTime = df. format ( localDateTime) ; return localTime; } return null ; }
}
测试
@RestController
@RequestMapping ( value = "/test" )
public class TestController { @Autowired private SendMqMessage sendMqMessage; @RequestMapping ( value = "/testMqMessage" , method = RequestMethod . GET ) public ResultEntity testMqMessage ( @RequestParam ( value = "second" , defaultValue = "20" , required = false ) Long second) { QueueMessage queueMessage = new QueueMessage ( QueueEnum . DELAY_TEST , "123" ) ; queueMessage. setTaskDate ( DateTimeUtil . getLocalDateTime ( LocalDateTime . now ( ) . plusSeconds ( second) ) ) ; sendMqMessage. sendTestMessage ( queueMessage) ; return "发送成功" ; }
}
注意点