flux storm
微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑。
Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法)。
您只需要使用Flux就是将其作为依赖项添加到“ pom.xml”中,通过单个YAML文件进行配置(请检查助焊剂示例 ),然后将其用作主类以在Storm集群中部署拓扑(或作为本地测试)。
为了初始化KafkaBolt ,需要执行以下步骤:
- 通过“ withTopicSelector ”方法定义“ topicSelector ”
- 通过“ withTupleToKafkaMapper ”方法定义一个“ kafkaMapper”
- 通过“ withProducerProperties ”方法定义“ kafkaProducerProps ”
- 使用以上配置初始化“ org.apache.storm.kafka.bolt.KafkaBolt ”
- 作为流的一部分包含在KafkaBolt之上
KafkaBolt的最小Flux配置示例:
components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme"- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"- id: "topicSelector"className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"constructorArgs:- "myTopicName"- id: "kafkaMapper"className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"- id: "kafkaProducerProps"className: "java.util.Properties"configMethods:- name: "put"args:- "bootstrap.servers"- "localhost:9092"- name: "put"args:- "acks"- "1"- name: "put"args:- "key.serializer"- "org.apache.kafka.common.serialization.StringSerializer"- name: "put"args:- "value.serializer"- "org.apache.kafka.common.serialization.StringSerializer" bolts: - id: "bolt-kafka"className: "org.apache.storm.kafka.bolt.KafkaBolt"parallelism: 1configMethods:- name: "withProducerProperties"args: [ref: "kafkaProducerProps"]- name: "withTopicSelector"args: [ref: "topicSelector"]- name: "withTupleToKafkaMapper"args: [ref: "kafkaMapper"]streams:- name: "spout --> kafkaBolt"from: "spout-1"to: "bolt-kafka"grouping:type: LOCAL_OR_SHUFFLE
有关完整的工作配置示例,请选中此项 ,可以像这样使用 。
在Storm上部署拓扑的示例命令:
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml
KafkaSpout的助焊剂配置已作为官方助焊剂示例进行了描述。 Flux是一个非常有用的框架,它消除了定义和初始化拓扑所需的自定义代码
翻译自: https://www.javacodegeeks.com/2016/05/apache-storm-configure-kafkabolt-flux.html
flux storm