Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑

一、需求

Flume向kafka发送数据时,同一个flume发送到kafka的数据总是固定在某一个partition中。而业务需求是发送的数据在所有的partition平均分布

 

二、实现

Flume的官方文档:

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

kafka-sink是从header里的key参数的value值来hash到kafka的某个分区中。如果key为null,那么就会随机发布至分区中。事实上key为null被指定到kafka的某个固定分区。

要partition平均分布数据,就向header中写上随机的key,然后数据才会真正的向kafka分区进行随机发布。

官方文档有一个UUID Interceptor,会为每个event的head添加一个随机唯一的key,向flume添加拦截器达到随机分区发送。

在flume添加的配置文件如下:

agent1.sources.nginxlogSource.interceptors = UUIDi1
agent1.sources.nginxlogSource.interceptors.UUIDi1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent1.sources.nginxlogSource.interceptors.UUIDi1.headerName=key
agent1.sources.nginxlogSource.interceptors.UUIDi1.preserveExisting=false

 

三、出现的问题

由于网络抖动,国外nginx机器连接不上国内的kafka集群的部分机器,nginx机器的flume通道堵塞,内存占有率高,导致Nginx机器的cpu飙升100%,持续几十台机器崩溃发送告警。

故障原因:

flume添加了UUID拦截器,UUID拦截器给Event的header添加了一个key值,flume在发送到kafka中根据key指定了固定分区。由于网络抖动,该kafka分区连接不上,分区的所有数据发送失败回滚到channel通道,失败数据还是以key指定的分区进行重新发送,发送数据一直失败回滚channel通道,直到机器崩溃故障发生。

 

四、总结

不要使用UUID拦截器进行固定的分区发送,数据量大或者网络抖动容易导致机器崩溃。应该重新编写kafkaSink,flume在发送数据的时候随机生成一个key,发送到不同的分区。就算失败回滚到channel通道也会发送到新的分区。

示例:

KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(eventTopic, UUID.randomUUID().toString(), eventBody);
messageList.add(data);

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322862.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jzoj4244-yi【贪心】

正题 题目大意 一些飞船&#xff0c;选最少的&#xff0c;使得可以载所有乘客来回两次 解题思路 因为代价都是一样的&#xff0c;选载客最多可以往返两次的就好了。 codecodecode #include<cstdio> #include<algorithm> using namespace std; const int N100010…

我心中的ASP.NET Core 新核心对象WebHost(二)

这是ASP.NET Core新核心对象系列的第二篇&#xff0c;上一篇 WebHost准备阶段 我们讲到了WebHostBuilder的初始化及配置。我们给WebHostBuilder进行以下配置 UseKestrel 设置Kestrel为HttpServer ConfigureAppConfiguration 设置了配置文件 ConfigureLogging 配置了日志处理器…

Vue及React脚手架安装

React npm i -g create-react-app create-react-app project React路由安装 yarn add react-router-dom Vue cnpm install -g vue-cli vue create project

Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)

转载自 Flume中的拦截器&#xff08;Interceptor&#xff09;介绍与使用&#xff08;一&#xff09; Flume中的拦截器&#xff08;interceptor&#xff09; 用户Source读取events发送到Sink的时候&#xff0c;在events header中加入一些有用的信息&#xff0c;或者对events的…

SOA对微服务的残余影响

近日&#xff0c;Tareq Abedrabbo在伦敦2017 Con微服务大会上说&#xff0c;SOA对微服务架构设计的残余影响仍然存在&#xff0c;包括技术选型和组织方面的问题。最直接的一个例子就是大多数企业仍然区分对待架构师和开发人员&#xff0c;架构师负责出规范&#xff0c;开发人员…

如何查看python安装路径

import sys sys.path 我的安装地址 python2: C:\Users\Tecna1205\Miniconda python3: C:\Users\Tecna1205\AppData\Local\Programs\Python\Python37 python3 -m pip install --upgrade pip --force-reinstall

jzoj4245-er【dp,贪心】

正题 题目大意 nnn个武器(n≤2n\leq2n≤2)&#xff0c;mmm个符文 符文1:直接改变一个武器的攻击力(最多一个) 符文2:增加一个武器的攻击力 符文3:使一个人的武器攻击力翻若干倍 求武器攻击力乘积最大&#xff0c;输出答案的自然对数。 解题思路 首先log(ab)log(a)log(b)lo…

Hadoop生态Flume(四)拦截器(Interceptor)介绍与使用(2)

转载自 Flume中的拦截器&#xff08;Interceptor&#xff09;介绍与使用&#xff08;二&#xff09; lume中的拦截器&#xff08;interceptor&#xff09;&#xff0c;用户Source读取events发送到Sink的时候&#xff0c;在events header中加入一些有用的信息&#xff0c;或者对…

协作更进一步:微软隆重介绍Visual Studio动态分享功能

微软刚刚在 Visual Studio Code 网站上宣布了“动态分享”&#xff08;Live Share&#xff09;功能&#xff0c;开发者们可以在 VS 2017 或 VS Code 中体验全新的实施协作。微软表示&#xff0c;Live Share 可让团队在相同的代码库上启用快速协作&#xff0c;而无需同步代码或配…

python打包exe文件

首先安装pyinstaller pip3 install pyinstaller接着导报指定文件 pyinstaller.exe -F 文件路径文件名 举例 pyinstaller.exe -F C:\Users\Tecna1205\Desktop\工作目录\Python工作目录\测试\3.3\test\tk.py 如果有图形界面&#xff0c;不想打开命令行&#xff0c;可在打包命令…

SpringBoot maven打包源码发布到仓库配置

一、项目pom.xml配置 添加发布仓库 配置上传源码 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocatio…

使用MS Test做单元测试

声明&#xff1a;本篇博客翻译自&#xff1a;http://www.c-sharpcorner.com/article/unit-testing-with-ms-tests-in-c-sharp/ 写在翻译之前&#xff1a; 依然清晰的记得刚工作的第一个项目中&#xff0c;在完成一个功能模块开发后&#xff0c;师傅让我把代码做一下单元测试。当…

jzoj4246-san【最短路,SPFA,DAGdp】

正题 题目大意 nnn个点&#xff0c;mmm条边&#xff0c;若两个点之间的任意一条最短路长度为奇数则称之为不和谐最短路。求每个点有多少条不和谐最短路经过。 解题思路 首先SPFASPFASPFA求一个多元最短路。 然后枚举起点&#xff0c;对于每个起点&#xff0c;我们只走最短路上…

Vue组件传参

父组件向子组件传参 数据&#xff1a; 父组件 <test :message"msg"></test>msg: Hello uniApp子组件&#xff08;test.vue&#xff09; <text>{{message}}</text>props:["message"]子组件向父组件传参 子组件&#xff1a; &…

SpringBoot2.1.9 多Kafka消费者配置

一、配置文件 pom.xml <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> application.yml spring:application:name: double-kafka-consumerprofiles:active: devjacks…

如果不懂Service mesh,就不要谈微服务了

提到微服务&#xff0c;spring cloud等经典框架被使用的最为广泛&#xff0c;但是在2016年才被提起的Service Mesh&#xff0c;已经被Paypal、Lyft、Ticketmaster和Credit Karma等等一些大流量平台所使用&#xff0c;在生产应用中添加了Service mesh。今年随着Linkerd传入国内&…

欢乐纪中某B组赛【2019.1.29】

前言 Rank1Rank1Rank1耶 成绩 RankRankRank是有算别人的 RankRankRankPersonPersonPersonScoreScoreScoreAAABBBCCC1112017myself2017myself2017myself2802802801001001008080801001001003332017xjq2017xjq2017xjq2002002001001001000001001001001212122017hjq2017hjq2017hjq15…

mysq和mysqli关系

1、在php5版本之前&#xff0c;一般是用php的mysql函数去驱动mysql数据库的&#xff0c;比如mysql_query()的函数&#xff0c;属于面向过程. 2、在php5版本以后&#xff0c;增加了mysqli的函数功能&#xff0c;某种意义上讲&#xff0c;它是mysql系统函数的增强版&#xff0c;更…

SpringBoot2.1.9 分布式锁ShedLock

一、分布式锁配置 &#xff08;1&#xff09;redis锁 pom.xml <dependency><groupId>net.javacrumbs.shedlock</groupId><artifactId>shedlock-spring</artifactId><version>2.5.0</version> </dependency><dependency&…

使用AspectCore动态代理

前言 最近越来越多的同学关注到AspectCore&#xff0c;并且提出不少中肯的建议&#xff0c;其中最多的提议是希望能够看到更多的关于AspectCore使用方式的文章和Demo。那么在这篇文章里&#xff0c;我们就来聊聊AspectCore核心之一的动态代理。 动态代理 在.NET平台中&#xff…