文章中的rabbitmq使用的是rocketmq-all-5.1.3-bin-release版本,需要安装包的可自行下载
RockerMQ启动停止命令
启动命令
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
tail -f ~/logs/rocketmqlogs/proxy.log
停止命令
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
查看集群状态
sh mqadmin clusterList -n 127.0.0.1:9876
创建topic
sh mqadmin updateTopic -n 127.0.0.1:9876 rocket_test
查看所有topic信息
sh mqadmin topicList -n 127.0.0.1:9876
sh mqadmin topicList -n 127.0.0.1:9876 -c
查看 Topic 路由信息
sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest
发送测试消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Java代码收发消息
Producer
package com.rocket.demo;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.HashMap;
import java.util.Map;
public class RocketProducerDemo {
private final static String nameServer = "127.0.0.1:9876";
private final static String producerGroup = "my_group2";
// debezium-mysql-source-topic topic-test
private final static String topic = "TopicTest";
public static void main(String[] args) {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// DefaultMQProducer producer = new DefaultMQProducer();
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
for (int i = 0; i < 100; i++) {
Map<String, String> data = new HashMap();
data.put("id", i+"");
data.put("name", i+","+System.currentTimeMillis());
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Consumer
package com.rocket.demo;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");
consumer.setNamesrvAddr("localhost:9876");
// debezium-mysql-source-topic topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source
consumer.subscribe("TopicTest", "*"); // 订阅主题和标签,* 表示订阅所有标签
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.println("Received message: " + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started");
}
}
常见问题
service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down
错误原因:博主测试的服务器磁盘使用率到0.96了,rocketmq不允许磁盘超过0.9,清理下磁盘数据即可