RocketMQ版本4.9.4
pom
<!-- Apache RocketMQ Spring Boot starter, pinned to 2.2.2. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
yml
# RocketMQ client settings (rocketmq-spring-boot-starter).
rocketmq:
  producer:
    # Number of retries attempted when an asynchronous send fails.
    retry-times-when-send-async-failed: 2
    group: a-group
  # Address of the RocketMQ name server (host:port).
  name-server: 192.168.0.211:9876
  consumer:
    # NOTE(review): the listener annotation declares consumerGroup "consumer-group"
    # (hyphen), which differs from this value (underscore) - confirm which is intended.
    group: consumer_group
Listener
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "Test_Topic",selectorExpression="TagA", suspendCurrentQueueTimeMillis = 2000, consumeMode = ConsumeMode.ORDERLY)
public class TestMessageQueueListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {String body = new String(messageExt.getBody());System.out.println("key "+ messageExt.getKeys() +"消费者收到:" + body);Alarm alarm = JSONUtil.toBean(body, Alarm.class);if(alarm.getId() == 2){int a = 1/0;}System.out.println("next");}
}
Producer
@Configuration
@EnableScheduling
@Slf4j
public class AlarmRuleSchedule {@ResourceRocketMQTemplate rocketTemplate;@Scheduled(fixedRate = 60 * 1000)private void configureTasks() {for (int i = 1 ; i <11 ;i++){Alarm alarm = new Alarm(i, "test_topic" + i);Message<Alarm> message = MessageBuilder.withPayload(alarm).setHeader(RocketMQHeaders.KEYS, i).build();rocketTemplate.asyncSendOrderly("Test_Topic:TagA", message, String.valueOf(i), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(alarm.toString() + sendResult.getMessageQueue() + "send message success " + sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error(alarm.toString() + "send onException " + throwable.getLocalizedMessage());}});try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
}