A Look at Storm's PartialKeyGrouping

This article takes a look at Storm's PartialKeyGrouping.

Example

    @Test
    public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String spoutId = "wordGenerator";
        String counterId = "counter";
        String aggId = "aggregator";
        String intermediateRankerId = "intermediateRanker";
        String totalRankerId = "finalRanker";
        int TOP_N = 5;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new TestWordSpout(), 5);
        // NOTE: use partialKeyGrouping instead of fieldsGrouping to spread load more evenly across the counter bolt
        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
        builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
        submitRemote(builder);
    }
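  • Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task. A RollingCountAggBolt connected with fieldsGrouping is therefore still needed to merge the partial counts.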
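To see why that second stage is needed, here is a minimal plain-Java sketch with no Storm dependency. It assumes the word "storm" was split across two counter tasks, so each task holds only a partial count. Merging by key, which is what the fieldsGrouping aggregation stage makes possible, restores the total. `PartialCountMerge` is a hypothetical helper, not the RollingCountAggBolt API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: merge per-task partial counts by key,
// as the downstream fieldsGrouping aggregation stage effectively does.
class PartialCountMerge {
    // Each map is one counter task's partial view of the word counts.
    static Map<String, Long> merge(List<Map<String, Long>> partialCounts) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partialCounts) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                // Every partial count for a key ends up in one place, so summing yields the true total.
                totals.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return totals;
    }
}
```

For example, if one task counted "storm" 6 times and another counted it 4 times, the merged result is 10. Neither task alone would report the correct value.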

PartialKeyGrouping (version 1.2.2)

storm-core-1.2.2-sources.jar!/org/apache/storm/grouping/PartialKeyGrouping.java

    public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
        private static final long serialVersionUID = -447379837314000353L;
        private List<Integer> targetTasks;
        private long[] targetTaskStats;
        private HashFunction h1 = Hashing.murmur3_128(13);
        private HashFunction h2 = Hashing.murmur3_128(17);
        private Fields fields = null;
        private Fields outFields = null;

        public PartialKeyGrouping() {
            //Empty
        }

        public PartialKeyGrouping(Fields fields) {
            this.fields = fields;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            targetTaskStats = new long[this.targetTasks.size()];
            if (this.fields != null) {
                this.outFields = context.getComponentOutputFields(stream);
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            List<Integer> boltIds = new ArrayList<>(1);
            if (values.size() > 0) {
                byte[] raw;
                if (fields != null) {
                    List<Object> selectedFields = outFields.select(fields, values);
                    ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                    for (Object o: selectedFields) {
                        if (o instanceof List) {
                            out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                        } else if (o instanceof Object[]) {
                            out.putInt(Arrays.deepHashCode((Object[])o));
                        } else if (o instanceof byte[]) {
                            out.putInt(Arrays.hashCode((byte[]) o));
                        } else if (o instanceof short[]) {
                            out.putInt(Arrays.hashCode((short[]) o));
                        } else if (o instanceof int[]) {
                            out.putInt(Arrays.hashCode((int[]) o));
                        } else if (o instanceof long[]) {
                            out.putInt(Arrays.hashCode((long[]) o));
                        } else if (o instanceof char[]) {
                            out.putInt(Arrays.hashCode((char[]) o));
                        } else if (o instanceof float[]) {
                            out.putInt(Arrays.hashCode((float[]) o));
                        } else if (o instanceof double[]) {
                            out.putInt(Arrays.hashCode((double[]) o));
                        } else if (o instanceof boolean[]) {
                            out.putInt(Arrays.hashCode((boolean[]) o));
                        } else if (o != null) {
                            out.putInt(o.hashCode());
                        } else {
                            out.putInt(0);
                        }
                    }
                    raw = out.array();
                } else {
                    raw = values.get(0).toString().getBytes(); // assume key is the first field
                }
                int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
                int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
                int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
                boltIds.add(targetTasks.get(selected));
                targetTaskStats[selected]++;
            }
            return boltIds;
        }
    }
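  • PartialKeyGrouping is a CustomStreamGrouping. In prepare it initializes long[] targetTaskStats, which counts how many tuples have been sent to each target task.
  • If no fields are given (the no-argument constructor), the key is taken to be the first value of the tuple, converted with values.get(0).toString().getBytes().
  • It builds two HashFunctions with Guava's Hashing.murmur3_128 (seeds 13 and 17). It then takes the absolute value of each hash modulo targetTasks.size(), which gives two candidate task indices.
  • Finally it picks whichever candidate has been used fewer times according to targetTaskStats (the first choice wins a tie) and increments that candidate's counter.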
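The selection steps above can be sketched in dependency-free Java. One assumption is made loudly here: Guava's murmur3_128 is replaced by a hypothetical seeded FNV-1a-style hash (`seededHash`), so only the two-choices selection logic mirrors the 1.2.2 code. The sketch also uses `Math.floorMod` instead of `Math.abs(...) %`, because `Math.abs(Long.MIN_VALUE)` is still negative and would produce a negative index.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the 1.2.2-style "power of two choices" selection.
// ASSUMPTION: seededHash is a stand-in for Guava's murmur3_128, not Storm's hash.
class TwoChoicesSketch {
    private final int[] targetTasks;
    private final long[] targetTaskStats; // tuples sent to each task index so far

    TwoChoicesSketch(int[] targetTasks) {
        this.targetTasks = targetTasks;
        this.targetTaskStats = new long[targetTasks.length];
    }

    // Hypothetical seeded hash (FNV-1a style, offset basis XORed with the seed).
    static long seededHash(byte[] raw, long seed) {
        long h = 0xcbf29ce484222325L ^ seed;
        for (byte b : raw) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }

    int chooseTask(String key) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        long n = targetTasks.length;
        // floorMod keeps the index non-negative for every hash value.
        int firstChoice = (int) Math.floorMod(seededHash(raw, 13), n);
        int secondChoice = (int) Math.floorMod(seededHash(raw, 17), n);
        // Same rule as 1.2.2: switch to the second choice only if it is strictly less loaded.
        int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice]
                ? secondChoice : firstChoice;
        targetTaskStats[selected]++;
        return targetTasks[selected];
    }
}
```

As in the 1.2.2 code, nothing stops both hashes from landing on the same index. In that case the key effectively has a single candidate. The 2.0.0 RandomTwoTaskAssignmentCreator explicitly avoids this.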

PartialKeyGrouping (version 2.0.0)

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
     * Tuples from a given partition to multiple downstream tasks.
     *
     * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
     * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
     *
     * Notes:
     * - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible
     * - the default AssignmentCreator hashes the key and produces an assignment of two tasks
     */
    public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
        private static final long serialVersionUID = -1672360572274911808L;
        private List<Integer> targetTasks;
        private Fields fields = null;
        private Fields outFields = null;
        private AssignmentCreator assignmentCreator;
        private TargetSelector targetSelector;

        public PartialKeyGrouping() {
            this(null);
        }

        public PartialKeyGrouping(Fields fields) {
            this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
        }

        public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
            this(fields, assignmentCreator, new BalancedTargetSelector());
        }

        public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
            this.fields = fields;
            this.assignmentCreator = assignmentCreator;
            this.targetSelector = targetSelector;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            if (this.fields != null) {
                this.outFields = context.getComponentOutputFields(stream);
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            List<Integer> boltIds = new ArrayList<>(1);
            if (values.size() > 0) {
                final byte[] rawKeyBytes = getKeyBytes(values);
                final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
                final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
                boltIds.add(selectedTask);
            }
            return boltIds;
        }

        /**
         * Extract the key from the input Tuple.
         */
        private byte[] getKeyBytes(List<Object> values) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o : selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[]) o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                        out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            return raw;
        }

        //......
    }
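  • Version 2.0.0 moves this logic into two pluggable pieces: RandomTwoTaskAssignmentCreator (an AssignmentCreator) and BalancedTargetSelector (a TargetSelector). Either one can be replaced through the three-argument constructor.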
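The way chooseTasks composes the two extension points can be sketched without Storm on the classpath. The two interfaces below are hypothetical local copies that mirror the signatures in the 2.0.0 source (the Serializable supertype is dropped for brevity). `ComposedGrouping` is a made-up name for illustration.

```java
import java.util.List;

// Hypothetical local copies of Storm 2.0.0's extension points (same method signatures).
interface AssignmentCreator {
    int[] createAssignment(List<Integer> targetTasks, byte[] key);
}

interface TargetSelector {
    Integer chooseTask(int[] assignedTasks);
}

// Sketch of the composition inside PartialKeyGrouping.chooseTasks:
// first compute the candidate tasks for the key, then pick one of them.
class ComposedGrouping {
    private final List<Integer> targetTasks;
    private final AssignmentCreator assignmentCreator;
    private final TargetSelector targetSelector;

    ComposedGrouping(List<Integer> targetTasks, AssignmentCreator assignmentCreator,
                     TargetSelector targetSelector) {
        this.targetTasks = targetTasks;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    Integer chooseTask(byte[] key) {
        int[] candidates = assignmentCreator.createAssignment(targetTasks, key);
        return targetSelector.chooseTask(candidates);
    }
}
```

Splitting the work this way lets each half vary independently. For instance, an AssignmentCreator that returns only one task per key, paired with a selector that returns `assignedTasks[0]`, degrades to fieldsGrouping-like behavior.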

RandomTwoTaskAssignmentCreator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface is responsible for choosing a subset of the target tasks to use for a given key.
     *
     * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
     * each of them needs to come up with the same assignment for a given key.
     */
    public interface AssignmentCreator extends Serializable {
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /*========== Implementations ==========*/

    /**
     * This implementation of AssignmentCreator chooses two arbitrary tasks.
     */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        /**
         * Creates a two task assignment by selecting random tasks.
         */
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // ensure that choice1 and choice2 are not the same task
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }
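  • Version 2.0.0 no longer uses Guava's Hashing.murmur3_128. Instead it seeds a java.util.Random with Arrays.hashCode(key) and draws two task indices from it, bumping the second index when the two collide so that they differ (given at least two target tasks). It returns both task IDs as candidates for the load-balanced selection.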
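The determinism requirement in the AssignmentCreator Javadoc can be checked with a standalone re-creation of the createAssignment logic above. `TwoTaskAssignmentDemo` is a hypothetical name; the body follows the Storm code shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Standalone re-creation of RandomTwoTaskAssignmentCreator.createAssignment,
// used to check that the pair depends only on the key and that the two tasks differ.
class TwoTaskAssignmentDemo {
    static int[] createAssignment(List<Integer> tasks, byte[] key) {
        // Seeding Random with the key's hash makes the draw repeatable for the same key.
        final Random random = new Random(Arrays.hashCode(key));
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // Bump the second index if it collides with the first.
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[]{tasks.get(choice1), tasks.get(choice2)};
    }
}
```

Because the java.util.Random algorithm is fully specified, equal seeds produce equal sequences on every JVM, so different workers compute the same pair. One caveat worth checking: in the fields branch the key bytes come from each field's hashCode(), so field types need a hashCode that is stable across JVMs (String's is).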

BalancedTargetSelector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface chooses one element from a task assignment to send a specific Tuple to.
     */
    public interface TargetSelector extends Serializable {
        Integer chooseTask(int[] assignedTasks);
    }

    /**
     * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
     * overall from this instance of the grouping.
     */
    public static class BalancedTargetSelector implements TargetSelector {
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
         */
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            Long minTaskLoad = Long.MAX_VALUE;
            for (Integer currentTaskId : assignedTasks) {
                final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }
            targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
            return taskIdWithMinLoad;
        }
    }
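  • Given the candidate task IDs, BalancedTargetSelector looks up each one in targetTaskStats and returns the one with the smallest count (taskIdWithMinLoad; the earlier candidate wins a tie). It then increments that task's count.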
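A standalone re-creation of the least-loaded choice shows what happens to a hot key. When every tuple carries the same key, and therefore the same two candidates, the selector alternates between them. `LeastLoadedSelector` and its `load` accessor are hypothetical names added for the illustration; the counter update uses `Map.merge`, which is equivalent to the original `put(..., getOrDefault(...) + 1)`.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone re-creation of BalancedTargetSelector's least-loaded choice.
// The counts are local to one selector instance, i.e. to one upstream task.
class LeastLoadedSelector {
    private final Map<Integer, Long> targetTaskStats = new HashMap<>();

    Integer chooseTask(int[] assignedTasks) {
        Integer taskIdWithMinLoad = null;
        long minTaskLoad = Long.MAX_VALUE;
        for (int currentTaskId : assignedTasks) {
            long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            // Strict '<' means the earlier candidate wins ties.
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }
        targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
        return taskIdWithMinLoad;
    }

    // Hypothetical accessor for inspecting the counts in the example.
    long load(int taskId) {
        return targetTaskStats.getOrDefault(taskId, 0L);
    }
}
```

Ten tuples whose key maps to candidates {3, 7} go 3, 7, 3, 7, ..., leaving five on each task. Since every grouping instance keeps its own counts, the balance is local to each upstream task and only approximates global balance.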

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {
        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }
    }
  • FieldsGrouper wraps each target task in a singleton list during prepare. Its chooseTasks method then uses TupleUtils.chooseTaskIndex to pick the index of the target task.

TupleUtils.chooseTaskIndex

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        if (alist == null) {
            return 1;
        } else {
            return Arrays.deepHashCode(alist.toArray());
        }
    }
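  • It first computes listHashCode(keys) and then applies Math.floorMod with numTasks. Floor modulo always yields a non-negative index, even when the hash code is negative.
  • listHashCode computes the hash with Arrays.deepHashCode(alist.toArray()).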
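A standalone re-creation of chooseTaskIndex makes the floor-modulo point concrete. For a negative hash such as -7, `Math.floorMod(-7, 4)` is 1, while `-7 % 4` is -3, which is not a valid index. `FieldsIndexDemo` is a hypothetical class name; the method bodies follow the TupleUtils code above.

```java
import java.util.Arrays;
import java.util.List;

// Standalone re-creation of TupleUtils.chooseTaskIndex (fieldsGrouping's task choice).
class FieldsIndexDemo {
    static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        // floorMod maps negative hash codes into [0, numTasks).
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        // Same null handling as the Storm source: a null key list hashes to 1.
        return alist == null ? 1 : Arrays.deepHashCode(alist.toArray());
    }
}
```

Each key maps to exactly one index, so a hot key lands entirely on one task. That single fixed choice is the source of the skew that PartialKeyGrouping targets.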

Summary

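  • Storm's PartialKeyGrouping addresses the skewed load that fieldsGrouping can cause across bolt tasks when a few keys dominate the stream.
  • fieldsGrouping hashes the selected fields and takes a floor modulo by the number of tasks to choose the task index, so every key maps to exactly one task.
  • In 1.2.2, PartialKeyGrouping hashes the key with Guava's Hashing.murmur3_128 and takes the absolute value modulo the number of tasks to get two candidate task indices. In 2.0.0 it instead seeds Random with the key's hash code to draw two task indices. Returning two candidates instead of one is the key difference from fieldsGrouping. Once it has the two candidates, PartialKeyGrouping also keeps a per-task usage count, picks the less-used candidate, and increments that candidate's count after each choice.
  • As noted in the example, once the word-count bolt uses PartialKeyGrouping, the same word is no longer pinned to one task. A downstream RollingCountAggBolt connected with fieldsGrouping is therefore still needed to merge the partial counts.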
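The difference summarized above can be put side by side in a small hypothetical simulation. A single upstream task routes a skewed stream to `numTasks` downstream tasks in one of two ways. The first is a fieldsGrouping-style single choice, simplified to hash the key bytes rather than using deepHashCode over the fields. The second is a PartialKeyGrouping-style pair of key-seeded candidates followed by the less-loaded pick, re-created from the 2.0.0 defaults. The simulation reports the busiest task's load.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;

// Hypothetical simulation: one upstream task routing a skewed stream,
// comparing a single fixed choice per key with two choices plus least-loaded selection.
class SkewSimulation {
    static long maxLoad(String[] stream, int numTasks, boolean twoChoices) {
        long[] loads = new long[numTasks];
        for (String key : stream) {
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            int target;
            if (!twoChoices) {
                // One fixed task per key (simplified fieldsGrouping).
                target = Math.floorMod(hash, numTasks);
            } else {
                // Two distinct key-seeded candidates, then the less-loaded one.
                Random random = new Random(hash);
                int c1 = random.nextInt(numTasks);
                int c2 = random.nextInt(numTasks);
                c2 = c1 == c2 ? (c2 + 1) % numTasks : c2;
                target = loads[c2] < loads[c1] ? c2 : c1;
            }
            loads[target]++;
        }
        return Arrays.stream(loads).max().getAsLong();
    }
}
```

With 90 tuples for one hot key and 10 distinct cold keys across 4 tasks, the single-choice routing puts at least 90 tuples on one task. The two-choice routing splits the hot key roughly in half, which is the effect the downstream merge stage pays for.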

