Aeron:两个代理之间的单向IPC(One-way IPC between two agents)

一、概述

本例展示了如何通过 IPC 在调度于不同线程的两个代理之间传输缓冲区。在继续学习本示例之前,最好先复习一下Simplest Full Example ,因为该示例展示的是 IPC 通信,没有增加代理的复杂性。读者还应熟悉Media Driver

流程构建如下:

  • 以默认模式运行的嵌入式Media Driver(发送器、接收器和指挥器的代理(an agent for Sender, Receiver, Conductor))(an embedded media driver running in default mode (an agent for Sender, Receiver, Conductor))
  • 通过publication发送 IPC 数据的代理(SendAgent)(an agent to send the IPC data over a publication (SendAgent))
  • 通过subscription接收 IPC 数据的代理(ReceiveAgent)(an agent to receive the IPC data over a subscription (ReceiveAgent))

Code Sample overview

代码示例包含在 ipc-core 项目 com.aeroncookbook.ipc.agents 命名空间中的三个文件中。它们是:

  • StartHere.java - the class responsible for setting up Aeron and scheduling the agents;(负责设置 Aeron 和调度代理的类)
  • SendAgent.java - the class holding the Agent responsible for sending data;(负责发送数据的代理的类)
  • ReceiveAgent.java - the class holding the Agent responsible for receiving data.(负责接收数据的代理的类)

下文将对每个部分进行细分和讨论。

 Execution Output

15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000

二、StartHere.java

public static void main(String[] args)
{final String channel = "aeron:ipc";final int stream = 10;final int sendCount = 1_000_000;final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();//construct Media Driver, cleaning up media driver folder on start/stopfinal MediaDriver.Context mediaDriverCtx = new MediaDriver.Context().dirDeleteOnStart(true).threadingMode(ThreadingMode.SHARED).sharedIdleStrategy(new BusySpinIdleStrategy()).dirDeleteOnShutdown(true);final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);//construct Aeron, pointing at the media driver's folderfinal Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName());final Aeron aeron = Aeron.connect(aeronCtx);//construct the subs and pubsfinal Subscription subscription = aeron.addSubscription(channel, stream);final Publication publication = aeron.addPublication(channel, stream);//construct the agentsfinal SendAgent sendAgent = new SendAgent(publication, sendCount);final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,sendCount);//construct agent runnersfinal AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,Throwable::printStackTrace, null, sendAgent);final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,Throwable::printStackTrace, null, receiveAgent);LOGGER.info("starting");//start the runnersAgentRunner.startOnThread(sendAgentRunner);AgentRunner.startOnThread(receiveAgentRunner);//wait for the final item to be received before closingbarrier.await();//close the resourcesreceiveAgentRunner.close();sendAgentRunner.close();aeron.close();mediaDriver.close();
}

 Constructing support objects

final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

这部分代码构建了一些支持对象。(This section of the code constructs a few support objects.)

  • Line 1 holds the channel definition, in this case aeron:ipc
  • Line 2 holds the stream ID to use, in this case 10
  • Line 3 is the number of integers to send over IPC
  • 第 4、5 行构建了代理使用的空闲策略(IdleStrategy)。在这种情况下,只要 doWork 工作周期返回 0,空闲策略就会忙于旋转。(Line 4,5 constructs the IdleStrategy to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.)
  • 第 6 行是一个屏障,用于协调样本的关闭。一旦 ReceiveAgent 总共接收到一个发送计数整数,它就会向屏障发出信号,触发关闭。(Line 6 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the ReceiveAgent has received a total of sendCount integers, it will signal the barrier, triggering the shutdown.)

Constructing the Media Driver 

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context().dirDeleteOnStart(true).threadingMode(ThreadingMode.SHARED).sharedIdleStrategy(new BusySpinIdleStrategy()).dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

本节代码使用定义的上下文构建Media Driver。上下文是一个对象,其中包含Media Driver的所有可选配置参数。在本例中,有两项配置被重写,以确保Media Driver在启动和关闭时整理Media Driver目录。一旦上下文准备就绪,Media Driver就会作为嵌入式代理启动。

See also: Media Driver

 Constructing Aeron, the Publication and the Subscription

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

这部分代码再次使用 Context 构建 Aeron 对象。有了这个上下文,我们就能让 Aeron 知道Media Driver的 Aeron 目录在哪里。一旦上下文准备就绪,Aeron 对象就会连接到Media Driver。接下来,我们将使用之前定义的通道和流 id 创建 IPC 发布和订阅(IPC publication and subscription)。

//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);

Constructing and scheduling the agents

//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,Throwable::printStackTrace, null, receiveAgent);//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);

 

这部分代码构建发送代理(SendAgent)和接收代理(ReceiveAgent),创建代理运行程序来管理它们,然后在特定线程上启动它们。关键行如下:

  • 第 6-7 行和第 8-9 行:这两行分别构建了发送和接收的代理运行程序。请注意,每行都给出了空闲策略,用于控制线程在 doWork 工作周期后如何使用资源。
  • 第 12 和 13 行:这两行为每个代理创建新线程,并开始工作周期。

Shutting down cleanly

//wait for the final item to be received before closing
barrier.await();//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();

代码的最后部分负责等待 ReceiveAgent 触发屏障,然后正确清理资源。首先关闭代理,然后关闭 aeron 对象,最后关闭Media Driver。如果不注意关闭过程中的执行顺序,可能会出现核心转储或其他看似严重的故障。 

三、SendAgent.java

public class SendAgent implements Agent
{private final Publication publication;private final int sendCount;private final UnsafeBuffer unsafeBuffer;private int currentCountItem = 1;private final Logger logger = LoggerFactory.getLogger(SendAgent.class);public SendAgent(final Publication publication, int sendCount){this.publication = publication;this.sendCount = sendCount;this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));unsafeBuffer.putInt(0, currentCountItem);}@Overridepublic int doWork(){if (currentCountItem > sendCount){return 0;}if (publication.isConnected()){if (publication.offer(unsafeBuffer) > 0){currentCountItem += 1;unsafeBuffer.putInt(0, currentCountItem);}}return 0;}@Overridepublic String roleName(){return "sender";}
}

 

send 对象负责通过提供的 Aeron Publication 发送 sendCount 整数。doWork 方法用于保持代理的工作周期,该方法会被持续调用,直至代理关闭。一旦达到 sendCount 限制,它就会停止向publication发送更多信息,并开始闲置。

这段代码中最有趣的部分是:

  • Line 18 to 34: the doWork method holding the duty cycle for this agent
  • 第 22 行和第 34 行:这两条返回语句都返回 0,这将导致选定的空闲策略 BusySpinIdleStrategy 调用 ThreadHints.onSpinWait()
  • 第 25 行:只有当publication已连接时,才会返回 true。一旦连接,就可以安全地向publication提供信息。
  • 第 27 行:这将为publication提供缓冲数据。
  • Line 30: this logs the last sent integer, for example 15:13:42.818 [sender] sent: 123
  • Line 41: this sets the thread name to sender, as is visible in the log output.

四、ReceiveAgent.java

public class ReceiveAgent implements Agent
{private final Subscription subscription;private final ShutdownSignalBarrier barrier;private final int sendCount;private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);public ReceiveAgent(final Subscription subscription,ShutdownSignalBarrier barrier, int sendCount){this.subscription = subscription;this.barrier = barrier;this.sendCount = sendCount;}@Overridepublic int doWork() throws Exception{subscription.poll(this::handler, 1000);return 0;}private void handler(DirectBuffer buffer, int offset, int length,Header header){final int lastValue = buffer.getInt(offset);if (lastValue >= sendCount){logger.info("received: {}", lastValue);barrier.signal();}}@Overridepublic String roleName(){return "receiver";}
}

接收代理负责轮询所提供的订阅并记录接收到的值。一旦达到 sendCount 值,接收代理就会发出屏障信号。该对象中最有趣的部分是:

  • 第 17-21 行 - doWork 方法保持着该代理的duty cycle 。duty cycle由两部分组成,一部分是轮询订阅,将事件传递给提供的处理程序,另一部分是返回 0。通过配置的 IdleStrategy,返回 0 将导致线程停顿一微秒。
  • Line 26 - this logs the integer value received, for example: 15:13:42.814 [receiver] received: 5
  • Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
  • Line 38 - this sets the role name to receiver, as visible in log output.

 五、Performance

在英特尔笔记本电脑上,本示例每秒可传输约 1 千万条 4 字节信息。如果使用的是 Linux 系统,且有可用的 /dev/shm,代码会自动使用。通过交换 NoOpIdleStrategy,并将media driver线程移至 DEDICATED,每秒可传输超过 2000 万条信息。主要更改见下文。请注意,您需要确保硬件上至少有 8 个物理内核。

final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context().dirDeleteOnStart(true).threadingMode(ThreadingMode.DEDICATED).conductorIdleStrategy(new BusySpinIdleStrategy()).senderIdleStrategy(new NoOpIdleStrategy()).receiverIdleStrategy(new NoOpIdleStrategy()).dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context().idleStrategy(new NoOpIdleStrategy()).aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

 

有一个相关的 Two Agent example of OneToOneRingBuffer 非常相似,只不过它使用了 Agrona 的 OneToOneRingBuffer,并通过 BusySpinIdleStrategy 每秒发送大约 1800 万条 4 字节信息,或通过 NoOpIdleStrategy 每秒发送超过 5000 万条信息。

六、Using the C Media Driver

要使用 Aeron 的 C Media Driver测试此示例,您需要执行以下操作:

首先,从源代码中构建 C Media Driver(说明因操作系统而异,此部分参考博客即可,建议翻墙进行操作,贴出cookbook只是帮助借阅):

  • Building the C Media Driver on macOS
  • Building the C Media Driver on CentOS Linux 8
  • Building the C Media Driver on Ubuntu 20.04
  • Building the C Media Driver on Windows 10

Next, start the C Media Driver with default settings

  • ./aeronmd (Linux/macOS)
  • aeronmd (Windows)

Then, remove the Media Driver from StartHere.java, and reduce the Aeron context to defaults:

//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
//        .dirDeleteOnStart(true)
//        .threadingMode(ThreadingMode.SHARED)
//        .sharedIdleStrategy(new BusySpinIdleStrategy())
//        .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);

Aeron 和Media Driver将默认使用同一目录。

最后,正常运行 StartHere.java。进程应正常运行,输出应包括类似内容:

14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/28898.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习笔记——路由网络基础——路由优先级(preference)

1、路由优先级(preference) 路由优先级(preference)代表路由的优先程度。当路由器从多种不同的途径获知到达同一个目的网段的路由(这些路由的目的网络地址及网络掩码均相同)时,路由器会比较这些路由的优先级,优选优先级值最小的路由。 路由来源的优先…

编程之道:程序员必备的五大职业素养

引言 在数字化时代,程序员的角色变得日益重要。他们不仅是代码的编写者,更是技术变革的推动者。然而,成为一名优秀的程序员,除了技术能力之外,还需要具备一系列职业素养。本文将探讨程序员在职业生涯中应具备的五大职业…

pycharm git配置

PyCharm 是一个强大的集成开发环境(IDE),它内置了 Git 集成,使得版本控制变得非常方便。以下是 PyCharm 中配置 Git 的基本步骤: 安装Git: 在开始之前,请确保已经在您的系统上安装了 Git。您可以通过官方网站下载并安装 Git。本系统用的是Git-2.29.2.2-64-bit 。 打开PyC…

24年下半年安徽教资认定准确时间和流程

安徽教资认定准确时间 网上报名时间: 第一批次:4月8日至4月19日17时 第二批次:6月17日至6月28日17时 注意:符合安徽省申请条件的普通大中专院校2024届全日制毕业生,应统一选择6月17日至6月28日17时的时间段进行网上报名…

vivado在implementation时出现错误[Place 30-494] The design is empty的一个可能原因和解决方法

在查询类似帖子时我发现这一问题是由于在设计实现时vivado认为没有输出端口所以报错。 于是在.v文件中我添加了一个随意的端口,并且在.xdc文件中为它分配了管脚 这样做的确可以让设计实现的过程顺利进行,但是会发现在summary中,设计实现的…

2024年6月17日 (周一) 叶子游戏新闻

期刊杂志: 聚合读者、意林、知音、故事会、花火以及国内各大知名报纸电子版,无需付费即可观看各种免费资源 中医自学宝典: 集合了中医医案,医经,方剂 药材知识的app,更方便的免费学习中医知识 《最终幻想7》重制三部曲总监鸟山求&…

每日复盘-202406017

今日关注: 20240617 六日涨幅最大: ------1--------301036--------- 双乐股份 五日涨幅最大: ------1--------301176--------- 逸豪新材 四日涨幅最大: ------1--------300868--------- 杰美特 三日涨幅最大: ------1--------301082--------- 久盛电气 二日涨幅最大…

springboot+vue+mybatis酒店房间管理系统+PPT+论文+讲解+售后

随着现在网络的快速发展,网络的应用在各行各业当中它很快融入到了许多商家的眼球之中,他们利用网络来做这个电商的服务,随之就产生了“酒店房间管理系统”,这样就让人们酒店房间管理系统更加方便简单。 对于本酒店房间管理系统的…

BRAVE:扩展视觉编码能力,推动视觉-语言模型发展

视觉-语言模型(VLMs)在理解和生成涉及视觉与文本的任务上取得了显著进展,它们在理解和生成结合视觉与文本信息的任务中扮演着重要角色。然而,这些模型的性能往往受限于其视觉编码器的能力。例如,现有的一些模型可能对某…

车企高管组团“出道”,汽车营销已经Next level了?

汽车进入了“卷”老板、“卷”高管的时代! 谁能想到,雷军凭一己之力,在一定程度上重塑了汽车的竞争策略。价格战之外,车市又开启了流量之战。 云略曾在《雷军20天吸粉500w!……》一文中,提到继雷军之后&…

爆肝整理AI Agent:在企业应用中的6种基础类型

AI Agent智能体在企业应用中落地的价值、场景、成熟度做了分析,并且探讨了未来企业IT基础设施与架构如何为未来Gen AI(生成式AI)做好准备。在这样的架构中,我们把最终体现上层应用能力的AI Agent从不同的技术要求与原理上分成了几…

【MySQL】分库分表

https://www.bilibili.com/video/BV1Kr4y1i7ru/?p163​ https://blog.csdn.net/qq_47959003/article/details/126058710 随着互联网及移动互联网的发展,应用系统的数据量也是成指数式增长,若采用单数据库进行数据存储,存在以下性能瓶颈&…

Java项目:基于SSM框架实现的汽车养护保养管理系统【ssm+B/S架构+源码+数据库+开题+毕业论文+任务书】

一、项目简介 本项目是一套基于SSM框架实现的汽车养护保养管理系统 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格调试,eclipse或者idea 确保可以运行! 该系统功能完善、界面美观、操作简单、…

宋街宣传活动-循环利用,绿色生活

善于善行回收团队是一支致力于推动环保事业,积极倡导和实践绿色生活的志愿者队伍。我们的宗旨是通过回收再利用,减少资源浪费,降低环境污染,同时提高公众的环保意识,共同构建美丽和谐的家园。 善于善行志愿团队于2024年…

C语言数据存储大小端问题

大小端 什么是大小端 大端模式(Big-endian),是指数据的高字节,保存在内存的低地址中,而数据的低字节,保存在内存的高地址中; 小端模式(Little-endian),是指数据的高字…

函数递归

哈喽啊各位,真是,好久好久好久不见。这段时间实在是太过忙碌了昂,还望诸君见谅,接下来时间会松很多,咱们也会恢复正常更新速度啦 小希在这里祝诸君:期末不挂科,四六级都过!功不唐捐…

解密:不用import,Python编程将遭遇什么?

在Python中,import 语句用于导入其他模块或库,如果不使用 import,会导致以下问题: 无法使用外部库或模块: Python标准库以及第三方库提供了丰富的功能和工具,如果不导入这些库,就无法使用它们提供的功能。 代码可读性降低: import 语句可…

新质生产力水平测算与中国经济增长新动能(dta数据及do代码)

时间跨度:2012-2022年 数据范围:全国30个省份(不含港澳台、西藏) 数据指标: 参考韩文龙等的做法,收集了全部控制变量与稳定性检验所需变量。 类型 符号 变量 变量定义 被解释变量 GDP 各省人均GDP…

螺丝工厂vtk ThreadFactory(1)

螺丝工厂vtkThreadFactory (1) 缘起 几年前的探索在Python里应用Openscad实现3D建模之3D螺纹建模初探3 新的参考: generating nice threads in openscadvtkRotationalExtrusionFilter 辅助AI: coze 笔记📒: openscad 代码分析 // 半径缩放函数,用…

llamaindex原理与应用简介(宏观理解)

llamaindex原理与应用简介(宏观理解) 文章目录 llamaindex原理与应用简介(宏观理解) 这是我认为对于 llamaindex 应用的场景概述讲的相对比较好的视频:llamaindex原理与应用简介