kafka和Flume的整合

目录

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

 3、测试

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 2、测试


 

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

flume学习网站:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了 (liyifeng.org)

# 来到这个目录下
cd /opt/installs/flume/conf/myconf
# 创建一个conf文件
vi kafka-memory-logger.conf

在kafka-memory-logger.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sources.r1.kafka.topics = bigdata

a1.sources.r1.kafka.consumer.group.id = text7

a1.sources.r1.batchSize = 100

a1.sources.r1.batchDurationMillis = 2000

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sinks.k1.maxBytesToLog = 128

 3、测试

启动一个消息生产者,向topic中发送消息,启动flume,接收消息

  • 启动一个消息生产者,向topic中发送消息:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic bigdata

  • 启动flume,接收消息 
flume-ng agent -n a1 -c ../ -f kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

 

 

 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 在flume-kafka-sink.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = bigdata01

a1.sources.r1.port = 44444

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = bigdata

a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

 2、测试

启动:

flume-ng agent -n a1 -c ../ -f flume-kafka-sink.conf -Dflume.root.logger=INFO,console

 使用telnet命令,向端口发送消息:

yum -y install telnettelnet bigdata01 44444

 

 在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:

kafka-console-consumer.sh --topic bigdata --bootstrap-server bigdata01:9092

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/885860.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jmeter基础篇(22)服务器性能监测工具Nmon的使用

一、前言 我们在日常做压测的过程中,不仅仅需要监控TPS,响应时间,报错率等这些系统基础性能数据,还需要对服务器的性能(如CPU、磁盘、内存、网络IO等)做监控,以求对系统运行过程中的硬件性能有…

【c++笔试强训】(第六篇)

目录 单词搜索(搜索) 题目解析 讲解算法原理 编写代码 杨辉三⻆(动态规划) 题目解析 讲解算法原理 编写代码 单词搜索(搜索) 题目解析 1.题目链接:单词搜索_牛客题霸_牛客网 2.题目描…

【含开题报告+文档+PPT+源码】基于SpringBoot的奶茶点单系统

开题报告 随着社会经济的发展和人们对生活质量的需求提升,奶茶行业迅速崛起,并成为人们生活不可或缺的一部分。消费者在奶茶店点单通常需要排队等候、填写纸质订单,给消费者和奶茶店带来了一定的不便。因此,设计和实现一个基于 S…

【Android、IOS、Flutter、鸿蒙、ReactNative 】约束布局

Android XML 约束布局 参考 TextView居中 TextView 垂直居中并且靠右 TextView 宽高设置百分比 宽和高的比例 app:layout_constraintDimensionRatio"h,2:1" 表示子视图的宽高比为2:1,其中 h表示保持宽度不变,高度自动调整。 最大宽度 设…

Android 下内联汇编,Android Studio 汇编开发

版权归作者所有,如有转发,请注明文章出处:https://cyrus-studio.github.io/blog/ 内联汇编 Android 内联汇编非常适用于 ARM 架构的性能优化和底层操作,通常用于加密、解密、特定指令优化等领域。 1. 基础语法 内联汇编在 C/C …

安装宝塔 Windows 面板

操作场景 宝塔面板是一款使用很方便、功能强大、交互友好且终身免费的服务器管理软件,支持 Linux 与 Windows 系统。在宝塔面板中,您可以一键配置 LAMP、LNMP、网站、数据库、FTP、SSL,还可以通过 Web 端轻松管理服务器。 本文介绍如何在 W…

Ubuntu 的 ROS 操作系统 turtlebot3 gazebo仿真

引言 TurtleBot3 Gazebo仿真环境是一个非常强大的工具,能够帮助开发者在虚拟环境中测试和验证机器人算法。 Gazebo是一个开源的3D机器人仿真平台,它能支持物理引擎,允许机器人在虚拟环境中模拟和测试。结合ROS,它能提供一个完整的…

「IDE」集成开发环境专栏目录大纲

✨博客主页何曾参静谧的博客📌文章专栏「IDE」集成开发环境📚全部专栏「Win」Windows程序设计「IDE」集成开发环境「UG/NX」BlockUI集合「C/C」C/C程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「UG/NX」NX定…

Lucene 和 Elasticsearch 中更好的二进制量化 (BBQ)

作者:来自 Elastic Benjamin Trent Lucene 和 Elasticsearch 中更好的二进制量化 (BBQ)。 嵌入模型输出 float32 向量,通常对于高效处理和实际应用来说太大。Elasticsearch 支持 int8 标量量化,以减小向量大小,同时保持性能。其他…

Odoo:免费开源的钢铁冶金行业ERP管理系统

文 / 开源智造 Odoo亚太金牌服务 简介 Odoo免费开源ERP集成计质量设备大宗原料采购,备件设材全生命周期,多业务模式货控销售,全要素追溯单品,无人值守计量物流,大宗贸易交易和精细化成本管理等方案;覆盖…

Linux设置socks代理

公司里绝大多数主机已经禁止外网访问,仅保留一台主机设置socks作为代理服务器。如下为对socks这一概念的学习整理 什么是socks 是一种OSI模型下会话层的协议,位于表示层与传输层之间,作用是: exchanges network packets between…

MySQL数据库:SQL语言入门 (学习笔记)

SQL(Structured Query Language)是结构化查询语言的简称,它是一种数据库查询和程序设计语言,同时也是目前使用最广泛的关系型数据库操作语言。(95%适用于所有关系型数据库) 【 SQL是关系型数据库通用的操作…

视频会议接入GB28181视频指挥调度,语音对讲方案

传统的视频会议指挥调度系统目前主流的互联网会议大部分都是私有协议,功能都很独立。目前主流的视频监控国标都最GB平台,新的需求要求融合平台要接入监控等设备,并能实现观看监控接入会议,实时语音设备指挥现场工作人员办公实施。…

萤石设备视频接入平台EasyCVR海康私有化视频平台监控硬盘和普通硬盘有何区别?

在现代安防监控领域,对于数据存储和视频处理的需求日益增长,特别是在需要长时间、高稳定性监控的环境中,选择合适的存储设备和监控系统显得尤为重要。本文将深入探讨监控硬盘与普通硬盘的区别,并详细介绍海康私有化视频平台EasyCV…

一学就废|Python基础碎片,字符串编码

Unicode 万国码 在 Python 3 中,字符串由 Unicode 表示,而不是字节。ASCII 码是定义字符数字代码的最著名的标准。数字值最初只定义 128 个字符,因此 ASCII 只包含控制代码、数字、小写字母、大写字母等。然而,我们不足以表示世界…

npm list @types/node 命令用于列出当前项目中 @types/node 包及其依赖关系

文章目录 作用示例常用选项示例命令注意事项 1、实战举例**解决方法**1. **锁定唯一的 types/node 版本**2. **清理依赖并重新安装**3. **设置 tsconfig.json 的 types**4. **验证 Promise 类型支持** **总结** npm list types/node 命令用于列出当前项目中 types/node 包及其…

Qt--命令行终端程序开发

提示:本文为学习记录,若有错误,请联系作者,谦虚受教。 文章目录 前言一、头文件二、cpp文件三、使用流程如图所示 总结 前言 Constant dropping wears the stone. 一、头文件 #ifndef TERMINALWIDGET_H #define TERMINALWIDGET_…

【Linux】常用命令(2.6万字汇总)

文章目录 Linux常用命令汇总1. 基础知识1.1. Linux系统命令行的含义1.2. 命令的组成 2. 基础知识2.1. 关闭系统2.2. 关闭重启2.3. 帮助命令(help)2.4. 命令说明书(man)2.5. 切换用户(su)2.6.历史指令 3.目录…

video2gif容器构建指南

一、介绍 1.项目概述 Video2Gif 项目旨在提供一种便捷的方式,让用户能够将视频中的精彩片段快速转换为 GIF 动画。GIF 动画因其循环播放、文件体积小等特点,在社交媒体、聊天工具中广泛应用,用于表达情感、分享趣事等。 2.核心功能 视频导…

《人工智能网络安全现状(2024)》深度解读:机遇、挑战与应对策略

在当今数字化浪潮汹涌澎湃的时代,人工智能(AI)与网络安全已然深度交融,二者相互作用所塑造的发展态势正深刻重塑着我们的信息安全格局。《人工智能网络安全现状(2024)》这份报告恰似一盏明灯,为…