Storm 是一个强大的流式计算框架。只使用单个流的 Storm 拓扑在实际使用中非常普遍,同时 Storm 也支持多个流:通过在 spout 或 bolt 中声明多个流,用户可以进一步地把数据发送到不同的流中,分别进行处理。
代码如下:
一、定义多个流的 spout
public class MultiStreamRandomWordSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector;// 模拟一些数据String[] words = { "iphone", "xiaomi", "mate", "sony", "sumsung", "moto","meizu" };@Overridepublic void nextTuple() {Random random = new Random();int index = random.nextInt(words.length);// 通过随机数拿到一个商品名String godName = words[index];// 分别给s1和s2着两个流中发送一个godName的数据collector.emit("s1", new Values(godName));collector.emit("s2", new Values(godName));// 每发送一个消息,休眠500msUtils.sleep(50000);}@Overridepublic void open(@SuppressWarnings("rawtypes") Map conf,TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {/* 分别给两个流中声明一个origianl */declarer.declareStream("s1", new Fields("original"));declarer.declareStream("s2", new Fields("original"));}}
二、相关的处理bolt
public class UpperBolt extends BaseBasicBolt{//业务处理逻辑@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {//先获取到上一个组件传递过来的数据,数据在tuple里面String godName = tuple.getString(0);//将商品名转换成大写String godName_upper = godName.toUpperCase();//将转换完成的商品名发送出去collector.emit(new Values(godName_upper));}//声明该bolt组件要发出去的tuple的字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uppername"));}}
public class SuffixBolt extends BaseBasicBolt {FileWriter fileWriter = null;// 在bolt组件运行过程中只会被调用一次@Overridepublic void prepare(Map stormConf, TopologyContext context) {// try {
// fileWriter = new FileWriter("/home/hadoop/stormoutput/"
// + UUID.randomUUID());
// } catch (IOException e) {
// throw new RuntimeException(e);
// }}// 该bolt组件的核心处理逻辑// 每收到一个tuple消息,就会被调用一次@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {// 先拿到上一个组件发送过来的商品名称String upper_name = tuple.getString(0);String suffix_name = upper_name + "_itisok";// 为上一个组件发送过来的商品名称添加后缀System.err.println(suffix_name);// try {// fileWriter.write(suffix_name);// fileWriter.write("\n");// fileWriter.flush();//// } catch (IOException e) {// throw new RuntimeException(e);// }}// 本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer arg0) {}}
三、组装并提交拓扑(topology)
public class MultiStreamTopo {public static void main(String[] args) throws AlreadyAliveException,InvalidTopologyException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("multiSpout", new MultiStreamRandomWordSpout(), 1);builder.setBolt("bolt1", new UpperBolt(), 1).shuffleGrouping("multiSpout", "s1");//让bolt1来随机分组的方式消费multiSpout发送的s1流中的数据builder.setBolt("bolt2", new UpperBolt(), 1).shuffleGrouping("multiSpout", "s2");让bolt2来随机分组的方式消费multiSpout发送的s2流中的数据builder.setBolt("bolt3", new SuffixBolt(), 1).shuffleGrouping("bolt1").shuffleGrouping("bolt2");//bolt3同时随机分组的方式消费bolt1和bolt2的默认流中的数据Config config = new Config();config.setDebug(false);config.setNumAckers(0);if (args.length > 0) {StormSubmitter.submitTopology(args[0], config,builder.createTopology());} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", config, builder.createTopology());}}
}