1. 创建RocketMq文件目录
mkdir rocketmq
mkdir -p rocketmq/brokerconf rocketmq/logs rocketmq/store
2.创建broker.conf配置文件
vim brokerconf/broker.conf
# 集群名称
brokerClusterName = DefaultCluster
# broker 名字
brokerName = broker-a
# 0表示master,>0 表示slave
brokerId = 0
# 删除文件的时间点
deleteWhen = 04
# 文件保留时间
fileReservedTime = 48
# Broker 的角色
# # - ASYNC_MASTER 异步复制Master
# # - SYNC_MASTER 同步双写Master
# # - SLAVE
brokerRole = ASYNC_MASTER
# 刷盘方式
# # - ASYNC_FLUSH 异步刷盘
# # - SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH# nameserver地址
namesrvAddr=60.204.149.224:9876
brokerIP1=60.204.149.224
3.创建docker-compose.yml文件
version: '3.8'
services:namesrv:image: apache/rocketmq:5.3.1container_name: rmqnamesrvports:- 9876:9876volumes:- /root/docker/rocketmq/logs:/opt/logs- /root/docker/rocketmq//store:/opt/storenetworks:- rocketmqcommand: sh mqnamesrvbroker:image: apache/rocketmq:5.3.1container_name: rmqbrokerports:- 10909:10909- 10911:10911- 10912:10912volumes:- /root/docker/rocketmq/logs:/opt/logs- /root/docker/rocketmq/store:/opt/store- /root/docker/rocketmq/brokerconf/broker.conf:/etc/rocketmq/broker.confenvironment:- NAMESRV_ADDR=rmqnamesrv:9876- JAVA_OPTS=" -Duser.home=/opt"depends_on:- namesrvnetworks:- rocketmqcommand: sh mqbroker -c /etc/rocketmq/broker.confproxy:image: apache/rocketmq:5.3.1container_name: rmqproxynetworks:- rocketmqdepends_on:- broker- namesrvports:- 8080:8080- 8081:8081restart: on-failureenvironment:- NAMESRV_ADDR=rmqnamesrv:9876command: sh mqproxy
networks:rocketmq:driver: bridge
4.启动rocketmq
docker-compose -f docker-compose.yml up -d
5.关闭rocketmq
docker-compose down
6.测试
6.1 消息生产者
import static com.doudou.mq.MqConfig.NAME_SRV_ADDR;
import static com.doudou.mq.MqConfig.topic;import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");producer.setSendMsgTimeout(10000);producer.setNamesrvAddr(NAME_SRV_ADDR);producer.start();for (int i = 0; i < 10; i++) {try {Message message = new Message(topic, ("Hello World" + i).getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);System.out.printf("%s%n", result);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
6.2 消息消费者
import static com.doudou.mq.MqConfig.NAME_SRV_ADDR;
import static com.doudou.mq.MqConfig.topic;import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");consumer.setNamesrvAddr(NAME_SRV_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started");}}
6.3 依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version></dependency>