Apache Storm:如何使用Flux配置KafkaBolt

微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑。

Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法)。

您只需要使用Flux就是将其作为依赖项添加到“ pom.xml”中,通过单个YAML文件进行配置(请检查助焊剂示例 ),然后将其用作主类以在Storm集群中部署拓扑(或作为本地测试)。

为了初始化KafkaBolt ,需要执行以下步骤:

  1. 通过“ withTopicSelector ”方法定义“ topicSelector
  2. 通过“ withTupleToKafkaMapper ”方法定义一个“ kafkaMapper”
  3. 通过“ withProducerProperties ”方法定义一个“ kafkaProducerProps”
  4. 使用以上配置初始化“ org.apache.storm.kafka.bolt.KafkaBolt
  5. 在流中包含以上KafkaBolt

KafkaBolt的最小Flux配置示例:

components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme"- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"- id: "topicSelector"className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"constructorArgs:- "myTopicName"- id: "kafkaMapper"className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"- id: "kafkaProducerProps"className: "java.util.Properties"configMethods:- name: "put"args:- "bootstrap.servers"- "localhost:9092"- name: "put"args:- "acks"- "1"- name: "put"args:- "key.serializer"- "org.apache.kafka.common.serialization.StringSerializer"- name: "put"args:- "value.serializer"- "org.apache.kafka.common.serialization.StringSerializer" bolts:    - id: "bolt-kafka"className: "org.apache.storm.kafka.bolt.KafkaBolt"parallelism: 1configMethods:- name: "withProducerProperties"args: [ref: "kafkaProducerProps"]- name: "withTopicSelector"args: [ref: "topicSelector"]- name: "withTupleToKafkaMapper"args: [ref: "kafkaMapper"]streams:- name: "spout --> kafkaBolt"from: "spout-1"to: "bolt-kafka"grouping:type: LOCAL_OR_SHUFFLE

有关完整的工作配置示例,请选中此项 ,可以像这样使用 。

在Storm上部署拓扑的示例命令:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml

KafkaSpout的助焊剂配置已作为官方助焊剂示例进行了描述。 Flux是一个非常有用的框架,它消除了定义和初始化拓扑所需的自定义代码

翻译自: https://www.javacodegeeks.com/2016/05/apache-storm-configure-kafkabolt-flux.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/353872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 获取当前函数名

import java.text.SimpleDateFormat; import java.util.Date; /** * Java实现类似C/C中的__FILE__、__FUNC__、__LINE__等,主要用于日志等功能中。 * * version 1.0 2011-07-13 * */ public abstract class CommonFunction { /** * 打印日志时获取当前的程序文件名、行号、方法…

美国华尔街拥抱区块链是最大的威胁

Overstock的首席执行官帕特里克伯恩宣布证券交易委员会已批准其计划在区块链上发行股票,该区块链是推动比特币数字货币的巨大在线分类账。 这是一个重要的时刻。至少在理论上,区块链可以更有效,准确和公开地跟踪股票,债券和其他金…

java枚举和枚举类_Java枚举:您拥有优雅,优雅和力量,这就是我所爱!

java枚举和枚举类当Java 8即将面世时,您确定您对Java 5中引入的枚举很熟悉吗? Java枚举仍然被低估了,很可惜,因为它们比您想象的要有用,它们不仅仅用于通常的枚举常量! Java枚举是多态的 Java枚举是可以包…

Java关键字final、static使用总结

Java关键字final、static使用总结 一、final 根据程序上下文环境,Java关键字final有“这是无法改变的”或者“终态的”含义,它可以修饰非抽象类、非抽象类成员方法和变量。你可能出于两种理解而需要阻止改变:设计或效率。 final类不能被继承…

【SpringCloud】第五篇: 路由网关(zuul)

前言: 必需学会SpringBoot基础知识 简介: spring cloud 为开发人员提供了快速构建分布式系统的一些工具,包括配置管理、服务发现、断路器、路由、微代理、事件总线、全局锁、决策竞选、分布式会话等等。它运行环境简单,可以在开发人员的电脑上跑。 工具:…

ubuntu shell简介

ubuntu shell简介 (2012-03-13 19:48:09) 标签: 杂谈 分类: 虚拟机下的ubuntu8.04 1.什么是shell? shell是接受用户或应用层的命令,并将这些命令解释给底层的系统内核层,由这些内核完成相应的工作,并将结果…

如何更新Jenkins作业发布config.xml

最近,我想更新Cloudbees中的一些作业(未使用DSL定义),为每个作业添加一些属性。 好吧,我在使其工作时遇到了一些麻烦,这是我的注意事项(我使用的是Jenkins 1.651.2.1,但有可能它应与…

月下“毛景树”

Description 毛毛虫经过及时的变形,最终逃过的一劫,离开了菜妈的菜园。 毛毛虫经过千山万水,历尽千辛万苦,最后来到了小小的绍兴一中的校园里。爬啊爬~爬啊爬~~毛毛虫爬到了一颗小小的“毛景树”下面,发现树上长着他最…

自由口通信模式下计算机读写PLC存储区的程序

自由口通信模式下计算机读写PLC存储区的程序 2008-8-20 8:51:00 来源:摘 要:本文介绍了在自由口通信模式下,用计算机读写S7-200 PLC存储区内相邻的多个字节数据的通信程序设计方法,程序设计中采用了多种可靠性措施。 关键词&…

Zookeeper入门

ZooKeeper 是一个开源的分布式协调架,主要用来解决分布式集群中应用系统的一致性问题 本质 分布式的文件存储系统(Zookeeper文件系统监听机制),是一个基于观察者模式设计的分布式服务管理框架 zookeeper的数据结构 Zookeeper的层次模型称作Data Tree,…

【laravel5.4】重定向带参数

1、 2、重定向回上一页面 3、返回上一页面带参数 转载于:https://www.cnblogs.com/xuzhengzong/p/8715463.html

Google Protocol Buffers 2.3.0 for java 快速开始

Google Protocol Buffers 2.3.0 for java 快速开始 博客分类: Java JavaGoogleUbuntuLinux数据结构Protocol Buffers是一个平台中立,编程语言无关的,可扩展的机制,是用于结构化数据串行化的灵活、高效、自动的方法&a…

jenkins api_接触Jenkins(Hudson)API,第2部分

jenkins api这篇文章从本教程的第1部分继续。 已经快一年了,但是我终于有时间重新审视我为与Jenkins api交互而编写的一些代码。 我已经使用了部分工作来帮助管理许多Jenkins构建服务器,主要是保持插件同步以及将作业从一台机器移动到另一台机器。 在本文…

JAVAC 命令详解

JAVAC 命令详解 结构 javac [ options ] [ sourcefiles ] [ files ]参数可按任意次序排列。 options 命令行选项。 sourcefiles 一个或多个要编译的源文件(例如 MyClass.java)。 files 一个或多个对源文件进行列表的文件。 说明 javac 有两种方法可将源…

[MEGA DEAL]终极Java捆绑包(95%折扣)

通过114个小时的培训来掌握这种流行的编码语言,从而开始您的编程奥德赛 嘿,怪胎, 本周,在我们的JCG Deals商店中 ,我们提供了一个极端的报价 。 我们提供的Ultimate Java Bundle 仅售69美元,而不是原始价…

Mybatis 在 insert 之后想获取自增的主键 id,但却总是返回1

记录一次傻逼的问题, 自己把自己蠢哭:Mybatis 在 insert 之后想获取自增的主键 id,但却总是返回1 错误说明: 返回的1是影响的行数,并不是自增的主键id; 想要获取自增主键id,需要通过xx.getId()方…

android与PC,C#与Java 利用protobuf 进行无障碍通讯【Socket】

android与PC,C#与Java 利用protobuf 进行无障碍通讯【Socket】 2011-04-27 17:00:11 标签:休闲 Java Socket 移动开发 android 原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律…

Confluence 6 自动添加用户到用户组

默认组成员(Default Group Memberships)选项在 Confluence 3.5 及后续版本和 JIRA 4.3.3 及后续版本中可用。这字段将会在你选择 Read Only, with Local Groups 权限后出现。如果你希望你的用户能自动添加到用户组或多个用户组,在这里输入你希…

Protocol Buffers的应用与分析

Protocol Buffers的应用与分析 明尘 1 Protocol Buffers的介绍 Protocol Buffers是一种用于序列化结构化数据的机制,它具有灵活、高效、自动化的特点。类似于XML,但是比XML更小巧、快捷、简单。在Google 几乎所有它内部的RPC协议和文件格式都是采用PB。…

jenkins api_接触Jenkins(Hudson)API,第1部分

jenkins api哪一个-哈德森还是詹金斯? 都。 几个月前,我开始使用Hudson v1.395来从事这个小项目,在出现巨大分歧之后又回到了这个项目。 我以此为契机,看我将来选择永久搬到詹金斯时是否会遇到任何重大问题。 出现了一些麻烦-最值…