1.引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
2.配置服务器地址
# RocketMQ 配置
rocketmq.name-server=192.168.11.99:9876
# 必须指定生产者组
rocketmq.producer.group=group01
# 消息发送超时时长,默认 3s(单位:毫秒)
rocketmq.producer.send-message-timeout=3000
# 同步发送消息失败重试次数,默认 2
rocketmq.producer.retry-times-when-send-failed=3
# 异步发送消息失败重试次数,默认 2
rocketmq.producer.retry-times-when-send-async-failed=3
3.创建消费者
package com. by. rocketmq. consumer ; import lombok. extern. slf4j. Slf4j ;
import org. apache. rocketmq. client. consumer. DefaultMQPushConsumer ;
import org. apache. rocketmq. spring. annotation. RocketMQMessageListener ;
import org. apache. rocketmq. spring. core. RocketMQListener ;
import org. apache. rocketmq. spring. core. RocketMQPushConsumerLifecycleListener ;
import org.springframework.stereotype.Component;

/**
 * Listener bean subscribed to {@code topic_01} as consumer group {@code group_205}.
 *
 * <p>Message bodies arrive as plain strings and are written to the log. The class also
 * implements {@code RocketMQPushConsumerLifecycleListener} so that it can adjust the
 * underlying {@code DefaultMQPushConsumer} in {@link #prepareStart}.
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_01", consumerGroup = "group_205")
public class RocketMqConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    /**
     * Logs the received message body.
     *
     * @param message the message payload delivered by the listener container
     */
    @Override
    public void onMessage(String message) {
        // Pass the payload as a logging argument so the {} placeholder is actually
        // substituted; concatenating it left a literal "{}" in every log line.
        log.info("消费者1:{}", message);
    }

    /**
     * Applies consumer settings through the lifecycle callback: the maximum number of
     * re-consume attempts is set to 2 and the pull batch size to 250.
     *
     * @param consumer the push consumer backing this listener
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setMaxReconsumeTimes(2);
        consumer.setPullBatchSize(250);
    }
}
4.创建生产者
package com. by. rocketmq. provider ; import com. by. moder. RegisterOk ;
import org. apache. rocketmq. spring. core. RocketMQTemplate ;
import org. springframework. beans. factory. annotation. Autowired ; import org. springframework. messaging. Message ;
import org. springframework. messaging. support. MessageBuilder ;
import org.springframework.stereotype.Component;

/**
 * Producer-side helper that publishes string payloads to {@code topic_01}
 * through the injected {@code RocketMQTemplate}.
 */
@Component
public class RocketMqProvider {

    /** Destination topic for every message sent by this bean. */
    private static final String TOPIC = "topic_01";

    /** Send timeout handed to {@code syncSend}, in milliseconds. */
    private static final long SEND_TIMEOUT_MS = 3000;

    /**
     * Fourth argument handed to {@code syncSend}.
     *
     * <p>NOTE(review): in the four-argument {@code syncSend} overload this position is the
     * message delay level, not a retry count. The value mirrors the retry setting in the
     * properties file, so it may have been meant as a retry count — confirm the intent.
     */
    private static final int DELAY_LEVEL = 3;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Wraps {@code msg} in a Spring {@code Message} and sends it synchronously.
     *
     * @param msg the payload to publish
     */
    public void send(String msg) {
        Message<String> outbound = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend(TOPIC, outbound, SEND_TIMEOUT_MS, DELAY_LEVEL);
    }
}
测试
package com. by ; import com. by. moder. RegisterOk ;
import com. by. rocketmq. provider. RocketMqProvider ;
import lombok. extern. slf4j. Slf4j ;
import org. junit. jupiter. api. Test ;
import org. springframework. beans. factory. annotation. Autowired ;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

/**
 * Spring Boot integration tests that push messages through {@code RocketMqProvider}.
 *
 * <p>Each test ends by blocking on standard input; presumably this keeps the
 * application context alive long enough for the listener to receive the messages
 * (TODO confirm).
 */
@Slf4j
@SpringBootTest
class RocketMqTests {

    @Autowired
    RocketMqProvider rocketMqProvider;

    /** Sends ten numbered messages, logging after each one, then waits for input. */
    @Test
    void Test1() throws IOException {
        int seq = 1;
        while (seq <= 10) {
            sendAndLog("你好++" + seq + "++", seq);
            seq++;
        }
        blockOnStdin();
    }

    /** Sends a single message, logs it as number 1, then waits for input. */
    @Test
    void Test2() throws IOException {
        sendAndLog("你好++++", 1);
        blockOnStdin();
    }

    /** Publishes {@code body} and logs the success line tagged with {@code seq}. */
    private void sendAndLog(String body, int seq) {
        rocketMqProvider.send(body);
        log.info("发送成功:" + seq);
    }

    /** Blocks the test thread until a byte is available on standard input. */
    private static void blockOnStdin() throws IOException {
        System.in.read();
    }
}
死信队列
/**
 * Listener on {@code topic_01} used to demonstrate the dead-letter queue.
 *
 * <p>Every delivery is printed and logged, after which the handler always throws, so no
 * message is ever consumed successfully. The consumer group name is resolved from the
 * {@code rocketmq.consumer.group} property.
 */
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "topic_01")
public class Consumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    /**
     * Echoes the payload to stdout and the log, then fails on purpose.
     *
     * @param message the message payload
     * @throws RuntimeException always; presumably so that the broker retries the message
     *     and eventually routes it to the dead-letter queue once the re-consume limit set
     *     in {@link #prepareStart} is exhausted (confirm against broker behaviour)
     */
    @Override
    public void onMessage(String message) {
        final String line = "Received message: " + message;
        System.out.println(line);
        log.info(line);
        // Deliberate failure: this listener exists to exercise the retry path.
        throw new RuntimeException("test");
    }

    /**
     * Configures the underlying push consumer: at most 2 re-consume attempts and a pull
     * batch size of 16.
     *
     * @param consumer the push consumer backing this listener
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setMaxReconsumeTimes(2);
        consumer.setPullBatchSize(16);
    }
}