flux storm_Apache Storm:如何使用Flux配置KafkaBolt

flux storm

微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑。

Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法)。

您只需要使用Flux就是将其作为依赖项添加到“ pom.xml”中,通过单个YAML文件进行配置(请检查助焊剂示例 ),然后将其用作主类以在Storm集群中部署拓扑(或作为本地测试)。

为了初始化KafkaBolt ,需要执行以下步骤:

  1. 通过“ withTopicSelector ”方法定义“ topicSelector
  2. 通过“ withTupleToKafkaMapper ”方法定义一个“ kafkaMapper”
  3. 通过“ withProducerProperties ”方法定义“ kafkaProducerProps
  4. 使用以上配置初始化“ org.apache.storm.kafka.bolt.KafkaBolt
  5. 作为流的一部分包含在KafkaBolt之上

KafkaBolt的最小Flux配置示例:

components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme"- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"- id: "topicSelector"className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"constructorArgs:- "myTopicName"- id: "kafkaMapper"className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"- id: "kafkaProducerProps"className: "java.util.Properties"configMethods:- name: "put"args:- "bootstrap.servers"- "localhost:9092"- name: "put"args:- "acks"- "1"- name: "put"args:- "key.serializer"- "org.apache.kafka.common.serialization.StringSerializer"- name: "put"args:- "value.serializer"- "org.apache.kafka.common.serialization.StringSerializer" bolts:    - id: "bolt-kafka"className: "org.apache.storm.kafka.bolt.KafkaBolt"parallelism: 1configMethods:- name: "withProducerProperties"args: [ref: "kafkaProducerProps"]- name: "withTopicSelector"args: [ref: "topicSelector"]- name: "withTupleToKafkaMapper"args: [ref: "kafkaMapper"]streams:- name: "spout --> kafkaBolt"from: "spout-1"to: "bolt-kafka"grouping:type: LOCAL_OR_SHUFFLE

有关完整的工作配置示例,请选中此项 ,可以像这样使用 。

在Storm上部署拓扑的示例命令:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml

KafkaSpout的助焊剂配置已作为官方助焊剂示例进行了描述。 Flux是一个非常有用的框架,它消除了定义和初始化拓扑所需的自定义代码

翻译自: https://www.javacodegeeks.com/2016/05/apache-storm-configure-kafkabolt-flux.html

flux storm

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336388.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android 在什么情况下会主动gc_Python 什么情况下会生成 pyc 文件?

作者:折木奉太郎(经授权转载)来源:https://www.zhihu.com/question/30296617/answer/112564303作为 Python 爱好者,需要了解 .py 脚本的基本运行机制及特性:在很多工作上,Python 的运行流程基本上取决于用户&#xff0…

oracle 查看内存参数配置,Oracle内存参数配置及版本问题

Oracle的内存配置与Oracle性能息息相关。从总体上讲,可以分为两大块:共享部分(主要是SGA)和进程独享部分(主要是PGA)。在 32 位操作系统下 的Oracle版本,不时有项目反馈关于内存的错误(如ORA-04030、04031错误)都是十分令人头疼的问题。查阅资…

【IP协议头分析】

Version 版本号 IHL IP头长度 Type of Service Total Length 总长度 Identification 拆包的唯一标识 Flags 1位保留 , 2位 允许拆包 3位 不允许拆包 Fragment offset 在原包偏移量 Time to Live 时间或路由跳数 Protocol IP中是什么协议类型 Header Check…

乡村野生草药_官方野生蝇群流口水分数

乡村野生草药官方是什么? 标题太小,但有用的贡献。 Wildfly Swarm允许我们创建相当小的自包含应用程序,包括我们从Wildfly Application Server中需要的应用程序。 在这篇文章中,我们将研究与Wildfly Swarm合作使用的Drools分数 。…

obs多推流地址_(无人直播)教程利用OBS推流抖音直播电脑屏幕或PC游戏

目前的火爆程度相信大家都有目共睹,也為部分活躍用戶提供直播功能,開通直播的方法有:①粉絲達到10000粉絲,官方會自動邀請妳開通直播權限。②加入和官方合作的工會,無需粉絲,也可以開通直播權限。開通直播功…

oracle数据库sqlloader,sql loader ---ORACLE SQLLDR

sql loader的基本使用:1. sql loader里有几个概念:控制文件:和数据库的文件不是一回事,个人理解是用于数据加载控制的。数据文件:要加载入库的数据文件,支持文本,csv, 等格式。 数据文件的内容可…

倒数日电脑版_应用日报|iOS 或更名为 iPhoneOS,倒数日 Mac 版上线限时免费

今日推荐倒数日 Days Matter for Desktop免费,macOS 86 MB倒数日 Days Matter 是 iOS 端非常经典的应用,今天它的 Mac 版本上线,限时免费 7 天。它延续了 iOS 端的风格语言,同样能帮你记录生活中重要的日子:例如恋人…

【TPC协议头解析】

Source Port 源端口 Destination Port 目的端口 Sequence Number 数据报编号 (seq)(按字节序)保证有序 Acknowledgment Number (ACK) 报文应答, 保证可靠性 Data offset 跳到data数据部分 Reserved 保留部分 URG 紧急数据标识 ACK 确认消息…

hibernate查询缓存_在Hibernate中启用实体和查询缓存

hibernate查询缓存1.简介 在我执行过的与性能相关的任务中,这就是其中之一。 令人担心的是,如果对于特定实体每次都调用相同的查询,并且表数据在特定的时隙内不易更改,则我们可以使用Hibernate缓存查询结果。 这意味着&#xff0c…

iphone新旧手机数据传输已取消_如何取消iPhone手机App自动扣费?三种方法让你不再被“偷”...

一直以来苹果手机App这个自动续费真是让大家不胜其烦,钱虽然不多可总是平白无故这里十块,那里二十块的被扣也是不小的浪费,日常生活中为了追追剧、听听音乐、玩玩游戏难免会开通一些App会员而选择自动续费的话每月会费是有不错优惠的&#xf…

【TCP三次握手与四次挥手最强解析】

TCP连接拥塞控制四种方法总结(详细简单,稳的一批) TCP三次握手 作用:确认seqnumber;确定窗口大小以及最大报文大小 TCP四次挥手 WAIT_TIME状态到CLOSED需要2MSL时间(最长单位MSL为2min,一般30s&#xff0…

oracle基础授权,Oracle基础学习3--Oracle创建用户并授权

Oracle服务器端的操作一般如下:1)安装Oracle服务器软件2) 创建数据库(安装时自动创建)3) 配置监听(安装时自动配置)4) 启动Oracle实例5) 创建用户表空间6) 创建新用户并授权下面就开始讲创建用户表空间、创建新用户并授权两项&#x…

mybatis缓存二级缓存_MyBatis缓存与Apache Ignite的陷阱

mybatis缓存二级缓存一周前,MyBatis和Apache ignite 宣布支持apache ignite作为MyBatis缓存(L2缓存)。 从技术上讲,MyBatis支持两个级别的缓存: 本地缓存,默认情况下始终启用 L2缓存,可选 随…

【H.264/AVC视频编解码技术】第一章【H264视频编码详细解析】

H264压缩比 YUV格式为 YUV420 分辨率 640 x 480 帧率 15 ,比特率为 640 x 480 x 1.5 x 15 x 8 =55M ,建议码流 500kpbs 。 压缩率 1 / 100 声网权威推荐码流 GOP 所谓GOP,意思是画面组,MPEG格中的帧序列,分为I、P、B三种,如排成IBBPBBPBBPBBPBBP...样式,这种…

企业是否应该实现对客户需求的快速响应_CRM系统给企业带来的创新有哪些?如何让销售爱用CRM系统?...

对于现代企业来说,CRM已然成为了一个成熟的项目,是能够给客户带来不菲的价值的。如果应用好,是能够取得提高客户满意度与企业经济效益的双赢成绩的。一、目前,CRM能够给企业带来的业务创新有哪些?1.统一数据CRM系统能将…

脚本实现oracle服务启停,通用服务启停shell脚本

####################################通用启停脚本#david###################################APP_NAMEmyApppsid0checkpid() {redisPidps -ef | grep $APP_NAME | grep -v grep | awk {print $2}if [[ -n "$myAppPid" ]]; thenpsid$myAppPidelsepsid0fi}status() {…

junit注释_通过此注释改善您的JUnit体验

junit注释JUnit可能是所有Java项目中90%的一部分。 令人兴奋的是,我们很快将拥有支持Java 8的JUnit 5 。 我们最近在博客上发表了一项改进 。 回到JUnit 4领域,有一个小技巧,我只能建议您进行所有单元测试。 只需在此处添加这个小…

lagom的微服务框架_微服务有麻烦吗? Lagom在这里为您提供帮助。 试试吧!

lagom的微服务框架蛋糕支持。 我们很自豪地宣布,新的Apache许可的微服务框架Lagom可在GitHub上使用 ! 当其他框架专注于打包和实例启动时,Lagom重新定义了Java开发人员构建基于微服务的应用程序的方式。 服务是异步的。 服务内通信由您管理。…

oracle创建简单包,Oracle创建程序包是什么?

一、程序包的相关知识1。定义与说明a。 相关对象的封装b。 程序包的各部分- 程序包规格说明声明子程序- 程序包主体定义子程序2。使用程序包的优点- 模块化- 更轻松的应用程序设计- 信息隐藏- 新增功能- 性能更佳3。公有项和私有项的区别公有项:在程序包说明部分定义…