scala akka_使用Scala,Play和Akka连接到RabbitMQ(AMQP)

scala akka

在本文中,我们将研究如何从Scala连接到RabbitMQ,以便可以从应用程序中支持AMQP协议。 在此示例中,我将使用Play Framework 2.0作为容器(有关更多信息,请参阅我在该主题上的其他文章 )在其中运行应用程序,因为Play使得使用Scala进行开发变得容易得多。 本文还将使用Akka actor发送和接收RabbitMQ的消息。

什么是AMQP

首先,快速介绍AMQP。 AMQP代表“高级消息队列协议”,并且是消息传递的开放标准。 AMQP 主页将其愿景陈述为:“成为所有消息中间件之间互操作性的标准协议”。 AMQP定义了用于交换消息的传输级别协议,该协议可用于集成来自许多不同平台,语言和技术的应用程序。
有许多工具可以实现此协议,但是RabbitMQ越来越引起人们的关注。 RabbitMQ是使用AMQP的基于Erlang的开源消息代理。 会说AMQP的所有应用程序都可以连接并使用RabbitMQ。 因此,在本文中,我们将展示如何将基于Play2 / Scala / Akka的应用程序连接到RabbitMQ。
在本文中,我们将向您展示如何实现两种最常见的方案:

  • 发送/接收:我们将配置一个发件人每隔几秒钟发送一条消息,并使用两个侦听器以循环方式从队列中读取消息。
  • 发布/订阅:对于本示例,我们将创建几乎相同的场景,但是这次,侦听器将同时获得消息。

我假设您已经安装了RabbitMQ。 如果不是,请按照其网站上的说明进行操作。

设置基本的Play 2 / Scala项目

在此示例中,我创建了一个新的Play 2项目。 这样做很容易:

jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ_            _ _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/ play! 2.0-RC2, http://www.playframework.orgThe new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQWhat is the application name? 
> PlayAndRabbitMQWhich template do you want to use for this new application? 1 - Create a simple Scala application2 - Create a simple Java application3 - Create an empty project> 1OK, application PlayAndRabbitMQ is created.Have fun!

我曾经使用scala-ide插件在Eclipse上工作,所以我执行play eclipsify并将项目导入Eclipse。
我们需要做的下一步是建立正确的依赖关系。 Play为此使用sbt,并允许您从项目目录中的build.scala文件配置依赖项。 我们将添加的唯一依赖关系是RabbitMQ的Java客户端库。 即使Lift提供了一个基于Scala的AMQP库,但我发现直接使用RabbitMQ也是一样容易。 添加依赖项后,我的build.scala如下所示:

import sbt._
import Keys._
import PlayProject._object ApplicationBuild extends Build {val appName         = "PlayAndRabbitMQ"val appVersion      = "1.0-SNAPSHOT"val appDependencies = Seq("com.rabbitmq" % "amqp-client" % "2.8.1")val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings()
}

将RabbitMQ配置添加到配置文件

对于我们的示例,我们可以配置一些东西。 将消息发送到的队列,要使用的交换以及运行RabbitMQ的主机。 在实际情况下,我们将需要设置更多的配置选项,但是在这种情况下,我们只有这三个。 将以下内容添加到您的application.conf中,以便我们可以从我们的应用程序中引用它。

#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

现在,我们可以使用ConfigFactory访问这些配置文件。 为了便于访问,请创建以下对象:

object Config {val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

初始化与RabbitMQ的连接

在查看如何使用RabbitMQ发送和接收消息之前,我们还有一个要定义的对象。 要使用RabbitMQ,我们需要一个连接。 我们可以使用ConnectionFactory获得与服务器的连接。 查看javadocs以获取有关如何配置连接的更多信息。

object RabbitMQConnection {private val connection: Connection = null;/*** Return a connection if one doesn't exist. Else create* a new one*/def getConnection(): Connection = {connection match {case null => {val factory = new ConnectionFactory();factory.setHost(Config.RABBITMQ_HOST);factory.newConnection();}case _ => connection}}
}

应用程序启动时启动监听器

在查看RabbitMQ代码之前,我们还需要做一件事。 我们需要确保在应用程序启动时注册了消息侦听器,并且发件人开始发送。 播放2提供了
为此的GlobalSettings对象,您可以在应用程序启动时扩展该对象以执行代码。 对于我们的示例,我们将使用以下对象(请记住,该对象需要存储在默认名称空间中:

import play.api.mvc._
import play.api._
import rabbitmq.Senderobject Global extends GlobalSettings {override def onStart(app: Application) {Sender.startSending}
}

我们将在下面的部分中查看此Sender.startSending操作,该操作将初始化所有发送者和接收者。

设置发送和接收方案

让我们看一下Sender.startSending代码,该代码将设置一个将msg发送到特定队列的发送方。 为此,我们使用以下代码:

object Sender {def startSending = {// create the connectionval connection = RabbitMQConnection.getConnection();// create the channel we use to sendval sendingChannel = connection.createChannel();// make sure the queue exists we want to send tosendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new SendingActor(channel = sendingChannel, queue = Config.RABBITMQ_QUEUE))), "MSG to Queue");}
}class SendingActor(channel: Channel, queue: String) extends Actor {def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish("", queue, null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

在此代码中,我们采取以下步骤:

  1. 使用工厂检索到RabbitMQ的连接
  2. 在此连接上创建一个通道,用于与RabbitMQ通信
  3. 使用通道创建队列(如果尚不存在)
  4. 安排Akka每秒向演员发送一条消息。

所有这些都应该非常简单。 唯一(有点)复杂的部分是调度部分。 此调度操作的作用是这样的。 我们告诉Akka安排要发送给演员的消息。 我们需要2秒钟的延迟才能被触发,并且我们想每秒重复一次这项工作。 应该用于此的actor是SendingActor,您也可以在此清单中看到。 该参与者需要访问通道以发送消息,并且该参与者还需要知道将接收到的消息发送到哪里。 这是队列。
因此,此Actor每秒将收到一条消息,附加一个时间戳,并使用提供的通道将此消息发送到队列:channel.basicPublish(“”,queue,null,msg.getBytes());。 现在我们每秒发送一条消息,在此队列上有可以接收消息的侦听器将是很好的。 为了接收消息,我们还创建了一个Actor,可以在特定队列上无限期地进行监听。

class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {// called on the initial rundef receive = {case _ => startReceving}def startReceving = {val consumer = new QueueingConsumer(channel);channel.basicConsume(queue, true, consumer);while (true) {// wait for the messageval delivery = consumer.nextDelivery();val msg = new String(delivery.getBody());// send the message to the provided callback function// and execute this in a subactorcontext.actorOf(Props(new Actor {def receive = {case some: String => f(some);}})) ! msg}}
}

这个actor比我们以前发送的actor要复杂一些。 当该参与者收到消息(消息的种类无关紧要)时,它将开始侦听创建该消息的队列。 它通过使用提供的通道创建使用者来实现此目的,并告诉使用者开始在指定队列上侦听。 Consumer.nextDelivery()方法将阻塞,直到在配置的队列中等待消息为止。 收到消息后,将创建一个新的Actor,将消息发送到该Actor。 这个新角色将消息传递到提供的方法,您可以在其中放置业务逻辑。
要使用此侦听器,我们需要提供以下参数:

  • 频道:允许访问RabbitMQ
  • 队列:监听消息的队列
  • f:收到消息后将执行的功能。

第一个示例的最后一步是将所有内容粘合在一起。 为此,我们向Sender.startSending方法添加了几个方法调用。

def startSending = {...val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);// create an actor that starts listening on the specified queue and passes the// received message to the provided callbackval callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);// setup the listener that sends to a specific queue using the SendingActorsetupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);...}private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");}

在此代码中,您可以看到我们定义了一个回调函数,并使用此回调函数以及队列和通道来创建ListeningActor。 我们使用scheduleOnce方法在单独的线程中启动此侦听器。 现在,使用此代码,我们可以运行应用程序(播放运行),打开localhost:9000来启动应用程序,我们应该看到类似以下输出的内容。

[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

在这里,您可以清楚地看到循环处理消息的方式。

设置发布和订阅方案

一旦运行了上述代码,添加发布/订阅功能就变得非常简单。 现在我们使用PublishingActor代替SendingActor:

class PublishingActor(channel: Channel, exchange: String) extends Actor {/*** When we receive a message we sent it using the configured channel*/def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish(exchange, "", null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

RabbitMQ使用交换来允许多个收件人接收相同的消息(以及许多其他高级功能)。 来自其他参与者的代码唯一的变化是,这次我们将消息发送到交换机而不是队列。 侦听器代码完全相同,我们唯一需要做的就是将队列连接到特定的交换机。 这样,该队列上的侦听器就可以接收发送到交换机的消息。 我们再次根据之前使用的设置方法执行此操作。

...// create a new sending channel on which we declare the exchangeval sendingChannel2 = connection.createChannel();sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");// define the two callbacks for our listenersval callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);// create a channel for the listener and setup the first listenerval listenChannel1 = connection.createChannel();setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback3);// create another channel for a listener and setup the second listenerval listenChannel2 = connection.createChannel();setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback4);// create an actor that is invoked every two seconds after a delay of// two seconds with the message "msg"Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new PublishingActor(channel = sendingChannel2, exchange = Config.RABBITMQ_EXCHANGEE))), "MSG to Exchange");...

我们还为setupListener创建了一个重载方法,该方法作为一个附加参数,也接受要使用的交换的名称。

private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {channel.queueBind(queueName, exchange, "");Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");}

在这小段代码中,您可以看到我们将提供的队列(在我们的示例中是一个随机名称)绑定到指定的交易所。 之后,我们将创建一个新的监听器,如我们之前所见。
现在运行此代码将产生以下输出:

[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

如您所见,在这种情况下,两个侦听器都收到相同的消息。 这几乎涵盖了本文的全部内容。 如您所见,为RabbitMQ使用基于Java的客户端api绰绰有余,并且可以从Scala轻松使用。 请注意,尽管该示例尚未准备好投入生产,但您应注意关闭连接,并很好地关闭侦听器和参与者。 这里没有显示所有关闭代码。

参考:从Smart Java博客的JCG合作伙伴 Jos Dirksen 使用Scala,Play和Akka连接到RabbitMQ(AMQP) 。


翻译自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html

scala akka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/355960.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux中服务器之间的跳转,linux之***服务器间ip隧道跳转多ip路由走向分流(系真实案例)...

本文系统Centos6.0;这里的***服务以pptpd为例;其中底层涉及到pptpdfreeradiusmysql认证;本文系真实案例;leader需求大多是这样的,节约成本还要达到所需要的效果;没办法,总的做个效果出来&#x…

POJ2503 Babelfish map或者hash_map

POJ2503 这是一道水题&#xff0c;用Map轻松AC。 不过&#xff0c;可以拿来测一下字符串散列&#xff0c; 毕竟&#xff0c;很多情况下map无法解决的映射问题需要用到字符串散列。 自己生成一个质数&#xff0c; 随便搞一下。 #include<iostream> #include<cstdio>…

使用JBoss EAP 7的HTTP / 2

就在几天前&#xff0c;新的JBoss EAP 7 ALPHA版本已经发布。 而且我已经写过关于如何开始使用它的博客。 一个非常有趣的部分是HTTP / 2支持&#xff0c;它已作为技术预览添加。 它由新的Web服务器Untertow提供。 HTTP / 2通过压缩头并在同一TCP连接上多路复用多个流来减少延迟…

linux树莓派 ssh密码,树莓派之SSH连接经验

打开SSH服务执行raspi-configsudo raspi-config选择InterfacingOptions选项&#xff0c;回车选择SSH&#xff0c;回车Windows下客户端推荐使用putty这个免费开源的SSH客户端。下载地址在红框处填入树莓派的IP地址&#xff0c;点击右下角的Open即可。不过&#xff0c;putty的默认…

linux嵌入式面试题合集,嵌入式linux面试题解析(一)——ARM部分二

嵌入式linux面试题解析(一)——ARM部分二1、描述一下嵌入式基于ROM的运行方式基于RAM的运行方式有什么区别。基于RAM的运行方式&#xff1a;需要把硬盘和其他介质的代码先加载到ram中&#xff0c;加载过程中一般有重定位的操作&#xff1b;基于ROM&#xff1a;没有上面的操作。…

Unity应用架构设计(1)—— MVVM 模式的设计和实施(Part 2)

MVVM回顾 经过上一篇文章的介绍&#xff0c;相信你对MVVM的设计思想有所了解。MVVM的核心思想就是解耦&#xff0c;View与ViewModel应该感受不到彼此的存在。 View只关心怎样渲染&#xff0c;而ViewModel只关心怎么处理逻辑&#xff0c;整个架构由数据进行驱动。不仅View与View…

apache camel_REST端点,可使用Apache Camel进行集成

apache camelREST是一种用于组织资源的体系结构样式&#xff0c;当应用于基于HTTP的服务时&#xff0c;REST可以构建无状态&#xff0c;分离的可伸缩服务。 HTTP方法&#xff0c;HTTP标头和mime类型都允许开发人员实现REST样式。 诸如Jersey和Fuse Services Framework&#xff…

Linux读取SSD的smart信息,使用smartmontools查看SSD的“秘密”信息

仍然担心看不到sm841中的温度吗&#xff1f;您是否仍对Toshiba Q pro看不见写入量和使用寿命值感到不安&#xff1f;为了查看M4 / 00的写入量&#xff0c;您是否仍在使用C300固件升级程序重新启动到纯DOS并麻烦地运行命令&#xff1f;想知道在协议级别上SSD出了什么问题吗&…

Linux查看系统信息的一些命令及查看已安装软件包的命令

系统 # uname -a # 查看内核/操作系统/CPU信息 # head -n 1 /etc/issue # 查看操作系统版本 # cat /proc/cpuinfo # 查看CPU信息 # hostname # 查看计算机名 # lspci -tv # 列出所有PCI设备 # lsusb -tv # 列出所…

5条Java记录规则

日志记录是一个关键因素&#xff0c;在软件开发过程中应始终将其考虑在内。 当生产中发生不良情况时&#xff0c;日志文件通常是我们进行故障分析的起点。 而且&#xff0c;通常&#xff0c;它们是我们掌握的唯一信息&#xff0c;可以了解发生了什么以及问题的根本原因。 正确…

linux 离线安装中文字库,centos7 离线安装字体fontconfig

起因&#xff1a;最近做了个flowable然而linux下乱码&#xff0c;发现需要安装字体包在线&#xff1a;直接 yum -y install fontconfig&#xff1b;yum -y install ttmkfdir&#xff1b;配置下即可。拓展&#xff1a;离线所需rpm包如何获取&#xff1f;百度不好找&#xff0c;找…

锁定机制和数据并发管理(笔记)

共享锁和排它锁 排它锁&#xff1a;当某一个会话正在更新某一行&#xff0c;为了防止其他会话修改这一行&#xff0c;这行会被锁定这种锁称为排他锁。被排他锁锁定的行仍然可以被其他会话读取。 共享锁&#xff1a;在一个表上放置共享锁的目的是为了防止其他会话获得这个表上的…

linux防火墙配置说明,Linux防火墙配置命令参数说明

规则操作参数说明&#xff1a;-A&#xff1a;在所选择的链末添加一条或更多规则&#xff1b;-D&#xff1a;从所选链中删除一条或更多规则。有两种方法&#xff1a;把被删除规则指定为链中的序号(第一条序号为1)&#xff0c;或者指定为要匹配的规则&#xff1b;-R&#xff1a;从…

【react.js + hooks】useGuide 创建用户引导视图

有的时候用户可能对网站上的一些操作流程感到困惑&#xff0c;这时候我们需要为用户创建引导视图。为了插入指引而专门去更改组件的渲染函数&#xff0c;显然是不合逻辑的&#xff0c;创建指引视图应该是一种对源代码低侵入的行为&#xff0c;我们可以遵循某一套约定&#xff0…

使用递归算法结合数据库解析成java树形结构

使用递归算法结合数据库解析成java树形结构 1、准备表结构及对应的表数据a、表结构&#xff1a; create table TB_TREE ( CID NUMBER not null, CNAME VARCHAR2(50), PID NUMBER //父节点 ) b、表数据&#xff1a; insert into tb_tree (CID, CNAME, PID) values (1, 中国, 0);…

ug11 linux,UG11.0升级包MP02Win#Linux系统下载就上UG网

UG11.0软件又出升级包啦&#xff01;抽空可以为NX升级啦&#xff0c;从11.0发布到现在&#xff0c;近三个月了&#xff0c;每一次版本的更新&#xff0c;都会带来较多功能的改善&#xff0c;以及对错误BUG的有效处理&#xff0c;下图为升级后的UG11.0软件&#xff1b;UG11.0升级…

akka 异常处理_使用Akka处理1000万条消息

akka 异常处理Akka演员承诺并发。 有什么更好的模拟方法&#xff0c;看看使用商品硬件和软件处理1000万条消息需要花费多少时间&#xff0c;而无需进行任何低级调整。我用Java编写了整个1000万条消息的处理过程&#xff0c;整个结果令我惊讶。 当我在具有i5 – 4核心&#xff0…

20155330 2016-2017-2 《Java程序设计》第五周学习总结

20155330 2016-2017-2 《Java程序设计》第五周学习总结 教材学习内容总结 学习目标 理解异常架构掌握try...catch...finally处理异常的方法会用throw,throws理解Collection和Map架构会用常见的数据结构和算法了解Lambada和泛型第八章 章节主要内容 小结 Throwwable定义了取错误…

yum安装odbc驱动linux,在CentOS上离线配置PostgreSQL ODBC数据源

一、问题提出内网的一台CentOS服务器&#xff0c;需配置PostgreSQL ODBC。如果可以连接Internet&#xff0c;此工作很容易&#xff0c;使用yum install自动安装相应依赖包后简单配置即可。但当置于内网环境时&#xff0c;事情就有些麻烦&#xff0c;需要事先手工下载各个依赖包…

树形dp小胖守皇宫(vijosP1144)

题目链接&#xff1a;https://vijos.org/p/1144 题解&#xff1a;这道题的动归稍稍有一点的复杂&#xff0c;因为一个节点有可能被它的子节点观察&#xff0c;也有可能被父节点观察&#xff1b; 所以我们这样表示&#xff1a; f[i][0]&#xff08;表示当前i节点放了一个看守&am…