大数据-Storm流式框架(五)---DRPC

DRPC

概念

分布式RPC(DRPC)背后的想法是使用Storm在运行中并行计算真正强大的函数。 Storm拓扑接收函数参数流作为输入,并为每个函数调用发送结果的输出流。

DRPC并不是Storm的一个特征,因为它基于Storm的spouts,bolts和拓扑的高级抽象。DRPC本可以打包成Storm独立的库,但是跟storm绑定在一起很有用。

顶层视角

分布式RPC由“DRPC服务器”协调(Storm随附实现)。 DRPC服务器协调接收RPC请求,将请求发送到Storm拓扑,从Storm拓扑接收结果,并将结果发送回等待的客户端。 从客户端的角度来看,分布式RPC调用看起来就像常规的RPC调用。 例如,以下是客户端如何使用参数“http://twitter.com”计算“到达”函数的结果:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

分布式RPC工作流程:

客户端向DRPC服务器发送要执行的函数名称以及该函数的参数。实现该功能的拓扑使用DRPCSpout从DRPC服务器接收函数调用流。 每个函数调用都由DRPC服务器标记唯一ID。 然后拓扑计算结果,在拓扑结束时,一个名为ReturnResults的bolt连接到DRPC服务器,并为其提供函数调用id的结果。 然后,DRPC服务器使用id来匹配客户端正在等待的结果,取消阻塞等待的客户端,并将结果发送给它。

LinearDRPCTopologyBuilder

Storm附带了一个名为LinearDRPCTopologyBuilder的拓扑构建器,它可以自动执行几乎所有涉及DRPC的步骤。 这些包括:

     1、设置spout

     2、将结果返回给DRPC服务器

     3、为bolt提供功能,以便在tuple(元组)组上进行有限聚合

我们来看一个简单的例子。 这是DRPC拓扑的实现,它返回带有“!”的输入参数。附:

public static class ExclaimBolt extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}
}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);// ...
}

正如你所看到的,没有几行代码。 创建LinearDRPCTopologyBuilder时,可以告诉它拓扑的DRPC函数的名称。 单个DRPC服务器可以协调许多功能,函数名称可以区分各个函数。 声明的第一个bolt将2元组作为输入,其中第一个字段是请求ID,第二个字段是该请求的参数。 LinearDRPCTopologyBuilder期望最后一个bolt发出一个输出流,其中包含[id,result]形式的2元组。 最后,所有中间元组都必须包含请求ID作为第一个字段。

在这个例子中,ExclaimBolt只是附加一个“!” 到元组的第二个字段。 LinearDRPCTopologyBuilder处理连接到DRPC服务器并返回结果的其余协调。

本地模式的DRPC

DRPC可以在本地模式运行。下面的例子说明了如何运行本地模式的DRPC:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));cluster.shutdown();
drpc.shutdown();

首先,创建一个LocalDRPC对象。 此对象模拟正在进行的DRPC服务器,就像LocalCluster在进程中模拟Storm集群一样。 然后创建LocalCluster以在本地模式下运行拓扑。 LinearDRPCTopologyBuilder具有用于创建本地拓扑和远程拓扑的单独方法。 在本地模式下,LocalDRPC对象不会绑定到任何端口,因此拓扑需要知道要与之通信的对象。 这就是createLocalTopology将LocalDRPC对象作为输入接收的原因。

启动拓扑后,您可以使用LocalDRPC上的execute方法执行DRPC调用。

远程模式的DRPC

在实际集群上使用DRPC也很简单。 有三个步骤:

     1、启动DRPC服务器

     2、配置DRPC服务器的位置

     3、将DRPC拓扑提交给Storm集群

启动DRPC服务器可以使用storm脚本完成,就像启动Nimbus或UI一样:

bin/storm drpc

接下来,您需要配置Storm群集以了解DRPC服务器的位置。 这就是DRPCSpout如何知道从何处读取函数调用。 这可以通过storm.yaml文件或拓扑配置来完成。 通过storm.yaml配置这个看起来像这样:

drpc.servers:- "drpc1.foo.com"- "drpc2.foo.com"

最后,像启动任何一个其他的拓扑一样,使用StormSubmitter启动DRPC拓扑。要在远程模式运行上述的示例,操作如下:

StormSubmitter
.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用于为storm集群创建合适的拓扑。

稍微复杂的示例

感叹号DRPC示例是用于说明DRPC概念的玩具示例。让我们看一个更复杂的例子,它真正需要Storm集群为计算DRPC函数提供的并行性。我们将看到的示例是在Twitter上计算URL的范围。

URL的范围是在Twitter上暴露给URL的唯一人数。要计算覆盖面,您需要:

    1、获取推文网址的所有人

    2、获得所有这些人的所有粉丝

    3、独特的追随者

    4、统计一组独特的粉丝

在计算过程中,单个到达计算可能涉及数千个数据库调用和数千万个跟随者记录。这是一个非常非常密集的计算。正如您将要看到的那样,在Storm之上实现此功能非常简单。在一台计算机上,达到计算可能需要几分钟;在Storm集群中,您可以在几秒钟内计算最难的URL的覆盖率。

此处的storm-starter中定义了样本范围拓扑。以下是定义范围拓扑的方法:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2).fieldsGrouping(new Fields("id"));

拓扑执行为四个步骤:

    1、GetTweeters获取推文URL的用户。它将[id,url]的输入流转换为[id,tweeter]的输出流。每个url元组将映射到许多tweeter元组。

    2、GetFollowers获得推特的追随者。它将[id,tweeter]的输入流转换为[id,follower]的输出流。在所有任务中,当有人跟随多个发布相同URL的人时,可能会有重复的跟随元组。

    3、PartialUniquer通过关注者ID对关注者流进行分组。这具有相同的跟随者执行相同任务的效果。因此,PartialUniquer的每项任务都将获得相互独立的追随者。一旦PartialUniquer收到针对请求ID的所有针对它的关注元组,它就会发出其关注者子集的唯一计数。

4、最后,CountAggregator接收来自每个PartialUniquer任务的部分计数,并将它们相加以完成到达计算。

PartialUniquer代码:

public class PartialUniquer extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;Set<String> _followers = new HashSet<String>();@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_followers.add(tuple.getString(1));}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _followers.size()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "partial-count"));}
}

PartialUniquer通过扩展BaseBatchBolt实现IBatchBolt。批处理bolt提供了一个第一类API来处理一批元组作为具体单元。为每个请求ID创建一个新的批处理bolt实例,Storm会在适当的时候负责清理实例。

当PartialUniquer在execute方法中收到一个跟随元组时,它会将它添加到内部HashSet中的请求ID的集合中。

批处理bolt提供finishBatch方法,该方法在处理了针对此任务的此批处理的所有元组之后调用。在回调中,PartialUniquer会发出一个元组,其中包含其跟随者id子集的唯一计数。

在底层,CoordinatedBolt用于检测给定的bolt何时收到任何给定请求ID的所有元组。 CoordinatedBolt利用直接流来管理这种协调。

拓扑的其余部分应该是不言自明的。如您所见,到达计算的每一步都是并行完成的,定义DRPC拓扑非常简单。

非线性DRPC拓扑

LinearDRPCTopologyBuilder仅处理“线性”DRPC拓扑,其中计算表示为一系列步骤(如覆盖范围)。 不难想象函数需要更复杂的拓扑结构,包括bolt的分支和合并。 现在,要做到这一点,你需要直接使用CoordinatedBolt。 请务必在邮件列表中讨论非线性DRPC拓扑的用例,以便为DRPC拓扑构建更一般的抽象。

LinearDRPCTopologyBuilder工作流程:

DRPCSpout发射[args, return-info]。return-info是DRPC服务器的主机名和端口号,以及DRPC服务器生成的id。

创建一个拓扑包括:

  1. DRPCSpout
  2. PrepareRequest(生成请求ID,为返回信息创建一个流,为参数创建一个流)
  3. CoordinatedBolt
  4. JoinResult(使用return info合并结果)
  5. ReturnResult(连接DRPC服务器以及返回结果)

LinearDRPCTopologyBuilder是在storm原语之上构建高级别抽象的一个很好的例子。

进阶

KeyedFairBolt用于编织多个同时请求的处理

如何直接使用CoordinatedBolt

DRPC (Distributed RPC)  remote procedure call

分布式远程过程调用

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。

DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。

(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:

为了充分利用Storm的计算能力实现高密度的并行实时计算。

(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义DRPC拓扑

方法1:

通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)

该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现

方法2:

直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑

需要手动设定好开始的DRPCSpout以及结束的ReturnResults

运行模式:

1、本地模式

2、远程模式(集群模式)

修改配置文件conf/storm.yaml

drpc.servers:

    - "node1“

启动DRPC Server

bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑

案例:

Twitter 中某个URL的受众人数统计(这篇twitter到底有多少人看到过)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/123284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ansible脚本进阶---playbook

目录 一、playbooks的组成 二、案例 2.1 在webservers主机组中执行一系列任务&#xff0c;包括禁用SELinux、停止防火墙服务、安装httpd软件包、复制配置文件和启动httpd服务。 2.2 在名为dbservers的主机组中创建一个用户组&#xff08;mysql&#xff09;和一个用户&#xf…

二进制部署kubernetes集群的推荐方式

软件版本&#xff1a; 软件版本containerdv1.6.5etcdv3.5.0kubernetesv1.24.0 一、系统环境 1.1 环境准备 角色IP服务k8s-master01192.168.10.10etcd、containerd、kube-apiserver、kube-scheduler、kube-controller-manager、kubele、kube-proxyk8s-node01后续etcd、conta…

论坛介绍 | COSCon'23 开源文化(GL)

众多开源爱好者翘首期盼的开源盛会&#xff1a;第八届中国开源年会&#xff08;COSCon23&#xff09;将于 10月28-29日在四川成都市高新区菁蓉汇举办。本次大会的主题是&#xff1a;“开源&#xff1a;川流不息、山海相映”&#xff01;各位新老朋友们&#xff0c;欢迎到成都&a…

【Qt】文件系统

文章目录 文件系统文件操作案例&#xff1a;显示路径到标题框&#xff0c;显示内容到文本框对文件进行写操作获取文件相关信息 文件系统 Qt 通过QIODevice提供了对 I/O 设备的抽象&#xff0c;这些设备具有读写字节块的能力&#xff0c;下面是 I/O 设备的类图&#xff1a; QIO…

缓解大模型幻觉问题的解决方案

本文记录大模型幻觉问题的相关内容。 参考&#xff1a;Mitigating LLM Hallucinations: a multifaceted approach 地址&#xff1a;https://amatriain.net/blog/hallucinations &#xff08;图&#xff1a;解决大模型幻觉的不同方式&#xff09; 什么是幻觉&#xff1f; 幻觉…

掌握CSS Flexbox,打造完美响应式布局,适配各种设备!

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! ​ 目录 ⭐ 专栏简介 &#x1f4d8; 文章引言 基…

Hover:借贷新势力崛起,在经验与创新中找寻平衡

复苏中的Cosmos 如果让我选择一个最我感到可惜的区块链项目&#xff0c;我会选择Cosmos。 Cosmos最早提出并推动万链互联的概念&#xff0c;希望打通不同链之间的孤岛&#xff0c;彼时和另一个天王项目Polkadot号称跨链双雄。其跨链技术允许不同的区块链网络互相通信&#xf…

M1安装OpenPLC Editor

下载OpenPLC Editor for macOS.zip文件后&#xff0c;使用tar -zvxf命令解压&#xff0c;然后将"OpenPLC Editor"拖入到"应用程序"文件夹 右键点击"OpenPLC Editor"&#xff0c;打开这个""文件&#xff0c;替换为以下内容 #!/bin/bash…

分布式锁其实很简单,6行代码教你实现redis分布式锁

一、前言 分布式锁是一种用于协调分布式系统中多个节点之间对共享资源进行访问控制的机制。它可以确保在分布式环境下&#xff0c;同一时间只有一个节点能够获取到锁&#xff0c;并且其他节点需要等待释放锁后才能获取。 以下是使用分布式锁的几个常见场景和原因&#xff1a;…

「常识」浮点数和定点数

浮点数和定点数 本篇文章旨在简短的介绍浮点数、定点数的定义&#xff0c;以及一些常见的数制、补码。 一、常识 如果缺少以下常识的话&#xff0c;将很难理解浮点数和定点数的概念。 1、数 自然数整数/分数小数&#xff1a;有限小数、无限循环小数、无限不循环小数实数&a…

2.2 消元法的概念

一、消元法介绍 消元法&#xff08;elimination&#xff09;是一个求解线性方程组的系统性方法。下面是使用消元法求解一个 2 2 2\times2 22 线性方程组的例子。消元之前&#xff0c;两个方程都有 x x x 和 y y y&#xff0c;消元后&#xff0c;第一个未知数 x x x 将从第…

APC学习记录

文章目录 APC概念APC插入、执行过程逆向分析插入过程执行过程总结 代码演示参考资料 APC概念 APC全称叫做异步过程调用&#xff0c;英文名是 Asynchronous Procedure Call&#xff0c;在进行系统调用、线程切换、中断、异常时会进行触发执行的一段代码&#xff0c;其中主要分为…

机器学习 | 决策树算法

一、决策树算法概述 1、树模型 决策树&#xff1a;从根节点开始一步步走到叶子节点(决策)。所有的数据最终都会落到叶子节点&#xff0c;既可以做分类也可以做回归。 在分类问题中&#xff0c;表示基于特征对实例进行分类的过程&#xff0c;可以认为是if-then的集合&#xff0…

推理还是背诵?通过反事实任务探索语言模型的能力和局限性

推理还是背诵&#xff1f;通过反事实任务探索语言模型的能力和局限性 摘要1 引言2 反事实任务2.1 反事实理解检测 3 任务3.1 算术3.2 编程3.3 基本的句法推理3.4 带有一阶逻辑的自然语言推理3.5 空间推理3.6 绘图3.7 音乐3.8 国际象棋 4 结果5 分析5.1 反事实条件的“普遍性”5…

基于Qt 文本读写(QFile/QTextStream/QDataStream)实现

​ 在很多时候我们需要读写文本文件进行读写,比如写个 Mp3 音乐播放器需要读 Mp3 歌词里的文本,比如修改了一个 txt 文件后保存,就需要对这个文件进行读写操作。本章介绍简单的文本文件读写,内容精简,让大家了解文本读写的基本操作。 ## QFile 读写文本 QFile 类提供了读…

AIGC应用公司开始赚钱了,创始人来自中国,7个月实现100万美元ARR

图片来源&#xff1a;由无界AI生成 自 2022 年中以来&#xff0c;AIGC 赛道持续 1 年有余。然而&#xff0c;热闹归热闹&#xff0c;赚钱的公司一只手都能数得过来。奇葩如 Midjourney&#xff0c;硬是不靠 VC 输血凭着 11 人年做到 1 亿美元 ARR&#xff1b;幸运如 Jasper&…

ideaSSM在线商务管理系统VS开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 SSM 在线商务管理系统是一套完善的信息管理系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码 和数据库&#xff0c;系统主…

No175.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

WIN11新版画图问题解决

1 白色背景被连同删除的问题 解决方法&#xff1a;加层 将层调整为新建的层&#xff0c;在这个层下画图就行。 2 QQ截图无法直接放在画图上的问题 使用QQ截图的时候&#xff1a; 解决方法&#xff1a;使用windows自带的截图工具或者微信截图 步骤&#xff1a; 1. windows自带…

网络安全—小白自学

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成熟…