ElasticMQ是一台消息服务器,具有Scala,Java和与Amazon SQS兼容的接口。 它通过跨服务器群集复制消息来支持有保证的消息传递,并通过日志记录实现消息持久性。
消息复制是ElasticMQ的核心功能之一。 但是,如果您看一下代码,则只有少数几个类,最长的类有76行(请记住,这是Scala,虽然;))。 这是因为ElasticMQ使用JGroups作为基础通信库。 JGroups已经很老了,特别是对于Java库而言-JGroups的第一个发行版是在1999年(!)。 但是,它远不是过时和过时的-它具有一个不错的API,可以正常工作,拥有一个良好的社区。 并且因为任何Java库都可以与Scala很好地协作。
JGroups具有许多有用的功能:
- 可靠的组播
- 集群管理
- 故障检测
- 节点发现
- 多年的性能改进
它们广泛用于在ElasticMQ中实施复制。 以下是其完成方式的摘要。
ElasticMQ集群如何工作?
在单个ElasticMQ集群中,一个节点始终是主节点。 您只能对此节点执行操作。 然后将每个操作的结果复制到其他成员。 有两种与阻止有关的选项; 复制可以是完全异步的,也可以等待直到至少一个或所有节点确认该操作。 为了确保在群集分区的情况下不会从不同分区收到相同的消息,只有具有至少一半+1节点操作的分区处于活动状态。
ElasticMQ中的中心概念是消息存储。 存储器执行命令(例如,发送消息命令,删除消息命令等)。 复制层只是任何其他存储的包装。 但是请注意,我们只能复制产生的存储突变(因此在执行命令之后),而不是原始命令本身。 例如,如果命令是“接收消息”,则在每台计算机上执行该命令的结果可能会有所不同。 因此,如果接收消息成功,我们将仅复制消息可见性的更改(在ElasticMQ中,类似于Amazon SQS ,如果接收到消息,则会在指定的时间段内阻止后续接收该消息)。 您可以在JGroupsReplicatedStorage中看到此基本逻辑。
初始化集群
在开始复制之前,首先要做的是初始化集群。 这是在ReplicatedStorageConfigurator中完成的。 作为参数,我们需要一个JGroups配置文件,该文件是协议栈。 您实际上并不需要知道每种协议的功能以及所有这些配置参数的含义。 最有用的两个是udp.xml和tcp.xml 。 如果您的网络中有多播,则应使用第一个;如果所有通信都应通过TCP(例如,在EC2上),则应使用第二个。 在后一种情况下,您还需要提供初始IP列表。 该列表不必详尽无遗,只需列出种子即可。
拥有协议栈,ElasticMQ创建一个JChannel并将其连接,这仅意味着连接到集群。 实际上,这就是使用JGroups创建集群所需要做的所有工作-非常简单,对吧? 正如您在ReplicatedStorageConfigurator的末尾看到的那样,连接之后的第一件事是对channel.getState(null,0)的调用。 这将转到当前的主节点(稍后会进行有关主选举的更多信息),获取状态(当前的队列和消息)并将其应用于当前的节点(请参阅非常简单的JGroupsStateTransferMessageListener-处理发送和接收)。 这里有两件事要注意。 首先,此传输不会阻止整个群集正常运行。 其次,如果在状态转移期间执行了一项操作,则该操作也会被复制。 因此,可能会在新节点上执行一次命令两次。 但这无关紧要,因为每个复制的命令都是幂等的,因此可以多次应用。 在其他情况下,必须实施某些应用程序侧机制以防止此类情况。
复制数据
最后,我们进入核心:复制命令。 在发送方,这由JGroupsReplicationMessageSender处理。 同样,这不是一个非常复杂的类。 它使用来自JGroups的MessageDispatcher “构建块”,除了在整个集群中对消息进行多重处理外,还使您能够等待,直到指定数量的节点接收到它为止。 在接收方,我们有JGroupsRequestHandler 。 同样,非常简单。 收到消息后,它仅发送到存储。
集群管理
您可能还注意到SetMaster特殊消息。 用户需要此权限才能读取当前主节点的节点地址。 主选举(决定哪个节点是主节点)完全由JGroups处理。 JGroups中没有特定的算法来选举主节点,但是我们可以利用以下事实:每个节点都有相同的集群视图,由JGroups View类表示。 我们要做的就是简单地从列表中获取第一个(或最后一个或第3个,等等-只要在所有节点上都相同),然后将其设置为主节点即可。
群集视图由最后一个“核心”复制类JGroupsMembershipListener处理 。 那里发生了两件事。 每当新节点加入或离开集群时,都会调用viewAccepted回调。 每个具有View类的实例(很好,等于:))的节点。 主机在单独的线程中广播其地址(这是ElasticMQ服务器地址,而不是内部JGroups集群通信地址)。 在一个JGroups回调方法中执行阻塞操作是一个非常容易的错误。 您永远不应该那样做,因为整个堆栈都可以锁定。 我们还需要FLUSH协议(总是在集群设置过程中添加); 该协议可确保在所有节点都安装新视图之前,不发送新消息,因此,我们确保新节点始终接收主信息。
成员资格侦听器还处理集群合并。 同样,JGroups为我们提供了合并分区的视图以及新的合并视图。 在ElasticMQ中,除了主分区(最大分区)以外的所有分区都请求状态转移,就像连接到集群之后一样。 这样,数据将保持一致状态。
加起来
还值得注意的是,使用ScalaTest对ElasticMQ的复制进行了全面测试。 每个测试都会创建一个内存存储集群,创建新节点或模拟节点崩溃。 请参见JGroupsReplicatedStorageTest类。
有了JGroups的机制,就可以轻松实现集群通信。 但是,与往常一样,您需要记住一些有关并发的陷阱(例如,新节点加入时可能会有集群活动;分区和合并可能随时发生;正常消息和集群视图更改之间没有顺序) ;可以在状态转移期间发送消息;等等。 但是,JGroups 教程和手册都非常全面,并且得到了论坛的其他帮助(感谢Bela!),您应该一切顺利。
您可以通过下载独立的ElasticMQ 发行版或以嵌入式方式运行它,来尝试复制在实践中的工作方式。
参考:来自Adam Warski博客的Blog的 JCG合作伙伴 Adam Warski使用JGroups在ElasticMQ中实现消息复制 。
翻译自: https://www.javacodegeeks.com/2012/06/elasticmq-message-replication-with.html