Kafka集成flume

1.flume作为生产者集成Kafka

        kafka作为flume的sink,扮演消费者角色

     1.1 flume配置文件

        vim $kafka/jobs/flume-kafka.conf

# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = TAILDIR
#记录最后监控文件的断点的文件,此文件位置可不改
a1.sources.r1.positionFile =  /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 =/export/server/flume/job/data/.*log.*# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers =node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.2开启flume监控

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume.conf

1.3开启Kafka消费者

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers --from-beginning

1.4生产数据

往被监控文件输入数据

[ljr@node1 data]$echo hello >>file2.txt 
[ljr@node1 data]$ echo ============== >>file2.txt 

查看Kafka消费者

可见Kafka集成flume生产者成功。

2.flume作为消费者集成Kafka

        kafka作为flume的source,扮演生产者角色

2.1flume配置文件

vim $kafka/jobs/flume-kafka.conf


# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#注意不要大于channel transactionCapacity的值100
a1.sources.r1.batchSize = 50 
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers =node1:9092, node1:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#注意transactionCapacity的值不要小于sources batchSize的值50
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2开启flume监控

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf

2.3开启Kafka生产者并生产数据

kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

 

查看flume监控台

可见Kafka集成flume消费者成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/25467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于python-CNN深度学习的水瓶是否装满水识别-含数据集+pyqt界面

代码下载地址: https://download.csdn.net/download/qq_34904125/89374853 本代码是基于python pytorch环境安装的。 下载本代码后,有个requirement.txt文本,里面介绍了如何安装环境,环境需要自行配置。 或可直接参考下面博文…

绘唐2.5一键追爆款2.5免费版

免费分享给您 小说推文工具是一种用于在社交媒体上宣传和推广小说的工具。它可以帮助作者将小说的内容和相关信息以推文的形式快速发布在各种社交媒体平台上,吸引读者的注意力并增加小说的曝光度。 以下是一些小说推文工具可能具备的功能: 1. 编辑和排…

【linux】进程控制——进程创建,进程退出,进程等待

个人主页:东洛的克莱斯韦克-CSDN博客 祝福语:愿你拥抱自由的风 相关文章 【Linux】进程地址空间-CSDN博客 【linux】详解linux基本指令-CSDN博客 目录 进程控制概述 创建子进程 fork函数 父子进程执行流 原理刨析 常见用法 出错原因 进程退出 概…

MyBatis-Plus学习总结

一.快速入门 (一)简介 MyBatis-Plus (opens new window)(简称 MP)是一个 MyBatis (opens new window) 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。 (二)快速入门 1.准备数据库脚本 2.准备bo…

六、Docker Swarm、Docker Stack和Portainer的使用

六、Docker swarm和Docker stack的使用 系列文章目录1.Docker swarm1.简介2.docker swarm常用命令3.docker node常用命令4.docker service常用命令5.实战案例6.参考文章 2.Docker stack1.简介3.Docker stack常用命令4.实战案例5.常见问题及调错方式1.查看报错信息并尝试解决&am…

SpringBootWeb 篇-深入了解 Redis 五种类型命令与如何在 Java 中操作 Redis

🔥博客主页: 【小扳_-CSDN博客】 ❤感谢大家点赞👍收藏⭐评论✍ 文章目录 1.0 Redis 概述 1.1 Redis 下载与安装 2.0 Redis 数据类型 3.0 Redis 常见五种类型的命令 3.1 字符串操作命令 3.2 哈希操作命令 3.3 列表操作命令 3.4 集合操作命令 …

7-43 排列问题

排列问题 分数 10 全屏浏览 切换布局 作者 雷丽兰 单位 宜春学院 全排列问题 输出自然数1至n中n个数字的全排列(1≤n≤9),要求所产生的任一数字序列中不允许出现重复的数字。 输入格式: 一个自然数 输出格式: 由1到n中n个数字组成的…

Tessy学习系列(三):单元测试——官方例程isValueInRange

一、工程创建 (1)新建工程 注意:工程名称以及路劲不能包含空格和中文 (2)新建测试集与单元测试模块 新建测试集 新建单元测试模块 设置测试模块为单元测试模块并选择GNU GCC编译器如果需要其他的编译器,…

关于选择,关于处事

一个人选择应该选择的是勇敢,选择不应该选择的是无奈。放弃,不该放弃的是懦夫,不放弃应该放弃的是睿智。所以,碰到事的时候要先静,先不管什么事,先静下来,先淡定,先从容。在生活里要…

【线性代数】向量空间,子空间,向量空间的基和维数

向量空间 设V为n维向量的集合,如果V非空,且集合V对于向量的加法以及数乘两种运算封闭,那么就称集合V为向量空间 x,y是n维列向量。 x 向量组等价说明可以互相线性表示 向量组等价则生成的向量空间是一样的 子空间 例题18是三位向…

Docker Swarm持久化

Docker Swarm持久化 1 简介 Docker Swarm持久化有bind、volume和NFS三种方式,bind和volume两种方式适合挂载单个宿主机,不适合集群;NFS适合集群服务,但需要安装NFS系统。 注意:Docker Swarm需要先安装集群。 由Doc…

python-数字黑洞

[题目描述] 给定一个三位数,要求各位不能相同。例如,352是符合要求的,112是不符合要求的。将这个三位数的三个数字重新排列,得到的最大的数,减去得到的最小的数,形成一个新的三位数。对这个新的三位数可以重…

【数据结构】【版本1.0】【线性时代】——顺序表

快乐的流畅:个人主页 个人专栏:《算法神殿》《数据结构世界》《进击的C》 远方有一堆篝火,在为久候之人燃烧! 文章目录 引言一、顺序表的概念1.1 最基础的数据结构:数组1.2 数组与顺序表的区别 二、静态顺序表三、动态…

error while loading shared libraries 找不到动态库问题如何解决

在使用 c 或 c 开发应用时,在启动程序时,有时会遇到这个错误,找不到动态库。这个时候,我们使用 ldd 来查看,发现可执行文件依赖的动态库显示为 not found。 1 实验代码 使用如下 3 个文件做实验。 hello.h 中声明了函…

【Vue】修改数量

文章目录 底部总价展示完整代码 注意:前端 vuex 数据,后端数据库数据都要 注册点击事件 页面中dispatch action 提供action函数 提供mutation处理函数 底部总价展示 提供getters 动态渲染 完整代码 main.js import Vue from vue import App from…

Linux:基础开发工具

文章目录 Linux 软件包管理器 yum什么是软件包关于rzsz查看软件包安装软件卸载软件安装扩展源 Linux 编辑器 vimvim的基本概念正常/普通/命令模式(Normal mode)插入模式(Insert mode)底行模式(last line mode) vim的基本操作[命令模式]切换至[插入模式][插入模式]切换至[命令模…

【CW32F030CxTx StartKit开发板】开发资料

本来是参加21ic的评测活动,不知道为什么评测文章一直被提示有不良内容,所以只好先在此记录一下相关的资料。 此次测试的是CW32F030CxTxStartKit 评估板。该开发板为用户提供一种经济且灵活的方式使用 CW32F030CxTx 芯片构建系统原型,可进行性…

激活乡村振兴新动能:推动农村产业融合发展,打造具有地方特色的美丽乡村,实现乡村全面振兴

目录 一、推动农村产业融合发展 1、农业产业链条的延伸 2、农业与旅游业的结合 二、挖掘地方特色,打造美丽乡村 1、保护和传承乡村文化 2、发展特色农业 三、加强基础设施建设,提升乡村品质 1、改善农村交通条件 2、提升农村水利设施 四、促进…

吴恩达2022机器学习专项课程C2W2:2.23 选修_反向传播算法的工作原理(什么是导数图计算大型神经网络)

目录 引言一.导数的计算1.epsilon与导数的关系2.其它导数符号形式3.导数小结 二.小型神经网络的计算图1.什么是计算图(前向传播过程)2.反向传播计算过程3.验证反向传播的计算结果4.为什么用反向传播计算导数? 三.扩大神经网络的计算图1.计算反…

笔记本充电出现了问题。

不知道为什么。电池充电图片一直显示的空。谁能救救我!