Flume 实战开发指南

Flume

文章目录

  • Flume
    • Flume介绍
    • Flume核心概念
    • Flume NG的体系结构
      • Source
      • Channel
      • Sink
    • Flume的部署类型
      • 单一流程
      • 多代理流程(多个agent顺序连接)
      • 流的合并(多个Agent的数据汇聚到同一个Agent )
      • 多路复用流(多级流)
      • load balance功能
    • Flume组件选型
      • Source
      • Channel
        • FileChannel和MemoryChannel区别
        • FileChannel优化
      • Sink
        • HDFS小文件处理
    • 案例:日志采集发送到KafkaFlume配置
      • 日志采集流程
      • 配置如下
      • Flume拦截器开发
      • Flume采集脚本批量启动停止
    • 案例:消费Kafka保存到HDFS
      • 配置如下
        • FileChannel优化
        • HDFS小文件处理
      • Flume时间戳拦截器
      • 项目经验之Flume内存优化

Flume介绍

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

  1. flume的可靠性

    当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。

  2. flume的可恢复性
    还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。

Flume核心概念

ClientClient生产数据,运行在一个独立的线程。
Event一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
FlowEvent从源点到达目的点的迁移的抽象。
Agent一个独立的Flume进程,包含组件Source、 Channel、 Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。)
Source数据收集组件。(source从Client收集数据,传递给Channel)
Channel中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)
Sink从Channel中读取并移除Event, 将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)

Flume NG的体系结构

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是sourcechannelsink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示。

在这里插入图片描述

Source

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。

Flume提供了各种source的实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果内置的Source无法满足需要, Flume还支持自定义Source。

在这里插入图片描述

Channel

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。

Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。

MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。

MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。

FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

Sink

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
在这里插入图片描述

Flume的部署类型

单一流程

在这里插入图片描述

多代理流程(多个agent顺序连接)

在这里插入图片描述

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

流的合并(多个Agent的数据汇聚到同一个Agent )

在这里插入图片描述

这种情况应用的场景比较多,比如要收集Web网站的用户行为日志, Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每 个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

多路复用流(多级流)

Flume还支持多级流,什么多级流?来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
在这里插入图片描述

load balance功能

下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上 。
在这里插入图片描述

Flume组件选型

Source

  1. Taildir Source相比Exec Source、Spooling Directory Source的优势?

    TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。不会丢数据,但是有可能会导致数据重复。

    Exec Source:可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

    Spooling Directory Source:监控目录,支持断点续传。

  2. batchSize大小如何设置?

    答:Event 1K左右时,500-1000合适(默认为100)

Channel

​ 采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

​ 注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

FileChannel和MemoryChannel区别

MemoryChannel:

传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel:

传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

选型:

金融类公司、对钱要求非常准确的公司通常会选择FileChannel

传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

FileChannel优化

​ 通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDirbackupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。

FileChannel原理:
在这里插入图片描述

Sink

HDFS小文件处理

HDFS存入大量小文件,有什么影响?

**元数据层面:**每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

**计算层面:**默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600hdfs.rollSize=134217728hdfs.rollCount =0几个参数综合作用,效果如下:

①文件在达到128M时会滚动生成新文件

②文件创建超3600秒时会滚动生成新文件

案例:日志采集发送到KafkaFlume配置

日志采集流程

在这里插入图片描述

配置如下

/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

> vim file-flume-kafka.conf# 为各组件命名
a1.sources = r1
a1.channels = c1# 描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/program/logs/app.*
a1.sources.r1.positionFile = /opt/program/logs/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder# 描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.60.14:9092,192.168.60.15:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

注意:com.jast.flume.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

Flume拦截器开发

  1. 创建Maven工程flume-interceptor

  2. 创建包名:com.jast.flume.interceptor

  3. 在pom.xml文件中添加如下配置

    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.19</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
    

    注意:scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。

  4. com.jast.flume.ETLInterceptor包下创建ETLInterceptor

    package com.jast.flume;import cn.hutool.json.JSONUtil;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;/*** @author Jast* @description 自定义拦截器*/
    public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String text = new String(body, StandardCharsets.UTF_8);if(JSONUtil.isJson(text)){return event;}System.out.println("非json格式过滤掉");return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
    }
  5. 打包

    在这里插入图片描述

  6. 自己部署的Flume:需要先将打好的包放入到flume/lib文件夹下面

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/lib
    
  7. CDH版本Flume:需要先将打好的包放入到flume/lib文件夹下面

    具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib
    
  8. 启动Flume

    nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /opt/program/flume/log1.txt 2>&1  &
    
  9. 测试

    随便在app.log加入一条数据,拦截器检测到,然后打印出非json格式过滤掉

    2022-03-17 09:42:42,148 INFO taildir.ReliableTaildirEventReader: Pos 9 is larger than file size! Restarting from pos 0, file: /opt/program/logs/app.log, inode: 5810713
    2022-03-17 09:42:42,148 INFO taildir.TailFile: Updated position, file: /opt/program/logs/app.log, inode: 5810713, pos: 0
    非json格式过滤掉
    2022-03-17 09:45:17,236 INFO taildir.TaildirSource: Closed file: /opt/program/logs/app.log, inode: 5810713, pos: 22
    

    输入一条json数据,在kafka消费时正常消费,配置成功

    22/03/17 09:15:41 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-3667] Resetting offset for partition topic_log-0 to offset 0.
    {"en":"小张"}
    

Flume采集脚本批量启动停止

  1. 在/home/atguigu/bin目录下创建脚本f1.sh

    > vim f1.sh#! /bin/bashcase $1 in
    "start"){for i in localhostdoecho " --------启动 $i 采集flume-------"ssh $i "nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /opt/program/flume/log1.txt 2>&1  &"done
    };;
    "stop"){for i in localhostdoecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka-1.conf | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "done};;
    esac
    

    说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

    说明2:awk 默认分隔符为空格

    说明3:$2是在“”双引号内部会被解析为脚本的第二个参数,但是这里面想表达的含义是awk的第二个值,所以需要将他转义,用$2表示。

    说明4:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。ls

  2. 增加脚本执行权限

    chmod u+x f1.sh
    
  3. f1集群启动脚本

    f1.sh start
    
  4. f1集群停止脚本

    f1.sh stop
    

案例:消费Kafka保存到HDFS

在这里插入图片描述

配置如下

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.15:9092,192.168.60.14:9092
a1.sources.r1.kafka.topics=topic_log
#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/program/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/program/flume/data/behavior1/## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /jast_root/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false#控制生成的小文件
# 每隔多少秒生成一个
a1.sinks.k1.hdfs.rollInterval = 10
# 128M生成一个文件
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = GzipCodec## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

CDH 版本:Flume写入HDFS时,Flume部署的服务器需要安装HDFS Gateway

自己部署版本:Flume与Hadoop集群不在一起的话,需要配置Hadoop环境变量

FileChannel优化

​ 通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDirbackupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。

FileChannel原理:

在这里插入图片描述

HDFS小文件处理

HDFS存入大量小文件,有什么影响?

**元数据层面:**每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

**计算层面:**默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600hdfs.rollSize=134217728hdfs.rollCount =0几个参数综合作用,效果如下:

①文件在达到128M时会滚动生成新文件

②文件创建超3600秒时会滚动生成新文件

Flume时间戳拦截器

​ 由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。

​ 解决的思路:拦截json日志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

  1. 在com.jast.flume.interceptor包下创建TimeStampInterceptor类

    package com.jast.flume.interceptor;import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}
    }
    
  2. 重新打包

    在这里插入图片描述

  3. 自己部署的Flume:需要先将打好的包放入到flume/lib文件夹下面

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/lib
    
  4. CDH版本Flume:需要先将打好的包放入到flume/lib文件夹下面

    具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib
    

项目经验之Flume内存优化

  1. 问题描述:如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    
  2. 解决方案步骤

    在服务器的flume/conf/flume-env.sh文件中增加如下配置

    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

  3. Flume内存参数设置及优化

    JVM heap一般设置为4G或更高

    -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

    -Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

    参考内容:
    https://www.cnblogs.com/qingyunzong/p/8994494.html
    https://flume.apache.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509546.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vim grep配置及使用

vimgrep /匹配模式/[g][j] 要搜索的文件/范围 g&#xff1a;表示是否把每一行的多个匹配结果都加入 j&#xff1a;表示是否搜索完后定位到第一个匹配位置 vimgrep /pattern/ % 在当前打开文件中查找 vimgrep /pattern/ * 在当前目录下查找所有 vimgrep…

C++ STL--stack/queue 的使用方法

1、stack stack 模板类的定义在<stack>头文件中。 stack 模板类需要两个模板参数&#xff0c;一个是元素类型&#xff0c;一个容器类型&#xff0c;但只有元素类型是必要 的&#xff0c;在不指定容器类型时&#xff0c;默认的容器类型为deque。 定义stack 对象的示例代码…

Flink Chain任务链分隔

Chain分隔 文章目录Chain分隔如何切断任务链&#xff1f;startNewChain 与 disableChaining区别全局切断任务链(chain)web端效果查看隔离后依赖链忙碌程度什么是Backpressured(被压/反压)&#xff1f;代码样例参考文献如何切断任务链&#xff1f; 由于共享slot的存在&#xff…

Vim winmanager文件浏览自动更新

使用winmanger插件中发现其中引用的fileexplorer不能自动更新到当前文件夹。 将vim/plugin/winfileexplorer.vim 中的函数FileExplorer_Start() function! FileExplorer_Start() let b:displayMode "winmanager" call s:EditDir(getcwd()) "if exists(s:lastDi…

KMP 深度讲解next数组的求解

【经典算法】——KMP&#xff0c;深入讲解next数组的求解 前言   之前对kmp算法虽然了解它的原理&#xff0c;即求出P0Pi的最大相同前后缀长度k&#xff1b;但是问题在于如何求出这个最大前后缀长度呢&#xff1f;我觉得网上很多帖子都说的不是很清楚&#xff0c;总感觉没有把…

Yarn 命令详细介绍

文章目录yarn命令根据状态查看Yarn全部运行应用程序根据提交程序代码提交类型查看运行程序yarn top 查看正在运行的状态yarn top整体资源使用查看Yarn web页面工具脚本根据yarn应用名称kill进程根据yarn应用名称查看日志yarn命令 根据状态查看Yarn全部运行应用程序 # 查看全部…

初窥Linux 之 我最常用的20条命令

玩过Linux的人都会知道&#xff0c;Linux中的命令的确是非常多&#xff0c;但是玩过Linux的人也从来不会因为Linux的命令如此之多而烦恼&#xff0c;因为我们只需要掌握我们最常用的命令就可以了。当然你也可以在使用时去找一下man&#xff0c;他会帮你解决不少的问题。然而每个…

纸牌三角形(蓝桥杯)

标题&#xff1a;纸牌三角形 A,2,3,4,5,6,7,8,9 共9张纸牌排成一个正三角形&#xff08;A按1计算&#xff09;。要求每个边的和相等。 下图就是一种排法。 A 9 6 4 8 3 7 5 2 镜像后的&#xff1a; A 6 9 8 4 2 5 7 3 这样的排法可能会有很多。 如果考虑旋转、镜像…

BackPressure详细介绍

BackPressure详细介绍 文章目录BackPressure详细介绍前言什么是反压&#xff1f;为什么需要关注反压&#xff1f;为什么不需要关注反压&#xff1f;如何发现和追踪反压的根源&#xff1f;反压的坏处经常碰到哪些问题会任务反压怎么处理反压&#xff1f;前言 Flink反压已经是老…

new/delete和malloc/free的区别一般汇总

一、基本概念 malloc/free&#xff1a; 1、函数原型及说明&#xff1a; void *malloc(long NumBytes)&#xff1a;该函数分配了NumBytes个字节&#xff0c;并返回了指向这块内存的指针。如果分配失败&#xff0c;则返回一个空指针&#xff08;NULL&#xff09;。 void free(voi…

给IT新人的15个建议:程序员的辛酸反省与总结!

很多人表面上看着老实巴交的&#xff0c;实际上内心比谁都好强、自负、虚荣、甚至阴险。工作中见的多了&#xff0c;也就习惯了。   有一些人&#xff0c;什么事都写在脸上&#xff0c;表面上经常得罪人&#xff0c;甚至让人讨厌。但是他们所表现的又未必不是真性情。 我相信…

Logback日志发送到Kafka

Logback日志发送到Kafka 文章目录Logback日志发送到Kafka一、使用logback将日志发送至kafka1.1 引入依赖1.2 logback.xml简单Demo1.3 兼容性1.4 完整的样例1.5 启动程序收集日志1.6 项目Git地址一、使用logback将日志发送至kafka 1.1 引入依赖 如果存在则跳过该步骤 pom.xml …

01背包问题(DFS解法)

有5个物体&#xff0c;每个物品只有一个,其重量分别是为2,2,6,5,4,价值分别为6,3,5,4,6,背包的载重量为10,求装入背包的物体及总质量。 计算结果&#xff1a;15 package com.lanQiaoFor6;import java.util.ArrayList; import java.util.TreeSet;public class JAVA_6 {static …

Windows下安装Vim插件管理Vundle

VIM是编辑器之神&#xff0c;这个就不用说了&#xff0c;越使用越会体会到VIM的强大与便利。但是它的强大建立在众多插件组合之上&#xff0c;而Vim本身缺乏对插件的有效管理&#xff0c;安装插件并配置_vimrc文件非常不便。gmarik受到Ruby的bunler的启发&#xff0c;开发了vun…

AOE网

博客来源&#xff1a;http://blog.csdn.net/wang379275614/article/details/13990163 认识AOE网 有向图中&#xff0c;用顶点表示活动&#xff0c;用有向边表示活动之间开始的先后顺序&#xff0c;则称这种有向图为AOV网络&#xff1b;AOV网络可以反应任务完成的先后顺序&#…

Spark foreachRDD的使用

常出现的使用误区&#xff1a; **误区一&#xff1a;**在driver上创建连接对象&#xff08;比如网络连接或数据库连接&#xff09; 如果在driver上创建连接对象&#xff0c;然后在RDD的算子函数内使用连接对象&#xff0c;那么就意味着需要将连接对象序列化后从driver传递到w…

包子凑数(蓝桥杯)

标题&#xff1a;包子凑数 小明几乎每天早晨都会在一家包子铺吃早餐。他发现这家包子铺有N种蒸笼&#xff0c;其中第i种蒸笼恰好能放Ai个包子。每种蒸笼都有非常多笼&#xff0c;可以认为是无限笼。 每当有顾客想买X个包子&#xff0c;卖包子的大叔就会迅速选出若干笼包子来&…

makefile例子(经典)

相信在unix下编程的没有不知道makefile的&#xff0c;刚开始学习unix平台 下的东西&#xff0c;了解了下makefile的制作&#xff0c;觉得有点东西可以记录下。   下面是一个极其简单的例子&#xff1a; 现在我要编译一个Hello world&#xff0c;需要如下三个文件&#xff1a;…

Scala-SparkStreaming 2.2.0 消费 kafka0.10(生产1.0)

Scala-SparkStreaming 2.2.0 kafka0.10&#xff08;生产1.0&#xff09; 文章目录Scala-SparkStreaming 2.2.0 kafka0.10&#xff08;生产1.0&#xff09;代码Pom.xmlSparkstreaming 2.1.1版本pom文件Spark 2.2 kafka0.10(api使用的0.10&#xff0c;实际生产kafka版本是1.0)代码…

数据结构前缀,后缀,中缀表达式

[cpp] view plaincopy [cpp] view plaincopy <span style"color: rgb(51, 51, 51); font-family: Arial; font-size: 14px; line-height: 26px; background-color: rgb(255, 255, 255);">举例&#xff1a;</span> (3 4) 5 - 6 就是中缀表达式 - 3…