RabbitMQ是如何运转的?

前言

  之前已经介绍了RabbitMQ交换机模型的相关简单概念,都是作为此篇的基础铺垫,如果对此篇不懂的可以先看我上一篇的介绍认识RabbitMQ交换机模型,或者联系评论,分享《RabbitMQ实战指南》电子书给大家,里面虽然有些许错误,但总体还是很棒的一本书!

  本文主要介绍RabbitMQ的消息是怎么产生和通过它是怎么接收消息的(RabbitMQ如何运转)、Connection和Channel概念、RabbitMQ的简单部署、Java代码简单实践三个部分

  

 


 

 

一、RabbitMQ的运转流程

  1、生产者流程  

      1) 生产者连接到RabbitMQ Broker,建立Connection,开启信道Channel(Connection与Channel概念下面会介绍)

      2) 生产者声明一个交换器,设置相关属性。

      3) 生产者声明一个队列并设置相关属性

      4) 生产者通过路由键将交换器和队列绑定起来

      5) 生产者发送消息到RabbitMQ Broker,包括路由键、交换器信息等

      6) 相应的交换器根据路由键查找匹配的队列

      7) 如果找到则消息存入相应队列中

      8) 如果没找到则根据配置的属性丢弃或者回退给生产者

      9) 关闭信道

      10)关闭连接

  2、消费者流程

      1) 消费者连接到RabbitMQ Broker,建立Connection,开启Channel

      2) 消费者向RabbitMQ Broker请求消费相应队列中消息,可能会设置相应的回调函数。

      3) 等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息。

      4) 消费者确认ack接收到的消息。

      5) RabbitMQ从队列中删除相应已经被确认的消息。

      6) 关闭信道。

      7) 关闭连接

 

    其实,最主要最不好理解的也就是Connection与Channel这两个概念,如果只是光看这些流程会相当不理解,为什么先建立Connection再建立Channel,这两个又是什么区别?所以再往下就是介绍Connection与Channel了!

 

二、Connection与Channel概念

  

  1、 Connection:实际就是一条TCP连接,TCP一旦建立起来,客户端紧接着可以创建AMQP信道。

  2、 Channel:每个Channel都有唯一的ID,都是建立在Connection上的虚拟连接,RabbitMQ处理每条AMQP指令都是通过信道完成的

       

                                (结合两张图,更好理解Connection与Channel两个概念)

  

  3、单TCP复用连接与多信道的优势

      1)为什么TCP连接只有一条,而每个生产者都会创建一条唯一的信道呢?想象下,实际情况,会有很多的生产者生产消息,多个消费者消费消息,那么就不得不创建多个线程,建立多个TCP连接。多个TCP连接的建立必然会对操作系统性能消耗较高,也不方便管理。从而选择一种类似于NIO(非阻塞I/O, Non-blocking I/O)技术是很有必要的,多信道的在TCP基础上的建立就是这么实现的。

      2)每个线程都有自己的一个信道,复用了Connection的TCP连接,信道之间相互独立,相互保持神秘,节约TCP连接资源,当然本身信道的流量很大的话,也可以创建多个适当的Connection的TCP连接,需要根据具体业务情况制定

   

三、RabbitMQ部署

  主要以Linux CentOS 7举例部署,

1、准备Erlang环境

安装运行RabbitMQ之前,先安装Erlang环境,因为RabbitMQ是relang语言写的。下载http://www.erlang.org/downloads得到otp_src_21.2.tar.gz包

 1)解压到/opt/erlang目录下,./configure配置生成make make install

[root@hidden]# tar xvf otp_src_21.2.tar.gz 
[root@hidden]# cd otp_src_21.2 
[root@hidden otp_src_21.2]#./configure --prefix=/opt/er1ang

 如果安装过程出现"No curses library functions found",则需要安装ncurses

[root@hidden otp_src_21.2]# yum install ncurses-devel

 2)编译安装make & make install

[root@hidden otp_src_21.2]# make & make install

 3)修改/etc/profile文件,增加如下语句

ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME

 4)执行/etc/profile配置文件

[root@hidden otp_src_21.2]# source /etc/profile

 5)测试是否安装成功

[root@hidden otp_src_21.2]#erl

如果出现如下语句,则说明安装成功

Erlang/OTP 21 [erts-10.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]Eshell V10.2  (abort with ^G)
1>     

 2、安装RabbiMQ

 RabbitMQ安装比Erlang安装简单很多,下载generic压缩包:rabbitmq-server-generic-unix-3.7.11.tar.xz

 1)解压压缩到到Erlang同目录/opt下

[root@hidden]# tar zvxf rabbitmq-server-generic-unix-3.7.11.tar.xz
[root@hidden]# cd /opt
[root@hidden]# mv rabbitmq-server-generic-unix-3.7.11.tar.xz rabbitmq

 2)修改/etc/profile文件,增加如下语句

export PATH=$PATH:/opt/rabbitmq/sbin
export RABBITMQ_HOME=/opt/rabbitmq

 3)执行profile文件,使其生效

[root@hidden otp_src_21.2]# source /etc/profile

 4)修改运行为守护进程模式

[root@hidden otp_src_21.2]# rabbitmq-server -detached

 5)测试是否安装成功,出现Status of node rabbit@.........如下语句则说明安装成功

[root@hidden rabbitmq]# rabbitmqctl status

Status of node rabbit@iz2ze49fh77zgs1rzxo0l7z ...
[{pid,11462},
{running_applications,
[{rabbit,"RabbitMQ","3.7.11"},
{mnesia,"MNESIA CXC 138 12","4.15.5"},
{os_mon,"CPO CXC 138 46","2.4.7"},
{sysmon_handler,"Rate-limiting system_monitor event handler","1.1.0"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.11"},
{ranch,"Socket acceptor pool for TCP protocols.","1.7.1"},
{ssl,"Erlang/OTP SSL application","9.1"},
{public_key,"Public key infrastructure","1.6.4"},
{asn1,"The Erlang ASN1 compiler version 5.0.8","5.0.8"},
{inets,"INETS CXC 138 49","7.0.3"},
{recon,"Diagnostic tools for production use","2.3.6"},
{xmerl,"XML parser","1.3.18"},
{jsx,"a streaming, evented json parsing toolkit","2.9.0"},

..........

 3、新增用户与授权

RabbitMQ默认情况下用户和密码都为“guest”,但只能通过默认的本地网络localhost访问,网络访问受限,所以需要再单独新增用户授予权限

 1)新增root用户

 新增用户名为root,密码为root

[root@hidden rabbitmq]# rabbitmqctl add_user root root

 2)授权root用户到默认vhost可配置、可读、可写权限

[root@hidden rabbitmq]# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

 3)设置root为管理员角色

[root@hidden rabbitmq]# rabbitmqctl set_user_tags root administrator

四、Java代码实践

首先maven下载jar包:

<!-- rabbitmq--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

1、生产者类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author jian* @date 2019/2/14* @description RabbitMQ测试: 消息服务端**/
public class RabbitProducer {// 路由键private static final String ROUTING_KEY = "routingkey_demo";// 交换机名称private static final String EXCHANGE_NAME = "exchange_demo";// 队列名称private static final String QUEUE_NAME = "queue_demo";// RabbitMQ地址private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";// RabbitMQ默认端口5672private static final int PORT = 5672;public static void publicMeesage () {// 1)通过连接工厂建立复用TCP连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(IP_ADDRESS);connectionFactory.setPort(PORT);connectionFactory.setUsername("root");connectionFactory.setPassword("root");try {Connection connection = connectionFactory.newConnection();// 2)建立多信道Channel channel = connection.createChannel();// 3)声明交换器:创建一个direct、持久化、非自动删除的交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);// 4)声明队列:创建一个持久化、非排他的、非自动删除的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 5)将交换器与队列通过路由键绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);// 6) 发送持久化消息String message = "hello world!";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println("producer published message: " + message);// 7)关闭信道
            channel.close();// 8)关闭连接
            connection.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

 

2、消费者类 

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;/*** @author jian* @date 2019/2/14* @description RabbitMQ测试:消费者消费消息*/
public class RabbitConsumer {// 队列名称private static final String QUEUE_NAME = "queue_demo";// RabbitMQ地址private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";// RabbitMQ默认端口5672private static final int PORT = 5672;public static void recevieMessage() {Address[] addresses = new Address[]{new Address(IP_ADDRESS, PORT)};// 1)通过连接工厂建立复用TCP连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");try {// 2)建立连接:此处与生产者建立建立连接是不同的Connection connection = connectionFactory.newConnection(addresses);// 3) 创建channel信道Channel channel = connection.createChannel();// 设置客户端最多接收未被ack消息的个数channel.basicQos(64);// 4)消费者向RabbitMQ Broker请求消费相应队列中消息: 有消息就会执行回调函数handleDeliveryDefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumer received message: " + new String(body, "UTF-8"));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}};// 5)消费者确认ack接收 到的消息:自动回复队列应答channel.basicConsume(QUEUE_NAME, true, defaultConsumer);// 等待回调函数执行完毕TimeUnit.SECONDS.sleep(5);// 6) 关闭信道
            channel.close();// 7) 关闭连接
            connection.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}

3、测试类

public class RabbitMQTest {public static void main(String[] args) {RabbitProducer.publicMeesage();RabbitConsumer.recevieMessage();}
}

4、测试结果

producer published message: hello world!
consumer received message: hello world!

 

转载于:https://www.cnblogs.com/jian0110/p/10389986.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/278299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何种植屡获殊荣的青豆

Most people don’t know this yet, but I’ve decided to give up computers and become a farmer instead. Since I’m the helpful type, I’ve decided to share everything I know about farming with you, starting with how I won my prize winning green beans. 大多数…

充分利用Microsoft Planner的6种方法

Microsoft Planner is pretty simple to use, but some of its more useful features aren’t front and center. If you’re just creating and moving tasks, here are six ways to get a bit more out of Planner. Here’s everything you need to know. Microsoft Planner的…

最详细的排序解析,理解七大排序

最详细的排序解析&#xff0c;理解七大排序 mp.weixin.qq.com点击上方“方志朋”&#xff0c;选择“置顶或者星标” 你的关注意义重大&#xff01; 注&#xff1a; lgN在这里为1og2N简写 为了方便描述,本文默认用int类型比较&#xff0c;从小到大排序 本文排序算法以java语言…

xp删除管理员账户_在Windows XP中从登录屏幕删除用户帐户

xp删除管理员账户So you login to your computer every single day, but there’s more than one account to choose from… either because you got the computer from somebody else, or some software package added a user account that you really don’t want to see. So…

Pycharm下将py文件打包成exe文件

1. 在PyCharm下安装PyInstaller 1. 首先&#xff0c;打开自己要发布的工程 2. 点击底部的【Terminal】打开终端&#xff0c;中输入命令pip install pyinstaller后回车&#xff0c;如图所示进行安装 3. 输入命令 pyinstaller&#xff0c;回车显示安装成功 4. 输入命令 pyinstall…

什么是自然语言处理,它如何工作?

NicoElNino/Shutterstock.comNicoElNino / Shutterstock.comNatural language processing enables computers to process what we’re saying into commands that it can execute. Find out how the basics of how it works, and how it’s being used to improve our lives. 自…

GIT速查手册

为什么80%的码农都做不了架构师&#xff1f;>>> 一、GIT 1.1 简单配置 git是版本控制系统&#xff0c;与svn不同的是git是分布式&#xff0c;svn是集中式 配置文件位置 # 配置文件 .git/config 当前仓库的配置文件 ~/.gitconfig 全局配置文件# 查看所有配置项 git …

4-3逻辑非运算符及案例 4-4

创建类 LoginDemo3 这里取反 !(n%30) package com.imooc.operator; import java.util.Scanner;public class LoginDemo3 {public static void main(String[] args) {// TODO Auto-generated method stubSystem.out.println("请输入一个整数");Scanner scnew Scanner(…

assistant字体_如何使用Google Assistant设置和致电家庭联系人

assistant字体Google谷歌Google Home and Nest smart speakers and displays allow you to make calls without using your phone. By setting up “Household Contacts,” anyone in your home can easily call friends and family members with Google Assistant-enabled dev…

Accoridion折叠面板

详细操作见代码&#xff1a; <!doctype html> <html><head><meta charset"UTF-8"><title></title><meta name"viewport" content"widthdevice-width,initial-scale1,minimum-scale1,maximum-scale1,user-scal…

skype快捷键_每个Skype键盘快捷键及其用法

skype快捷键Roberto Ricca/Shutterstock罗伯托里卡/ ShutterstockGet familiar with Skype’s unique keyboard shortcuts that will allow you to quickly change your settings, alter your interface, and control your communications. Use these hotkeys and become a Sky…

YouTube键盘快捷键:速查表

Google’s video website wouldn’t be complete without all sorts of useful buttons and hidden commands that aren’t immediately obvious. Use this hotkey cheat sheet to quickly navigate YouTube and gain better control over your video browsing experience. 如果…

MySQL服务读取参数文件my.cnf的规律研究探索

在MySQL中&#xff0c;它是按什么顺序或规律去读取my.cnf配置文件的呢&#xff1f;其实只要你花一点功夫&#xff0c;实验测试一下就能弄清楚&#xff0c;下面的实验环境为5.7.21 MySQL Community Server。其它版本如有不同&#xff0c;请以实际情况为准。 其实&#xff0c;MyS…

将组策略编辑器添加到控制面板

If you find yourself using the Group Policy Editor all the time, you might have wondered why it doesn’t show up in the Control Panel along with all the other tools. After many hours of registry hacking, I’ve come up with a registry tweak to let you do ju…

Exchange Server 2016管理系列课件50.DAG管理之激活数据库副本

激活邮箱数据库副本是将特定被动副本指定为邮箱数据库的新主动副本的过程。我们将此过程称为数据库切换。数据库切换过程是指卸除当前的活动数据库&#xff0c;然后在指定的服务器上将相应的数据库副本作为新的活动邮箱数据库副本进行装载。成为活动邮箱数据库的数据库副本必须…

常见设计模式 (python代码实现)

1.创建型模式 单例模式 单例模式&#xff08;Singleton Pattern&#xff09;是一种常用的软件设计模式&#xff0c;该模式的主要目的是确保某一个类只有一个实例存在。当你希望在整个系统中&#xff0c;某个类只能出现一个实例时&#xff0c;单例对象就能派上用场。 比如&#…

记录一次解决httpcline请求https报handshake_failure错误

概述 当使用httpclinet发起https请求时报如下错误&#xff1a; javax.net.ssl.SSLHandshakeException: Received fatal alert: handshake_failureat com.sun.net.ssl.internal.ssl.Alerts.getSSLException(Alerts.java:174)at com.sun.net.ssl.internal.ssl.Alerts.getSSLExcep…

桌面程序explorer_备份Internet Explorer 7搜索提供程序列表

桌面程序explorerIf you are both an IE user and a fan of using custom search providers in your search box, you might be interested to know how you can back up that list and/or restore it on another computer. Yes, this article is boring, but we’re trying to…

GreenPlum数据库故障恢复测试

本文介绍gpdb的master故障及恢复测试以及segment故障恢复测试。 环境介绍&#xff1a;Gpdb版本&#xff1a;5.5.0 二进制版本操作系统版本&#xff1a; centos linux 7.0Master segment: 192.168.1.225/24 hostname: mfsmasterStadnby segemnt: 192.168.1.227/24 hostname: ser…

书评:Just the Computer Essentials(Vista)

Normally we try and focus on articles about how to customize your computer, but today we’ll take a break from that and do a book review. This is something I’ve not done before, so any suggestions or questions will be welcomed in the comments. 通常&#x…