下载地址:下载 | RocketMQ
一. RocketMQ部署
系统:Windows7;环境:JDK1.8、Maven3.3+、Git;
下载 | RocketMQ
1.1 下载,
进入RocketMQ官网http://rocketmq.apache.org/docs/quick-start/,
这里以编写时的最新版本为例,选择Binary版本下载,该版本为编译后的版本,可以直接使用。
Source下载获取的是源码,需要经过编译后才可以使用,不过比较方便自行扩展,有扩展需求可以使用该版本自行编译。
1.2 下载完后解压到自定义目录,MQ解压路径\rocketmq-all-4.6.0-bin-release;(Windows10系统解压路径不要出现空格)
1.3 配置环境变量,变量名:ROCKETMQ_HOME 变量值:MQ解压路径\MQ文件夹名;path后追加加;%ROCKETMQ_HOME%\bin
1.4 启动NameServer,
cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。
或者直接使用mqnamesrv.cmd
在当前命令行中启动。
1.5 启动Broker,
cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。启动闪退可参看:windows下安装rocketmq采坑全记录,
windows下安装rocketmq采坑全记录-CSDN博客
删除C:\Users\”当前系统用户名”\store下的所有文件。
启动成功:
快捷启动:创建两个bat文件
MqNameServe.bat文件内容如下:
start mqnamesrv.cmd
MqBroker.bat文件内容如下:
start mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
注意:启动成功后,cmd窗口都不可以关闭
二. RocketMQ代码测试
product
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class Test {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer order = new DefaultMQProducer("order");order.setNamesrvAddr("localhost:9876");order.setSendMsgTimeout(60000);order.start();Message message = new Message("sanyouTopic", "myTag", ("test").getBytes());SendResult result = order.send(message);System.out.println(result);order.shutdown();}
}
consumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Test2 {public static void main(String[] args) throws MQClientException {// 通过push模式消费消息,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");// 指定NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅这个topic下的所有的消息consumer.subscribe("sanyouTopic", "*");// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.printf("Consumer Started.%n");}
}
Test发送一条消息,Test2随时就监听