目录
生产者
发送消息固定步骤
发送模式
1. 单向发送
2. 同步发送
3. 异步发送
生产消息完整代码
消费者
消费消息固定步骤
简单消费代码示例
消息模型
广播消息
顺序消息
延迟消息
批量消息
事务消息
生产者
发送消息固定步骤
1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址, 也可配置环境变量NAMESRV_ADDR
3.启动producer。 可以认为这是消息生产者与服务端建立连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer,释放资源
发送模式
1. 单向发送
// 发送单向消息producer.sendOneway(buildOneMessage());
2. 同步发送
// 同步发送消息SendResult sendResult = producer.send(buildOneMessage());System.out.println("同步发送消息结果:" + sendResult);
3. 异步发送
// 异步发送消息, 发送完回调, 注意:回调完之前, producer不能关闭producer.send(buildOneMessage(), new SendCallback() {// 成功回调@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送消息结果:" + sendResult);}// broker处理失败回调, 超时回调, 进入onException也可能发送下游成功, 做好幂等性@Overridepublic void onException(Throwable throwable) {System.out.println("异步发送消息出现异常:" + throwable.getMessage());}});
生产消息完整代码
public class ProducerTest {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.6.128:9876");// 启动生产者producer.start();// 发送单向消息producer.sendOneway(buildOneMessage());System.out.println("单向发送消息, 不知道结果");// 同步发送消息SendResult sendResult = producer.send(buildOneMessage());System.out.println("同步发送消息结果:" + sendResult);// 异步发送消息, 发送完回调, 注意:回调完之前, producer不能关闭producer.send(buildOneMessage(), new SendCallback() {// 成功回调@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送消息结果:" + sendResult);}// broker处理失败回调, 超时回调, 进入onException也可能发送下游成功, 做好幂等性@Overridepublic void onException(Throwable throwable) {System.out.println("异步发送消息出现异常:" + throwable.getMessage());}});
// producer.shutdown();}private static Message buildOneMessage() {return new Message("kk_test_topic", "tagA", ("hello MQ" + new Random().nextInt(100)).getBytes());}
}
消费者
消费消息固定步骤
1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会一直挂起,持续处理消息。
简单消费代码示例
public class SimpleConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("kk_consumer_group1");consumer.setNamesrvAddr("192.168.6.128:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("kk_test_topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("收到消息:" + new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("consumer start ...");}
}