<本文已参与 RocketMQ Summit 优秀案例征文活动,点此了解详情>
MQ简介
MQ(Message Queue)是一种跨进程的通信机制,用于消息传递。通俗点说,就是一个先进先出的数据结构。
MQ应用场景
异步解耦
很多场景不使用MQ会产生各个应用见紧密耦合在在一起,其实我们要遵循的原则就是高内聚低耦合,通过上图我们就可以看到,消息生产者,不管消息消费者状态如何,生产好的消息就直接投递到MQ中,消息消费者也是同样,不管消息生产者如何,只取MQ中的消息进行处理。这是解耦场景,还有就是异步,我们拿最简单的注册场景举例子。
这样如果正常情况我们需要150ms,但是正常的话,用户不需要等待邮件系统,以及短信系统的完成,一旦用户在注册系统完成了以后,就注册成功了,这样的话我们就可以使用异步,
这样通过MQ我们就实现了异步的操作。
流量削峰
流量削峰也是消息队列MQ的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游
的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用
和下游通知系统之间加入消息队列MQ。比如平常用户的请求我们会直接访问数据库,在大量用户过来的时候,这样的话我们会对数据库照常比较大的压力。在这里我们增加一个消息队列,这样的话不管你请求来多少,我先存入消息队列,然后我再让系统慢慢的处理你的请求,这样很好的减缓了数据库的访问压力。
RocketMQ介绍
阿里巴巴的MQ中间件,由java语言开发, 性能非常好,能够撑住双十一的大流量,而且使用起来很简单。
我们开始吧
1.首先就是环境搭建。我们将在Linux操作系统搭建RocketMQ的服务端。
首先就是下载RocketMQ了。
Downloading the Apache RocketMQ Releases - Apache RocketMQ
然后就是登录ssh连接相关功能了。
另一个环境就是Java环境了。
可以参考另外一个链接Linux搭建Java环境-阿里云开发者社区
在此就不过多赘述了。
然后我们就是下载好了所需要的一些安装包。
我们把下载的rocketmq的文件上传
然后使用解压缩命令进行解压。
unzip rocketmq-all-4.9.2-bin-release.zip
首先修改一下目录名称
然后通过cd 命令进去rocketmq目录下的bin目录。
我们首先改一些脚本的参数,由于本服务器比较小,所以需要修改,正常情况,修改成真实的1/2就可以了。
通过vim命令先修改runserver.sh
修改根据你的大小。我就修改成了256m。同样runbrocker.sh也需要修改。
然后在bin使用命令启动nohup ./mqnamesrv &
然后通过这个命令可以查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
然后如果是云服务器的话,我们需要配置一个地方,进入上一级目录
cd ..
然后进入conf目录。
通过vim brocker.conf
添加下面的代码
brokerIP1和brokerIP2默认获取本地ip地址,在云服务器上会获取内网ip地址,因此必须显式设置
namesrvAddr=150.158.31.224:9876
brokerIP1=150.158.31.224
将namesrvAddr设置在configfile中
ip一定要换上你服务器外网的ip
然后我们需要启动mqbrocker
回到rockermq根目录(特别重要要不然运行不成功)
nohup sh bin/mqbroker -c conf/broker.conf &
查看启动日志这样就可以了。
tail -f ~/logs/rocketmqlogs/broker.log
然后这样我们的服务端就可以配置完成了。
这样我们是没有可视化界面的。
当然RocketMQ有可视化界面的。
https://github.com/apache/rocketmq-externals/tags
下载后解压出来。
然后配置一下application.properties
然后配置端口以及刚才我们配置的ip以及端口。
然后返回根目录执行打包命令。
mvn package -Dmaven.test.skip=true
然后就可以在\target目录看见打出来的包。
然后使用java -jar rocketmq-console-ng-1.0.0.jar
刚刚打出来的包进行运行
然后就是可视化界面了。
下面就是实战
我们结合java进行实战消息发送。
消息发送步骤:
1.创建消息生产者,指定生产者所属的组名
2.指定Nameserver地址
3.启动生产者
4.创建消息对象,指定主题、标签和消息体
5.发送消息
6.关闭生产者
然后我们新建test类,进行消息投递测试。
public class RocketMQSendMessageTest {
//发送消息
public static void main(String[] arg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//1.创建消息生产者,指定生产者所属的组名DefaultMQProducer producer=new DefaultMQProducer("myproducer-group");// 2.指定Nameserver地址// producer.setNamesrvAddr("150.158.31.224:9876");producer.setNamesrvAddr("122.9.161.37:9876");// 3.启动生产者producer.start();// 4.创建消息对象,指定主题、标签和消息体Message message=new Message("myTopic","myTag",("Test RocketMQ Message").getBytes());// 5.发送消息SendResult result=producer.send(message,100000);System.out.println(result);//6.关闭生产者producer.shutdown();
}
}
然后就可以在可视化界面看到我们的消息了。
接受消息步骤:
1.创建消息消费者,指定消费者所属的组名
2.指定Nameserver地址
3.指定消费者订阅的主题和标签
4.设置回调函数,编写处理消息的方法
5.启动消息消费者
然后我们新建test类,进行消息消费测试。
public class RocketMQReceiveMessageTest {
public static void main(String[] args) throws MQClientException {// 1.创建消息消费者,指定消费者所属的组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");// 2.指定Nameserver地址consumer.setNamesrvAddr("122.9.161.37:9876");// 3.指定消费者订阅的主题和标签consumer.subscribe("myTopic","*");// 4.设置回调函数,编写处理消息的方法consumer.registerMessageListener(new MessageListenerConcurrently(){//获取接收到的消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {//消费逻辑System.out.println("接收到消息:"+list);//返回消息成功信息。return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动消息消费者consumer.start();System.out.println("消费者启动成功了");
}
}
这样就完成了我们消费者与生产者相关的操作,当然在实际应用中还是有很多需要调整的地方,这样就基本上完成了,RocketMQ的入门操作。
关于RocketMQ的项目实战后期会在这个项目上不断添加,喜欢的请点个start~
项目源码参考一下分支220310_xgc_useRocketMQ
Gitee:springcloud-alibaba: 一个保姆级教学SpringCloud-Alibaba全栈教程
GitHub:GitHub - CoderXGC/springcloud-alibaba: 一个保姆级教学SpringCloud-Alibaba全栈教程