环境配置
RocketMQ版本:5.2.0
RocketMQ SDK版本:5.2.0
引入依赖
implementation 'org.apache.rocketmq:rocketmq-client:5.2.0'
消息生产
消息的种类分成四种,普通消息、顺序消息、事务消息和延时消息,发生消息的方式也分为同步发送、异步发送、单向发送 三种。
1、普通消息
同步发送
同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。
同步发送消息代码:
public class SyncSendProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {int count = 100;// 创建一个producer,参数为生产者组名称DefaultMQProducer producer = new DefaultMQProducer("apg");// 指定nameServer地址producer.setNamesrvAddr("192.168.131.130:9876");// 设置当发送失败时重试发送的次数,默认为2次producer.setRetryTimesWhenSendFailed(3);// 设置发送超时时限为5s,默认3sproducer.setSendMsgTimeout(5000);// 开启生产者producer.start();// 生产并发送100条消息for (int i=0;i<count;i++){byte[] body = ("Hi," + i).getBytes();Message msg = new Message("someTopic", "someTag", body);// 为消息指定keymsg.setKeys("key_"+i);// 发送消息SendResult sendResult = producer.send(msg);System.out.println(sendResult);}// 关闭生产者producer.shutdown();}
}
发送结果:
打印日志:
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038A005D, offsetMsgId=0A02088900002A9F0000000000011708, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=3], queueOffset=73]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038C005E, offsetMsgId=0A02088900002A9F00000000000117FC, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=0], queueOffset=73]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038E005F, offsetMsgId=0A02088900002A9F00000000000118F0, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=1], queueOffset=73]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038F0060, offsetMsgId=0A02088900002A9F00000000000119E4, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=2], queueOffset=74]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB03910061, offsetMsgId=0A02088900002A9F0000000000011AD8, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=3], queueOffset=74]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB03920062, offsetMsgId=0A02088900002A9F0000000000011BCC, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=0], queueOffset=74]
可以看出来一个topic默认在一个broker上分配四个队列,每次投递消息是以轮询的方式每个队列投递一条消息。
对于返回结果中的sendStatus代表发送状态,在源码中有一下三个值:
public enum SendStatus {SEND_OK, FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE;private SendStatus() {}
}
SEND_OK:发送成功
FLUSH_DISK_TIMEOUT:刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出现这种异常状态,异步刷盘不会出现 。
FLUSH_SLAVE_TIMEOUT:Slave同步超时。当Broker集群设置的Master-Slave的复制方式为同步复制时才可能出现这种异常状态,异步复制不会出现 。
SLAVE_NOT_AVAILABLE:没有可用的Slave。当Broker集群设置为Master-Slave的 复制方式为同步复制时才可能出现这种异常状态,异步复制不会出现。
异步发送
异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
代码:
public class AsyncSendProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {int messageCount = 100;// 创建一个producer,参数为生产者组名称DefaultMQProducer producer = new DefaultMQProducer("apg");// 指定nameServer地址producer.setNamesrvAddr("192.168.131.130:9876");// 指定异步发送失败后不进行重试发送producer.setRetryTimesWhenSendAsyncFailed(0);// 指定新创建的Topic的Queue数量为2,默认为4producer.setDefaultTopicQueueNums(2);// 开启生产者producer.start();// 生产并发送100条消息//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i=0;i<messageCount;i++){try {final int index = i;byte[] body = ("Hi," + i).getBytes();Message msg = new Message("AsyncProductTopic", "AsyncTopic", body);producer.send(msg, new SendCallback() {//异步发送的回调@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});System.out.println("消息发送完成");}catch (Exception e){e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);// 关闭生产者producer.shutdown();}
}
发送结果
打印日志
6 OK 7F0000014DB473D16E9335B47CD60008
2 OK 7F0000014DB473D16E9335B47CCA0000
12 OK 7F0000014DB473D16E9335B47CD6000B
4 OK 7F0000014DB473D16E9335B47CCA0002
7 OK 7F0000014DB473D16E9335B47CD60007
单向发送
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。