【大数据之Kafka】十六、Kafka集成外部系统之集成Flume

  Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
在这里插入图片描述
Flume安装和部署:https://blog.csdn.net/qq_18625571/article/details/131678589?spm=1001.2014.3001.5501

1 Flume生产者

在这里插入图片描述
(1)在hadoop102启动Kafka集群。

zk.sh start
kf.sh start

(2)在hadoop103启动Kafka消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

(3)在hadoop102上安装Flume:https://blog.csdn.net/qq_18625571/article/details/131678589?spm=1001.2014.3001.5501 第一章Flume安装部署。

(4)在/opt/module/flume-1.9.0/job目录下创建配置文件file_to_kafka.conf。

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)启动Flume

bin/flume-ng agent -c conf/ -n a1 -f job/file_to_kafka.conf

(6)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况。

mkdir applog
echo hello >> /opt/module/applog/app.log

(7)观察 kafka 消费者,能够看到消费的 hello 数据。

2 Flume消费者

在这里插入图片描述

(1)在 hadoop102 节点的 Flume 的/opt/module/flume/job 目录下创建 kafka_to_file.conf。

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = logger# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)启动 Flume。

bin/flume-ng agent -c conf/ -n a1 -f job/kafka_to_file.conf -Dflume.root.logger=INFO,console

(3)启动 kafka 生产者,并输入数据,例如:hello world

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

(4)观察控制台输出的日志

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/84638.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法(三)

哈希表算法章节 (1) Ascall码文章推荐 给定两个字符串 s 和 t ,编写一个函数来判断 t 是否是 s 的字母异位词。注意:若 s 和 t 中每个字符出现的次数都相同,则称 s 和 t 互为字母异位词。 class Solution {public boolean isAnagram(String…

帆软BI开发-Day2-趋势图的多种变形

前言: 在BI数据展示中,条形图、趋势图无疑是使用场景非常多的两种图形。与条形图不同的是,趋势图更能反馈出一定的客观规律和未来的趋势走向,因此用于作为预警和判异的业务场景,但实际业务场景的趋势图可没你想的那么简…

Python阴阳历日期转换

阳历转阴(殷)历,阴历转阳历,了解一下阴阳历的转换逻辑、闰月的转换。 农历,古时称为夏历,是中国现行的传统历法, 属于阴历和阳历的合历,根据月相的变化周期一个月,参…

Eclipse工具使用技巧

1、常用快捷键 ctrlshifto 快速导包 CtrlSpace 内容助理 说明:内容助理。提供对方法,变量,参数,javadoc等得提示,应运在多种场合,总之需要提示的时候可先按此快捷键。注:避免输入法的切换设置与此设置冲突 CtrlShiftSpace 变量提示 Ctrl/ 添加/消除//注释 CtrlShift/ 添加…

华为HCIA(三)

链路本地地址接口标识64bit 当STP端口到了Forwarding状态后,会转发流量,也处理报文 在TCP/IP模型中,会话层,表示层和应用层,都规划成了应用层 路由表包含目的地址和掩码,优先级,cost,下一跳和…

JWT令牌

一、JWT(Json Web Token)能干什么 1、安全认证(权限认证) 比如登录系统的时候,服务器会检查前端请求数据中携带的token信息,符合标准则允许访问,不符合则拒绝你的访问请求。 2、信息传递 比…

laravel框架 - 事件与监听器

一,绑定事件与监听器 在app\Providers下的EventServiceProvider.php中添加我们定义的事件与监听器 protected $listen [Registered::class > [SendEmailVerificationNotification::class,],App\ebvent\RegisterMessage>[//事件App\listeners\SendMessage//监…

高云FPGA系列教程(10):letter-shell移植

文章目录 letter-shell简介letter-shell源码获取letter-shell移植函数和变量应用示例 本文是高云FPGA系列教程的第10篇文章。 shell,中文是外壳的意思,就是操作系统的外壳。通过shell命令可以操作和控制操作系统,比如Linux中的Shell命令就包括…

QT--day5

注册 mainwindow.h #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include<QPushButton> #include<QLineEdit> #include<QLabel> #include <QMessageBox> #include<QString> #include<QSqlDatabase> …

CAN总线物理层

本文的目的并不是为了介绍或普及CAN总线相关知识,而是为了了解CAN总线,进而为CAN通信一致性测试做知识储备。 CAN,控制器局域网,全称:Controller Area Network。1986年,由德国Bosch公司为汽车开发的网络技术,主要用于汽车的监测与控制,目的为适应汽车“减少线束的数量…

1787_函数指针的使用

全部学习汇总&#xff1a;GitHub - GreyZhang/c_basic: little bits of c. 前阵子似乎写了不少错代码&#xff0c;因为对函数指针的理解还不够。今天晚上似乎总算是梳理出了一点眉目&#xff0c;在先前自己写过的代码工程中做一下测试。 先前实现过一个归并排序算法&#xff0c…

1999-2018年地级市不同所有制成分工业总产值数据

1999-2018年地级市不同所有制成分工业总产值数据 1、时间&#xff1a;1999-2018年 2、范围&#xff1a;地级市 3、指标&#xff1a;行政区划代码、城市、年份、规模以上工业企业数_全市_个、规模以上工业企业数_市辖区_个、规模以上内资企业数_全市_个、规模以上内资企业数_…

【AI视野·今日CV 计算机视觉论文速览 第250期】Wed, 20 Sep 2023

AI视野今日CS.CV 计算机视觉论文速览 Wed, 20 Sep 2023 Totally 95 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computer Vision Papers PanopticNeRF-360: Panoramic 3D-to-2D Label Transfer in Urban Scenes Authors Xiao Fu, Shangzhan Zhang, Tianrun Chen…

满足开闭原则的JDBCUtils~

我们都知道开闭原则即为对修改关闭&#xff0c;对拓展开放&#xff0c;那么对于实现既能通过DriverManager连接数据库&#xff0c;也能实现使用c3P0连接数据库&#xff0c;连接数据库的方式即为可变点&#xff0c;我们只需要使用抽象类或者接口封装可变点&#xff0c;再将可变点…

ip地址的正则表达式

ip地址的正则表达式 checkIpSpecial: {// 验证IP地址validator: function (value) {// var reg /^((1?\d?\d|(2([0-4]\d|5[0-5])))\.){3}(1?\d?\d|(2([0-4]\d|5[0-5])))$/;这个正则表达式的意思是&#xff1a;以数字开头&#xff0c;紧接着是一个点&#xff0c;然后再是数…

SecureCRT SSH与FTP连接中文乱码

1、首先要保证服务端环境变量是UTF-8编码的 LANG”zh_CN.UTF-8″ 2、会话里面配置好字符编码&#xff1a;UTF-8 SSH会话的窗口就可以正常显示中文了&#xff0c;效果如下 3、打开FTP或者SFTP时进行文件传输时&#xff0c;列表窗口里面还是乱码&#xff0c;需要把SecureCRT安…

猜数游戏 rust解法

给定答案序列和猜测序列&#xff0c;统计有几个数字位置正确&#xff0c;有几个数字在两个序列都出现过但位置不对。 输入包含多组数据。每组第一行是序列长度n&#xff0c;第二行是答案序列&#xff0c;接下来若干行是猜测序列。猜测序列全0时该组结束。n0时整个输入结束。 样…

【知识分享】Java获取当前日期是第几周且本周是几号到几号

加哥今天给大家提供一个获取当前日期是本年度的第几周的方法&#xff0c;且这周是几号到几号的工具类&#xff0c;供大家使用。 public static void main(String[] args) {//使用当前时间戳 System.currentTimeMillis()Current_week(System.currentTimeMillis()); }public st…

周界警戒AI算法+视频智能分析在安全生产场景中的应用

长期以来&#xff0c;周界防范安防系统在大型园区、工厂、社区、机场、火车站站台、重点单位等领域应用较为广泛和常见。随着AI人工智能等新兴技术的快速发展与落地应用&#xff0c;通过AI智能检测与视频智能分析技术&#xff0c;现代化的周界安防系统可以做到全天候快速、准确…

科普:什么是视频监控平台?如何应用在场景中?

随着科技的发展&#xff0c;监控无处不在&#xff0c;就像一张密不透风的网&#xff0c;将生活中的角角落落都编织在一起。可是&#xff0c;你真的知道什么是安防视频监控平台吗&#xff1f;它可不止是一个简单的通电摄像头&#xff0c;如今的视频监控平台&#xff0c;涵盖了无…