1 下载zip 安装包
rocketmq-all-5.1.4-bin-release.zip
2 修改启动配置,防止默认内存配置过高
runserver.sh/runbroker.sh/tools.sh
3 启动namesrv
nohup sh bin/mqnamesrv >>namesrv.log &
4 启动broker+proxy
单点模式:
nohup sh bin/mqbroker -c conf/brock.conf -pc conf/rmq-proxy.json --enable-proxy >>broker.log&
集群模式:2m+2s-async 多主多从,异步复制
A1机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a.properties -pc conf/rmq-proxy-a.json --enable-proxy >>broker-a.log&
B2机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b.properties -pc conf/rmq-proxy-b.json --enable-proxy >>broker-b.log&
B3机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a-s.properties -pc conf/rmq-proxy-a-s.json --enable-proxy >>broker-a-s.log&
A4机器>nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b-s.properties -pc conf/rmq-proxy-b-s.json --enable-proxy >>broker-b-s.log&
5 配置控制面板 rocketmq-dashboard
nohup java -jar -Xms512m -Xmx1G rocketmq-dashboard-1.0.0.jar >> rocketmq-dashboard.log &
注意事项:
- 所有ip配置均配置本机外网ip
- 端口不重复,安全组开启相应端口
- -n 多主机情况,需要增加引号或双引号
- broker 监听端口,不重复
- proxy 监听端口,不重复
6 集成到spring cloud 具体服务中:
0 通过rocketmq-dashboard 增加topic
1 增加配置
#NameServer地址
rocketmq.name-server: 192.168.1.1:9876
rocketmq.producer.group: sale-producer-group
2 增加依赖
implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1'
3 消息生产者
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;import javax.annotation.Resource;@RestController
@AllArgsConstructor
public class SaleMQProducer {private final Logger LOGGER = LoggerFactory.getLogger(SaleMQProducer.class);@Resourceprivate RocketMQTemplate rocketMQTemplate;@RequestMapping(value = "/api/mq/create", method = RequestMethod.POST)Object create(@RequestBody JSONObject req) {return Mono.defer(() -> {String topic = req.getString("topic");String tag = req.getString("tag");String body = req.getString("body");LOGGER.info("topic={},tag={},body={}", topic, tag, body);//支持带tag发送String dest = String.format("%s:%s", topic, tag);rocketMQTemplate.convertAndSend(dest, body);return Mono.just(true);});}
}
4 消息消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "sale-consumer-group", topic = "TEST_FIRST")
public class SaleMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : " + message);}
}
5 消息消费者,增加tag匹配(默认* 全量匹配)
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "sale-consumer-group-tag-a", topic = "TEST_FIRST", selectorExpression = "TagA")
public class SaleMQConsumerTagA implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : " + message);}
}