使用Scala,Play和Akka连接到RabbitMQ(AMQP)

在本文中,我们将研究如何从Scala连接到RabbitMQ,以便可以从应用程序中支持AMQP协议。 在此示例中,我将使用Play Framework 2.0作为容器(有关更多信息,请参阅我在该主题上的其他文章 )在其中运行应用程序,因为Play使得使用Scala进行开发变得更加容易。 本文还将使用Akka actor发送和接收RabbitMQ的消息。

什么是AMQP

首先,快速介绍AMQP。 AMQP代表“高级消息队列协议”,是消息传递的开放标准。 AMQP 主页声明其愿景是:“成为所有消息中间件之间互操作性的标准协议”。 AMQP定义了用于交换消息的传输级别协议,该协议可用于集成来自许多不同平台,语言和技术的应用程序。
有许多工具可以实现此协议,但是RabbitMQ引起了越来越多的关注。 RabbitMQ是使用AMQP的基于Erlang的开源消息代理。 所有会说AMQP的应用程序都可以连接并使用RabbitMQ。 因此,在本文中,我们将展示如何将基于Play2 / Scala / Akka的应用程序连接到RabbitMQ。 在本文中,我们将向您展示如何实现两种最常见的方案:

  • 发送/接收:我们将配置一个发件人每隔几秒钟发送一条消息,并使用两个侦听器以循环方式从队列中读取消息。
  • 发布/订阅:在此示例中,我们将创建几乎相同的方案,但是这次,侦听器将同时获得消息。

我假设您已经安装了RabbitMQ。 如果不是,请按照其网站上的说明进行操作。

设置基本的Play 2 / Scala项目

在此示例中,我创建了一个新的Play 2项目。 这样做很容易:

jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ_            _ _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/ play! 2.0-RC2, http://www.playframework.orgThe new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQWhat is the application name? 
> PlayAndRabbitMQWhich template do you want to use for this new application? 1 - Create a simple Scala application2 - Create a simple Java application3 - Create an empty project> 1OK, application PlayAndRabbitMQ is created.Have fun!

我曾经使用scala-ide插件在Eclipse上工作,所以我执行play eclipsify并将项目导入Eclipse。
我们需要做的下一步是建立正确的依赖关系。 Play为此使用sbt,并允许您从项目目录中的build.scala文件配置依赖项。 我们将添加的唯一依赖关系是RabbitMQ的Java客户端库。 即使Lift提供了一个基于Scala的AMQP库,但我发现直接使用RabbitMQ也是一样容易。 添加依赖项后,我的build.scala如下所示:

import sbt._
import Keys._
import PlayProject._object ApplicationBuild extends Build {val appName         = "PlayAndRabbitMQ"val appVersion      = "1.0-SNAPSHOT"val appDependencies = Seq("com.rabbitmq" % "amqp-client" % "2.8.1")val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings()
}

将rabbitMQ配置添加到配置文件

对于我们的示例,我们可以配置一些东西。 将消息发送到的队列,要使用的交换以及运行RabbitMQ的主机。 在实际情况下,我们将需要设置更多的配置选项,但是在这种情况下,我们只有这三个。 将以下内容添加到您的application.conf中,以便我们可以从我们的应用程序中引用它。

#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

现在,我们可以使用ConfigFactory访问这些配置文件。 为了便于访问,请创建以下对象:

object Config {val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

初始化与RabbitMQ的连接

在研究如何使用RabbitMQ发送和接收消息之前,我们还有一个要定义的对象。 要使用RabbitMQ,我们需要一个连接。 我们可以使用ConnectionFactory获得与服务器的连接。 查看javadocs以获取有关如何配置连接的更多信息。

object RabbitMQConnection {private val connection: Connection = null;/*** Return a connection if one doesn't exist. Else create* a new one*/def getConnection(): Connection = {connection match {case null => {val factory = new ConnectionFactory();factory.setHost(Config.RABBITMQ_HOST);factory.newConnection();}case _ => connection}}
}

在应用程序启动时启动侦听器

在查看RabbitMQ代码之前,我们还需要做一件事。 我们需要确保在应用程序启动时注册了消息侦听器,并且发件人开始发送。 播放2提供了
为此的GlobalSettings对象,您可以在应用程序启动时扩展该对象以执行代码。 对于我们的示例,我们将使用以下对象(请记住,该对象需要存储在默认名称空间中:

import play.api.mvc._
import play.api._
import rabbitmq.Senderobject Global extends GlobalSettings {override def onStart(app: Application) {Sender.startSending}
}

我们将在下面的部分中查看此Sender.startSending操作,该操作将初始化所有发送者和接收者。

设置发送和接收方案

让我们看一下Sender.startSending代码,该代码将设置一个将msg发送到特定队列的发送方。 为此,我们使用以下代码:

object Sender {def startSending = {// create the connectionval connection = RabbitMQConnection.getConnection();// create the channel we use to sendval sendingChannel = connection.createChannel();// make sure the queue exists we want to send tosendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new SendingActor(channel = sendingChannel, queue = Config.RABBITMQ_QUEUE))), "MSG to Queue");}
}class SendingActor(channel: Channel, queue: String) extends Actor {def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish("", queue, null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

在此代码中,我们采取以下步骤:

  1. 使用工厂检索到RabbitMQ的连接
  2. 在此连接上创建一个通道,用于与RabbitMQ通信
  3. 使用通道创建队列(如果尚不存在)
  4. 安排Akka每秒向演员发送一条消息。

所有这些都应该非常简单。 唯一(有点)复杂的部分是调度部分。 此调度操作的作用是这样的。 我们告诉Akka安排要发送给演员的消息。 我们希望在发射之前有2秒的延迟,并且我们希望每秒重复一次此作业。 应该用于此的actor是SendingActor,您也可以在此清单中看到。 该参与者需要访问通道以发送消息,并且该参与者还需要知道接收消息的位置。 这是队列。
因此,该Actor每秒将收到一条消息,附加一个时间戳,并使用提供的通道将此消息发送到队列:channel.basicPublish(“”,queue,null,msg.getBytes());。 现在我们每秒发送一条消息,在此队列上有可以接收消息的侦听器将是很好的。 为了接收消息,我们还创建了一个Actor,可以在特定队列上无限期地进行监听。

class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {// called on the initial rundef receive = {case _ => startReceving}def startReceving = {val consumer = new QueueingConsumer(channel);channel.basicConsume(queue, true, consumer);while (true) {// wait for the messageval delivery = consumer.nextDelivery();val msg = new String(delivery.getBody());// send the message to the provided callback function// and execute this in a subactorcontext.actorOf(Props(new Actor {def receive = {case some: String => f(some);}})) ! msg}}
}

这个actor比我们以前发送的actor要复杂一些。 当该参与者收到消息(消息的种类无关紧要)时,它将开始侦听创建该消息的队列。 它通过使用提供的通道创建使用者来实现此目的,并告诉使用者开始在指定队列上侦听。 Consumer.nextDelivery()方法将阻塞,直到消息在配置的队列中等待。 收到消息后,将创建一个新的Actor,将消息发送到该Actor。 这个新的参与者将消息传递给提供的方法,您可以在其中放置业务逻辑。
要使用此侦听器,我们需要提供以下参数:

  • 频道:允许访问RabbitMQ
  • 队列:监听消息的队列
  • f:收到消息后我们将执行的功能。

第一个示例的最后一步是将所有内容粘合在一起。 为此,我们向Sender.startSending方法添加了几个方法调用。

def startSending = {...val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);// create an actor that starts listening on the specified queue and passes the// received message to the provided callbackval callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);// setup the listener that sends to a specific queue using the SendingActorsetupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);...}private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");}

在此代码中,您可以看到我们定义了一个回调函数,并使用此回调函数以及队列和通道来创建ListeningActor。 我们使用scheduleOnce方法在单独的线程中启动此侦听器。 现在,使用此代码,我们可以运行应用程序(播放运行),打开localhost:9000来启动应用程序,我们应该看到类似以下输出的内容。

[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

在这里,您可以清楚地看到消息的循环处理方式。

设置发布和订阅方案

一旦我们运行了上面的代码,添加发布/订阅功能就变得非常简单。 现在,我们使用PublishingActor代替SendingActor:

class PublishingActor(channel: Channel, exchange: String) extends Actor {/*** When we receive a message we sent it using the configured channel*/def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish(exchange, "", null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

RabbitMQ使用交换来允许多个收件人接收相同的消息(以及许多其他高级功能)。 来自其他参与者的代码唯一的变化是,这次我们将消息发送到交换而不是队列。 侦听器代码完全相同,我们唯一需要做的就是将队列连接到特定的交换机。 这样,该队列上的侦听器就可以接收发送到交换机的消息。 我们再次根据之前使用的设置方法执行此操作。

...// create a new sending channel on which we declare the exchangeval sendingChannel2 = connection.createChannel();sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");// define the two callbacks for our listenersval callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);// create a channel for the listener and setup the first listenerval listenChannel1 = connection.createChannel();setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback3);// create another channel for a listener and setup the second listenerval listenChannel2 = connection.createChannel();setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback4);// create an actor that is invoked every two seconds after a delay of// two seconds with the message "msg"Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new PublishingActor(channel = sendingChannel2, exchange = Config.RABBITMQ_EXCHANGEE))), "MSG to Exchange");...

我们还为setupListener创建了一个重载方法,该方法作为一个附加参数,也接受要使用的交换的名称。

private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {channel.queueBind(queueName, exchange, "");Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");}

在这小段代码中,您可以看到我们将提供的队列(在我们的示例中是一个随机名称)绑定到指定的交易所。 之后,我们将创建一个新的监听器,如我们之前所见。
现在运行此代码将产生以下输出:

[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

如您所见,在这种情况下,两个侦听器都收到相同的消息。 这几乎涵盖了本文的全部内容。 如您所见,为RabbitMQ使用基于Java的客户端api绰绰有余,并且可以从Scala轻松使用。 请注意,尽管该示例尚未准备好投入生产,但您应注意关闭连接,并很好地关闭侦听器和参与者。 这里没有显示所有关闭代码。

参考:通过我们的JCG合作伙伴 Jos Dirksen在Smart Java博客上使用Scala,Play和Akka连接到RabbitMQ(AMQP) 。


翻译自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/372999.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿尔法贝塔阀原理_图总结 - 阿尔法个贝塔 - 博客园

一.思维导图二.概念笔记图的存储结构1. 邻接矩阵定义:设图G有n (n大于等于1) 个顶点,则邻接矩阵是一个n阶方阵。当矩阵中的 [i,j] !0(下标从1开始) ,代表其对应的第i个顶点与第j个顶点是连接的特点无向图的邻接矩阵是对称矩阵,n个顶点的无向图…

WebApi Post 后台无法获取参数的解决方案

事件回放: 之前一段时间,公司里前端用的Angularjs 发送http请求也是用的ng的组件,后台是.Net的WebApi 前端 var data {PArgs: {PageIndex: 0,PageSize: 8,RowsCount: 0} };$http.post("/Api/Test/ABC", data).success(function (d…

南京大学计算机系周小莉,周会群

媒体报道:南京大学周会群:用计算机聪明地做实验Q《中国教育网络》A周会群Q:南京大学的高性能计算中心非常特殊,分布在物理,化学、天文、地球科学四个不同的学科中,为什么采取这种模式?A&#xf…

不要怂,就是GAN (生成式对抗网络) (五):无约束条件的 GAN 代码与网络的 Graph...

GAN 这个领域发展太快,日新月异,各种 GAN 层出不穷,前几天看到一篇关于 Wasserstein GAN 的文章,讲的很好,在此把它分享出来一起学习:https://zhuanlan.zhihu.com/p/25071913。相比 Wasserstein GAN &#…

用于MyBatis CRUD操作的Spring MVC 3控制器

到目前为止,我们已经为域类“ User ”创建了CRUD数据库服务,并且还将MyBatis配置与Spring Configuration文件集成在一起。 接下来,我们将使用Spring MVC创建一个网页,以使用MyBatis CRUD服务对数据库执行操作。 使用MyBatis 3创建…

2pin接口耳机_拆解报告:雷柏首款真无线耳机XS200

-----我爱音频网拆解报告第185篇-----雷柏是一家历史悠久的鼠标和键盘厂商,截至目前,雷柏(rapoo)总共出了四款耳机,此前曾推出过三款蓝牙耳机, 分别是S500 蓝牙立体声麦克风耳机,S200 蓝牙立体声麦克风耳机&#xff0c…

html表单中阴影,html5中input表单加边框,阴影效果.doc

文档介绍:CSS:input:focus{border-color:#99;}获取焦点时改变颜色focus能同时改变宽度长度背景色…….form,p(margin-bottom:30px;margin-left:20px;).shadow,.one,.two,.three,.four,.five,.six( height:50px; width:280px; border:C;).shadow( -moz-box-shadow:C;…

带有GSON和抽象类的JSON

经过多年使用org.json库在Java中支持JSON数据交换格式后,我已切换到Google Gson 。 org.json是一个较低级的库,因此您必须创建JSONObject,JSONArray,JSONString等…并执行其他低级工作。 Gson简化了这项工作。 它提供了简单的toJs…

深入理解javascript原型和闭包(3)——prototype原型

转载,原文地址http://www.cnblogs.com/wangfupeng1988/p/3978131.html 既typeof之后的另一位老朋友! prototype也是我们的老朋友,即使不了解的人,也应该都听过它的大名。如果它还是您的新朋友,我估计您也是javascript的…

python 温度 符号_Python通过小实例入门学习---1.0(温度转换)

1.安装Python 3 下载地址: Welcome to Python.org​www.python.org 2.“温度转换”实例:摄氏度--->华氏度 / 华氏度--->摄氏度 TempConvert.py TempStr = input("请输入带有符号的温度值:") if TempStr[-1] in ["f","F"]:C = (eval(Tem…

mysql 修改root密码

1.找到配置文件my.ini ,然后将其打开,可以选择用记事本打开 C:\Program Files (x86)\MySQL\MySQL Server 5.0 2.打开后,搜索mysqld关键字,找到后,在mysqld下面添加skip-grant-tables,保存退出。 PS&#x…

联想计算机CDROM启动,联想电脑光驱启动问题?

1、开机按del键或f2进入bios设置(不同主板按键不一样,一般是DEL,也可能是F2,可以参考下主板说明),将计算机的启动模式调成从光盘启动。也就是从cdrom启动,根据主板的不同,bios设置有所差异(一般是&#xff…

没有J2EE容器的JNDI和JPA

我们希望通过尽可能简单的设置来测试一些JPA代码。 计划仅使用Java和Maven,不使用应用程序服务器或其他J2EE容器。 我们的JPA配置需要两件事才能成功运行: 数据库来存储数据, JNDI访问数据库。 这篇文章分为两个部分。 第一部分显示了如何…

string 大小写转换

STL的algorithm库确实给我们提供了这样的便利&#xff0c;使用模板函数transform可以轻松解决这个问题&#xff0c;开发人员只需要提供一个函数对象&#xff0c;例如将char转成大写的toupper函数或者小写的函数tolower函数。 transform原型&#xff1a; 1 #include <string&…

linux服务器上svn的log_如何在 Centos 8 / RHEL 8 上安装和配置 VNC 服务器 | Linux 中国...

在 Centos 8 和 RHEL 8 系统中&#xff0c;默认未安装 VNC 服务器&#xff0c;它需要手动安装。在本文中&#xff0c;我们将通过简单的分步指南&#xff0c;介绍如何在 Centos 8 / RHEL 8 上安装 VNC 服务器。-- Pradeep KumarVNC(虚拟网络计算Virtual Network Computing)服务器…

怎么把网页保存到本地计算机,在IE浏览器中,将网页保存到本地计算机中,若只需保存其中的文字、超链接和表格信息,应该选择的保存类型为( )...

2.(2017高一上东台月考)阅读下面一段资料&#xff0c;判断在给出的几种说法中不正确的是( )资料&#xff1a;IP电话与传统电话IP电话是按国际互联网协议规定的网络技术内容开通的电话业务&#xff0c;中文翻译为网络电话或互联网电话&#xff0c;它是利用国际互联网Inetrnet为…

html_博客博主

csdn: 工匠若水 http://blog.csdn.net/yanbober yunama: IT蓝豹&#xff1a;http://www.itlanbao.com/&#xff1b; http://ask.dcloud.net.cn/docs/; 博客园&#xff1a; https://www.cnblogs.com/guweiwei/category/965437.html转载于:https://www.cnblogs.com/awkflf11/p/55…

Windows上的Java线程CPU分析

本文将为您提供一个教程&#xff0c;介绍如何在Windows OS上快速查明Java线程贡献者与CPU严重问题有关。 Windows与Linux&#xff0c;Solaris和AIX等其他操作系统一样&#xff0c;使您可以在进程级别监视CPU利用率&#xff0c;还可以监视在进程中执行任务的单个线程。 在本教程…

flask 继承模版的基本使用1

转载于:https://www.cnblogs.com/wanghaonull/p/6399492.html

东芝2303am维护清零_东芝打印机2303A怎样清零

展开全部东芝e68a843231313335323631343130323136353331333365653137打印机是按照相关要求生产的正规产品&#xff0c;其清零方式与正规产品相同。因此此处将介绍常用的打印机清零方法。打印机清零一般分两种&#xff1a;一种是手工清零&#xff0c;另一种是软件清零。一、手工…