0301taildir-source报错-flume-大数据

1 基础环境简介

linux系统:centos,前置安装:jdk、hadoop、zookeeper、kafka,版本如下

软件版本描述
centos7linux系统发行版
jdk1.8java开发工具集
hadoop2.10.0大数据生态基础组件
zookeeper3.5.7分布式应用程序协调服务
kafka3.0分布式mq组件
flume1.9.0分布式采集传输组件

2 报错

  • 场景1:动态监控目录多个日志变化,通过flume采集传输到kafka

  • 报错日志

    org.apache.flume.FlumeException: Error creating positionFile parent directoriesat org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)at org.apache.flume.conf.Configurables.configure(Configurables.java:41)at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:750)
    Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flumeat sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)at java.nio.file.Files.createDirectory(Files.java:674)at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)at java.nio.file.Files.createDirectories(Files.java:727)at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)... 11 more
  • conf文件如下

    #定义组件
    a1.sources = r1
    a1.channels = c1#配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /export/server/flume/taildir_position.json#配置channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
    a1.channels.c1.kafka.topic = topic_log01
    a1.channels.c1.parseAsFlumeEvent = false#组装 
    a1.sources.r1.channels = c1
    
  • 原因就是在创建positionFile的时候父目录已存在

  • 场景2:我们生成的日志文件app.log 每经过一天会按照日期重命名文件,然后生成新的app.log,此时flume会重新采集所有的日志信息,导致信息重复采集2次。

  • Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

    {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}{"inode":2496275,"pos":12,"file":"/opt/module/flume/files2/log.txt"}
    
  • 而flume会同时判断Inode和file来确定是否同一文件

    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统

    用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来

    识别文件。

3 解决

场景1解决方案有两种:

  1. 既然是创建父目录已存在,我们可以吧positionFile位置重新配置。

  2. 修改源代码,我们通过源代码找下处理逻辑,下载1.9.0版本的flume源代码,官网地址:https://archive.apache.org/dist/flume/,找到TailSource 170行

     @Overridepublic synchronized void configure(Context context) {String fileGroups = context.getString(FILE_GROUPS);Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),fileGroups.split("\\s+"));Preconditions.checkState(!filePaths.isEmpty(),"Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");String homePath = System.getProperty("user.home").replace('\\', '/');positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);Path positionFile = Paths.get(positionFilePath);try {// 此处创建父目录,如果存在报错Files.createDirectories(positionFile.getParent());} catch (IOException e) {throw new FlumeException("Error creating positionFile parent directories", e);}headerTable = getTable(context, HEADERS_PREFIX);batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,DEFAULT_CACHE_PATTERN_MATCHING);backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);fileHeader = context.getBoolean(FILENAME_HEADER,DEFAULT_FILE_HEADER);fileHeaderKey = context.getString(FILENAME_HEADER_KEY,DEFAULT_FILENAME_HEADER_KEY);maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);if (maxBatchCount <= 0) {maxBatchCount = DEFAULT_MAX_BATCH_COUNT;logger.warn("Invalid maxBatchCount specified, initializing source "+ "default maxBatchCount of {}", maxBatchCount);}if (sourceCounter == null) {sourceCounter = new SourceCounter(getName());}}
    

    在这里插入图片描述

可以在创建父目录之前检测是否已存在,如果已存在,直接跳过创建即可,修改try代码块中内容如下

boolean exists = Files.exists(positionFile.getParent());if (!exists)Files.createDirectories(positionFile.getParent());

maven打包替换flume/lib/下 flume-taildir-source-1.9.0.jar 如图所示:在这里插入图片描述

重新运行,正常启动,如下图日志所示:在这里插入图片描述

kafka中新接收的数据如下图所示:在这里插入图片描述

场景2解决方案 把TailFile如下代码

  public boolean updatePos(String path, long inode, long pos) throws IOException {if (this.inode == inode && this.path.equals(path)) {setPos(pos);updateFilePos(pos);logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);return true;}return false;}// 修改为public boolean updatePos(String path, long inode, long pos) throws IOException {if (this.inode == inode) {setPos(pos);updateFilePos(pos);logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);return true;}return false;}

即不校验file只校验inode,具体这里不再去验证,有兴趣自己验证下

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

参考链接:

[1]flume教学视频[CP/OL].2020-04-16.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/752966.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Docker】使用Docker部署IT运维管理平台CAT

作者怀揣着一个美好的愿景&#xff0c;旨在提升管理效率、推动开源项目的蓬勃发展。 来一杯咖啡与茶&#xff0c;为 IT 运维从业者减轻管理负担&#xff0c;提升管理效率&#xff0c;从繁重无序的工作中解压出来&#xff0c;利用剩余时间多喝一杯休息一下。 这是一个专为 IT 运…

windows安装prometheus和grafana

prometheus官网 grafana 软件下载 prometheus windows_exporter https://github.com/prometheus-community/windows_exporter grafana prometheus原理 配置prometheus 解压之后prometheus-2.50.1.windows-amd64.zip修改prometheus.yml localhost修改为127.0.0.1 双击p…

爬虫神器!使用Python一键下载网页图片,省时高效!

引言 爬虫技术在当今信息时代中扮演着重要的角色&#xff0c;可以自动化获取互联网上的数据。本教程将围绕你提供的Python爬虫代码展开&#xff0c;旨在实现自动下载图片的功能。通过这个示例&#xff0c;你将学习如何利用爬虫技术批量获取网页中的图片&#xff0c;并将其保存…

Python下有关CV的一些算法和函数

目录&#xff1a; 1. HoughCircles二级目录三级目录 1. HoughCircles 霍夫圆检测 二级目录 三级目录

Python轴承故障诊断 (17)基于TCN-CNN并行的一维故障信号识别模型

往期精彩内容&#xff1a; Python-凯斯西储大学&#xff08;CWRU&#xff09;轴承数据解读与分类处理 Python轴承故障诊断 (一)短时傅里叶变换STFT Python轴承故障诊断 (二)连续小波变换CWT_pyts 小波变换 故障-CSDN博客 Python轴承故障诊断 (三)经验模态分解EMD_轴承诊断 …

docker 安装minio,详细图解

废话不多说&#xff0c;直接上干货 docker 安装minio 拉取镜像 docker pull minio/minio创建数据目录、配置目录 mkdir /opt/minio/data mkdir /opt/minio/config启动容器 docker run -p 9000:9000 -p 9090:9090 \--name minio \-d --restartalways \-e "MINIO_ACCESS_KE…

[LLM]大语言模型文本生成—解码策略(Top-k Top-p Temperature)

{"top_k": 5,"temperature": 0.8,"num_beams": 1,"top_p": 0.75,"repetition_penalty": 1.5,"max_tokens": 30000,"message": [{"content": "你好","role": "user&…

【递归专题】【蓝桥杯备考训练】:有序分数、正则问题、带分数、约数之和、分形之城【已更新完成】

目录 1、有序分数&#xff08;usaco training 2.1&#xff09; 2、正则问题&#xff08;第八届蓝桥杯省赛C A组 & Java A组&#xff09; 3、带分数&#xff08;第四届蓝桥杯省赛Java A组/B组 & C B组/C组&#xff09; 4、约数之和&#xff08;《算法竞赛进阶指南》…

图形学 总结 - 老是忘

渲染流水线&#xff1a; 1、首先相机摆放到场景一个位置和角度&#xff0c;场景的各个物体也已经被摆放好 2、拿到场景物体的顶点信息&#xff0c;根据顶点信息构成图元 3、经过透视投影&#xff0c;将图元转化为2*2的正方形&#xff0c;再把2*2的正方形扩展到屏幕大小 4、…

修复 error Delete `␍` prettier/prettier 错误

修复 error Delete ␍ prettier/prettier 错误 问题背景报错信息报错原因解决办法修改CRLF----针对单个文件yarn run lint --fix 一键修复&#xff08;官方提供&#xff09; 问题背景 今天在使用 openapi 自动生成前端接口代码的时候&#xff0c;爆了一个类似 eslint 规范的错…

机器人路径规划:基于流场寻路算法(Flow Field Pathfinding)的机器人路径规划(提供Python代码)

流场寻路算法(Flow Field Pathfinding)是一种基于流体动力学理论的路径规划算法&#xff0c;它模拟了流体在空间中的流动&#xff0c;并利用流体的运动特性来指导路径的选择。下面是流场寻路算法的基本介绍及算法描述&#xff1a; 1. 基本介绍 流场寻路算法通过将环境划分为网…

算法导论第十二章练习参考答案(22) - 12.1-12.4

Exercise 12.1-1 任何时候&#xff0c;如果一个节点有一个子节点&#xff0c;就把它当作右子节点&#xff0c;左子节点为NIL。 Exercise 12.1-2 二叉搜索树的属性保证了左子树的所有节点都更小&#xff0c;右子树的所有节点都更大。最小堆属性只保证一般的子节点大于父节点的关…

你在测试金字塔的哪一层(上)

​在准备将软件上线到生产环境之前需要进行测试。随着软件测试方式日趋成熟&#xff0c;软件开发团队的测试也在取代大量手动测试&#xff0c;逐渐实现自动化测试。 通过自动化测试&#xff0c;开发团队可以在短短几分钟内就了解到软件是否存在问题&#xff0c;而不需要等待几天…

航空公司遭遇Play恶意家族攻击,亚信安全发布《勒索家族和勒索事件监控报告》

本周态势快速感知 本周全球共监测到勒索事件95起&#xff0c;与上周相比数量持平。 本周Play是影响最严重的勒索家族&#xff0c;Blacksuit和Ransomhub恶意家族紧随其后&#xff0c;从整体上看lockbit3.0依旧是影响最严重的勒索家族&#xff0c;需要注意防范。 本周大陆航空技…

【鸿蒙HarmonyOS开发笔记】常用组件介绍篇 —— 弹窗组件

简介 弹窗是移动应用中常见的一种用户界面元素&#xff0c;常用于显示一些重要的信息、提示用户进行操作或收集用户输入。ArkTS提供了多种内置的弹窗供开发者使用&#xff0c;除此之外还支持自定义弹窗&#xff0c;来满足各种不同的需求。 下面是所有涉及到的弹窗组件官方文档…

CSS 浮动

浮动 在标准流当中&#xff0c;元素或者标签在页面上摆放的时候会出现不如意的地方。要想解决这些问题可以采用脱离标准流的方式来进行解决这些问题&#xff0c;脱离标准流也称为脱离文档流。 脱离标准流的解决方式有三种&#xff0c;一种是浮动&#xff0c;另外一种是固定定位…

如何读懂磁滞回曲线(磁化曲线、退磁曲线、内禀曲线)

硬磁性材料&#xff0c;如钕铁硼磁钢&#xff0c;有两个显著特征&#xff0c;一是在外磁场作用下能被强烈磁化&#xff0c;另一个是磁滞&#xff0c;即撤走外磁场后硬磁材料仍保留磁化状态&#xff0c;下图为硬磁材料的磁感应强度B与磁化场强度H之间的关系曲线。 当磁场按Hs→H…

算法——贪心

「贪心的本质是选择每一阶段的局部最优&#xff0c;从而达到全局最优」 贪心无套路 1. 分发饼干 贪心策略&#xff1a; &#xff08;1&#xff09;局部最优就是大饼干喂给胃口大的&#xff0c;充分利用饼干尺寸喂饱一个&#xff0c;全局最优就是喂饱尽可能多的小孩 &#xff08…

Linux chapter1 常用命令 cp

note 1 : netstat、curl、ip、nmap、dig 这些都是常用的网络诊断工具&#xff0c;它们的全称如下&#xff1a; netstat&#xff1a;Network Statistics&#xff0c;网络统计&#xff0c;用于显示网络连接&#xff0c;路由表&#xff0c;网络接口统计等网络信息。curl&#xf…

Kali Linux 更换优质国内源

文章目录 环境说明1 Kali Linux 源简介2 Kali Linux 更换国内源 环境说明 操作系统&#xff1a;kali-linux-2024.1-installer-amd64 1 Kali Linux 源简介 所谓的 Kali Linux 源&#xff0c;你可以将它理解为软件仓库&#xff0c;系统通过它安装和更新软件&#xff1b;源的服务…