Apache Flume

Flume 1.9.0 Developer Guide【Flume 1.9.0开发人员指南】


摘自:Flume 1.9.0 Developer Guide — Apache Flume


Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

Apache Flume是一个分布式、可靠且可用的系统,用于高效地收集、聚合来自许多不同来源的大量日志数据,并将其移动到集中的数据存储中。

Apache Flume is a top-level project at the Apache Software Foundation. There are currently two release code lines available, versions 0.9.x and 1.x. This documentation applies to the 1.x codeline. For the 0.9.x codeline, please see the Flume 0.9.x Developer Guide.

Apache Flume是Apache软件基金会的一个顶级项目。目前有两个可用的发布代码行,版本0.9.x和1.x。此文档适用于1.x代码行。有关0.9.x代码行,请参阅Flume 0.9.x开发人员指南。


Data flow model【数据流模型】

An Event is a unit of data that flows through a Flume agent. The Event flows from Source to Channel to Sink, and is represented by an implementation of the Event interface. An Event carries a payload (byte array) that is accompanied by an optional set of headers (string attributes). A Flume agent is a process (JVM) that hosts the components that allow Events to flow from an external source to a external destination.


A Source consumes Events having a specific format, and those Events are delivered to the Source by an external source like a web server. For example, an AvroSource can be used to receive Avro Events from clients or from other Flume agents in the flow. When a Source receives an Event, it stores it into one or more Channels. The Channel is a passive store that holds the Event until that Event is consumed by a Sink. One type of Channel available in Flume is the FileChannel which uses the local filesystem as its backing store. A Sink is responsible for removing an Event from the Channel and putting it into an external repository like HDFS (in the case of an HDFSEventSink) or forwarding it to the Source at the next hop of the flow. The Source and Sink within the given agent run asynchronously with the Events staged in the Channel.



An Event is staged in a Flume agent’s Channel. Then it’s the Sink‘s responsibility to deliver the Event to the next agent or terminal repository (like HDFS) in the flow. The Sink removes an Event from the Channel only after the Event is stored into the Channel of the next agent or stored in the terminal repository. This is how the single-hop message delivery semantics in Flume provide end-to-end reliability of the flow. Flume uses a transactional approach to guarantee the reliable delivery of the Events.


The Sources and Sinks encapsulate the storage/retrieval of the Events in a Transaction provided by the Channel. This ensures that the set of Events are reliably passed from point to point in the flow. In the case of a multi-hop flow, the Sink from the previous hop and the Source of the next hop both have their Transactions open to ensure that the Event data is safely stored in the Channel of the next hop.


Building Flume 【建筑水槽】

Getting the source 【获取源】

Check-out the code using Git. Click here for the git repository root.

The Flume 1.x development happens under the branch “trunk” so this command line can be used:

git clone GitHub - apache/flume: Mirror of Apache Flume




Compile/test Flume  【编译/测试Flume】

The Flume build is mavenized. You can compile Flume using the standard Maven commands:


  1. Compile only: mvn clean compile
  2. Compile and run unit tests: mvn clean test
  3. Run individual test(s): mvn clean test -Dtest=<Test1>,<Test2>,... -DfailIfNoTests=false
  4. Create tarball package: mvn clean install
  5. Create tarball package (skip unit tests): mvn clean install -DskipTests

1.仅编译:mvn clean Compile

2.编译并运行单元测试:mvn clean测试


mvn clean test-Dtest=<Test1>,<Test2>,。。。-DfailIfNoTests=false

4.创建tarball包:mvn clean install


Please note that Flume builds requires that the Google Protocol Buffers compiler be in the path. You can download and install it by following the instructions here.

请注意,Flume构建要求Google Protocol Buffers编译器位于路径中。您可以按照此处的说明下载并安装它。

Updating Protocol Buffer Version 【正在更新协议缓冲区版本】

File channel has a dependency on Protocol Buffer. When updating the version of Protocol Buffer used by Flume, it is necessary to regenerate the data access classes using the protoc compiler that is part of Protocol Buffer as follows.

文件通道依赖于协议缓冲区。当更新Flume使用的Protocol Buffer版本时,有必要使用Protocol Buffer的协议编译器重新生成数据访问类,如下所示。

  1. Install the desired version of Protocol Buffer on your local machine
  2. Update version of Protocol Buffer in pom.xml
  3. Generate new Protocol Buffer data access classes in Flume: cd flume-ng-channels/flume-file-channel; mvn -P compile-proto clean package -DskipTests
  4. Add Apache license header to any of the generated files that are missing it
  5. Rebuild and test Flume: cd ../..; mvn clean install


2.更新pom.xml中的Protocol Buffer版本

3.在Flume中生成新的Protocol Buffer数据访问类:cd Flume ng channels/Flume file channel;mvn-P编译proto clean包-DskipTests



Developing custom components 【开发自定义组件】

Client 【客户端】

The client operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. Flume currently supports Avro, log4j, syslog, and Http POST (with a JSON body) as ways to transfer data from a external source. Additionally, there’s an ExecSource that can consume the output of a local process as input to Flume.


The client operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. Flume currently supports Avro, log4j, syslog, and Http POST (with a JSON body) as ways to transfer data from a external source. Additionally, there’s an ExecSource that can consume the output of a local process as input to Flume.

有一个用例中这些现有选项是不够的,这是很可能的。在这种情况下,您可以构建一个自定义机制来向Flume发送数据。实现这一点有两种方法。第一个选项是创建一个自定义客户端,该客户端与Flume现有的源之一(如AvroSource或SyslogTcpSource)通信。在这里,客户端应该将其数据转换为这些Flume源可以理解的消息。另一种选择是编写一个自定义Flume Source,它使用某些IPC或RPC协议直接与现有的客户端应用程序进行对话,然后将客户端数据转换为Flume Events以发送到下游。请注意,Flume代理的通道中存储的所有事件都必须作为Flume事件存在。

Client SDK  【客户端SDK】

Though Flume contains a number of built-in mechanisms (i.e. Sources) to ingest data, often one wants the ability to communicate with Flume directly from a custom application. The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume’s data flow over RPC.


RPC client interface 【RPC客户端接口】

An implementation of Flume’s RpcClient interface encapsulates the RPC mechanism supported by Flume. The user’s application can simply call the Flume Client SDK’s append(Event) or appendBatch(List<Event>) to send data and not worry about the underlying message exchange details. The user can provide the required Event arg by either directly implementing the Event interface, by using a convenience implementation such as the SimpleEvent class, or by using EventBuilder‘s overloaded withBody() static helper methods.

Flume的RpcClient接口的实现封装了Flume支持的RPC机制。用户的应用程序可以简单地调用Flume Client SDK的append(Event)或appendBatch(List<Event>)来发送数据,而不必担心底层的消息交换细节。用户可以通过直接实现Event接口、使用SimpleEvent类等方便的实现或使用EventBuilder的重载withBody()静态辅助方法来提供所需的Event arg。

RPC clients - Avro and Thrift 【RPC客户端-Avro和Thrift】

As of Flume 1.4.0, Avro is the default RPC protocol. The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data into the agent. The following example shows how to use the Flume Client SDK API within a user’s data-generating application:

从Flume 1.4.0开始,Avro是默认的RPC协议。NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口。客户端需要使用目标Flume代理的主机和端口创建此对象,然后可以使用RpcClient将数据发送到代理中。以下示例显示如何在用户的数据生成应用程序中使用Flume Client SDK API:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class MyApp {public static void main(String[] args) {MyRpcClientFacade client = new MyRpcClientFacade();// Initialize client with the remote Flume agent's host and port// 使用远程Flume代理的主机和端口初始化客户端client.init("host.example.org", 41414);// Send 10 events to the remote Flume agent. That agent should be// configured to listen with an AvroSource.// 向远程Flume代理发送10个事件。应将该代理配置为使用AvroSource进行侦听。String sampleData = "Hello Flume!";for (int i = 0; i < 10; i++) {client.sendDataToFlume(sampleData);}client.cleanUp();}}
class MyRpcClientFacade {private RpcClient client;private String hostname;private int port;public void init(String hostname, int port) {// Setup the RPC connection   设置RPC连接this.hostname = hostname;this.port = port;this.client = RpcClientFactory.getDefaultInstance(hostname, port);// Use the following method to create a thrift client (instead of the above line):// 使用以下方法创建旧款客户端(而不是上面的行):// this.client = RpcClientFactory.getThriftInstance(hostname, port);// this.client=RpcClientFactory.getThriftInstance(主机名,端口);}public void sendDataToFlume(String data) {// Create a Flume Event object that encapsulates the sample data// 创建一个Flume Event对象,用于封装示例数据Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));// Send the event// 发送事件try {client.append(event);} catch (EventDeliveryException e) {// clean up and recreate the client// 清理并重新创建客户端client.close();client = null;client = RpcClientFactory.getDefaultInstance(hostname, port);// Use the following method to create a thrift client (instead of the above line):// 使用以下方法创建旧款客户端(而不是上面的行):// this.client = RpcClientFactory.getThriftInstance(hostname, port);// this.client=RpcClientFactory.getThriftInstance(主机名,端口);}}public void cleanUp() {// Close the RPC connection// 关闭RPC连接client.close();}

The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:


a1.channels = c1
a1.sources = r1
a1.sinks = k1a1.channels.c1.type = memorya1.sources.r1.channels = c1
a1.sources.r1.type = avro
# For using a thrift source set the following instead of the above line.
# a1.source.r1.type = thriftce
a1.sources.r1.bind =
a1.sources.r1.port = 41414a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with these properties:


client.type = default (for avro) or thrift (for thrift)hosts = h1                           # default client accepts only 1 host 默认客户端只接受1台主机# (additional hosts will be ignored)(将忽略其他主机)hosts.h1 = host1.example.org:41414   # host and port must both be specified 必须同时指定主机和端口# (neither has a default) (两者都没有默认值)batch-size = 100                     # Must be >=1 (default: 100) 必须>=1(默认值:100)connect-timeout = 20000              # Must be >=1000 (default: 20000) 必须>=1000(默认值:20000)request-timeout = 20000              # Must be >=1000 (default: 20000) 必须>=1000(默认值:20000)
Secure RPC client - Thrift  【安全RPC客户端-Thrift】

As of Flume 1.6.0, Thrift source and sink supports kerberos based authentication. The client needs to use the getThriftInstance method of SecureRpcClientFactory to get hold of a SecureThriftRpcClientSecureThriftRpcClient extends ThriftRpcClient which implements the RpcClient interface. The kerberos authentication module resides in flume-ng-auth module which is required in classpath, when using the SecureRpcClientFactory. Both the client principal and the client keytab should be passed in as parameters through the properties and they reflect the credentials of the client to authenticate against the kerberos KDC. In addition, the server principal of the destination Thrift source to which this client is connecting to, should also be provided. The following example shows how to use the SecureRpcClientFactory within a user’s data-generating application:

从Flume 1.6.0开始,Thrift源和汇支持基于kerberos的身份验证。客户端需要使用SecureRpcClientFactory的getThriftInstance方法来获取SecureThriftRpcClient。SecureStriftRpcClient扩展了实现RpcClient接口的ThriftRpcClient。kerberos身份验证模块位于flume ng auth模块中,当使用SecureRpcClientFactory时,该模块在类路径中是必需的。客户端主体和客户端密钥选项卡都应该作为参数通过属性传递,它们反映了客户端根据kerberos KDC进行身份验证的凭据。此外,还应提供此客户端连接到的目标Thrift源的服务器主体。以下示例显示如何在用户的数据生成应用程序中使用SecureRpcClientFactory:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;public class MyApp {public static void main(String[] args) {MySecureRpcClientFacade client = new MySecureRpcClientFacade();// Initialize client with the remote Flume agent's host, port// 使用远程Flume代理的主机、端口初始化客户端Properties props = new Properties();props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");props.setProperty("hosts", "h1");props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));// Initialize client with the kerberos authentication related properties// 使用kerberos身份验证相关属性初始化客户端props.setProperty("kerberos", "true");props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");props.setProperty("client-keytab", "/tmp/flumeclient.keytab");props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");client.init(props);// Send 10 events to the remote Flume agent. That agent should be// configured to listen with an AvroSource.// 向远程Flume代理发送10个事件。应将该代理配置为使用AvroSource进行侦听。String sampleData = "Hello Flume!";for (int i = 0; i < 10; i++) {client.sendDataToFlume(sampleData);}client.cleanUp();}
}class MySecureRpcClientFacade {private RpcClient client;private Properties properties;public void init(Properties properties) {// Setup the RPC connection// 设置RPC连接this.properties = properties;// Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory// 使用SecureRpcClientFactory创建ThriftSecureRpcClient实例this.client = SecureRpcClientFactory.getThriftInstance(properties);}public void sendDataToFlume(String data) {// Create a Flume Event object that encapsulates the sample data// 创建一个Flume Event对象,用于封装示例数据Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));// Send the event// 发送事件try {client.append(event);} catch (EventDeliveryException e) {// clean up and recreate the client// 清理并重新创建客户端client.close();client = null;client = SecureRpcClientFactory.getThriftInstance(properties);}}public void cleanUp() {// Close the RPC connection// 关闭RPC连接client.close();}

The remote ThriftSource should be started in kerberos mode. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:


a1.channels = c1
a1.sources = r1
a1.sinks = k1a1.channels.c1.type = memorya1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind =
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytaba1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Failover Client  【故障转移客户端】

This class wraps the default Avro RPC client to provide failover handling capability to clients. This takes a whitespace-separated list of <host>:<port> representing the Flume agents that make-up a failover group. The Failover RPC Client currently does not support thrift. If there’s a communication error with the currently selected host (i.e. agent) agent, then the failover client automatically fails-over to the next host in the list. For example:

此类包装默认的Avro RPC客户端,为客户端提供故障转移处理功能。这采用了一个以空格分隔的<host>:<port>列表,表示组成故障转移组的Flume代理。故障转移RPC客户端当前不支持节俭。如果当前选择的主机(即代理)代理发生通信错误,则故障转移客户端会自动故障转移到列表中的下一台主机。例如:

// Setup properties for the failover
// 故障转移的设置属性
Properties props = new Properties();
props.put("client.type", "default_failover");// List of hosts (space-separated list of user-chosen host aliases)
// 主机列表(用户选择的主机别名的空格分隔列表)
props.put("hosts", "h1 h2 h3");// host/port pair for each host alias
// 每个主机别名的主机/端口对
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);// create the client with failover properties
// 创建具有故障转移属性的客户端
RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with these properties:


client.type = default_failoverhosts = h1 h2 h3                     # at least one is required, but 2 or# more makes better sense# 至少需要一个,但2个或更多更有意义hosts.h1 = host1.example.org:41414hosts.h2 = host2.example.org:41414hosts.h3 = host3.example.org:41414max-attempts = 3                     # Must be >=0 (default: number of hosts# specified, 3 in this case). A '0'# value doesn't make much sense because# it will just cause an append call to# immmediately fail. A '1' value means# that the failover client will try only# once to send the Event, and if it# fails then there will be no failover# to a second client, so this value# causes the failover client to# degenerate into just a default client.# It makes sense to set this value to at# least the number of hosts that you# specified.# 必须>=0(默认值:指定的主机数,在本例中为3)。
# “0”值没有多大意义,因为它只会导致追加调用立即失败。“1”值表示故障转移客户端将只尝试发送一次事件,
# 如果失败,则不会向第二个客户端进行故障转移,因此此值会导致故障转移客户端退化为默认客户端。
# 将此值设置为至少指定的主机数是有意义的。batch-size = 100                     # Must be >=1 (default: 100)  必须>=1(默认值:100)connect-timeout = 20000              # Must be >=1000 (default: 20000)   必须>=1000(默认值:20000)request-timeout = 20000              # Must be >=1000 (default: 20000)   必须>=1000(默认值:20000)
LoadBalancing RPC client 【负载平衡RPC客户端】

The Flume Client SDK also supports an RpcClient which load-balances among multiple hosts. This type of client takes a whitespace-separated list of <host>:<port> representing the Flume agents that make-up a load-balancing group. This client can be configured with a load balancing strategy that either randomly selects one of the configured hosts, or selects a host in a round-robin fashion. You can also specify your own custom class that implements the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used. In that case, the FQCN of the custom class needs to be specified as the value of the host-selector property. The LoadBalancing RPC Client currently does not support thrift.

Flume Client SDK还支持RpcClient,它可以在多个主机之间实现负载平衡。这种类型的客户端采用以空格分隔的<host>:<port>列表,表示组成负载平衡组的Flume代理。该客户端可以使用负载平衡策略进行配置,该策略可以随机选择配置的主机之一,也可以以循环方式选择主机。您还可以指定自己的自定义类来实现LoadBalancingRpcClient$HostSelector接口,以便使用自定义选择顺序。在这种情况下,需要将自定义类的FQCN指定为主机选择器属性的值。LoadBalancing RPC客户端当前不支持节俭。

If backoff is enabled then the client will temporarily blacklist hosts that fail, causing them to be excluded from being selected as a failover host until a given timeout. When the timeout elapses, if the host is still unresponsive then this is considered a sequential failure, and the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive hosts.


The maximum backoff time can be configured by setting maxBackoff (in milliseconds). The maxBackoff default is 30 seconds (specified in the OrderSelector class that’s the superclass of both load balancing strategies). The backoff timeout will increase exponentially with each sequential failure up to the maximum possible backoff timeout. The maximum possible backoff is limited to 65536 seconds (about 18.2 hours). For example:


// Setup properties for the load balancing  
// 设置负载平衡的属性
Properties props = new Properties();
props.put("client.type", "default_loadbalance");// List of hosts (space-separated list of user-chosen host aliases)
// 主机列表(用户选择的主机别名的空格分隔列表)
props.put("hosts", "h1 h2 h3");// host/port pair for each host alias
// 每个主机别名的主机/端口对
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);props.put("host-selector", "random");     
// For random host selection                   用于随机主机选择
// props.put("host-selector", "round_robin");  props.put(“主机选择器”、“round_robin”);
// For round-robin host                        对于循环主机
// selection                                   选择
props.put("backoff", "true");      // Disabled by default.  默认情况下已禁用。props.put("maxBackoff", "10000");  
// Defaults 0, which effectively becomes 30000 ms    默认值为0,实际变为30000毫秒// Create the client with load balancing properties  创建具有负载平衡属性的客户端
RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the load-balancing Flume client implementation (LoadBalancingRpcClient) can be configured with these properties:


client.type = default_loadbalancehosts = h1 h2 h3                     # At least 2 hosts are required   至少需要2台主机hosts.h1 = host1.example.org:41414hosts.h2 = host2.example.org:41414hosts.h3 = host3.example.org:41414backoff = false                      # Specifies whether the client should# back-off from (i.e. temporarily# blacklist) a failed host# (default: false).maxBackoff = 0                       # Max timeout in millis that a will# remain inactive due to a previous# failure with that host (default: 0,# which effectively becomes 30000)
# 指定客户端是否应退出(即暂时列入黑名单)故障主机(默认值:false)。host-selector = round_robin          # The host selection strategy used# when load-balancing among hosts# (default: round_robin).# Other values are include "random"# or the FQCN of a custom class# that implements# LoadBalancingRpcClient$HostSelector
# 在主机之间进行负载平衡时使用的主机选择策略(默认值:round_robin)。其他值包括“random”或实现LoadBalancingRpcClient$HostSelector的自定义类的FQCNbatch-size = 100                     # Must be >=1 (default: 100)       必须>=1(默认值:100)connect-timeout = 20000              # Must be >=1000 (default: 20000)  必须>=1000(默认值:20000)request-timeout = 20000              # Must be >=1000 (default: 20000)  必须>=1000(默认值:20000)
Embedded agent 【嵌入式代理】

Flume has an embedded agent api which allows users to embed an agent in their application. This agent is meant to be lightweight and as such not all sources, sinks, and channels are allowed. Specifically the source used is a special embedded source and events should be send to the source via the put, putAll methods on the EmbeddedAgent object. Only File Channel and Memory Channel are allowed as channels while Avro Sink is the only supported sink. Interceptors are also supported by the embedded agent.


Note: The embedded agent has a dependency on hadoop-core.jar.


Configuration of an Embedded Agent is similar to configuration of a full Agent. The following is an exhaustive list of configration options:


Required properties are in bold.


Property Name





The only available source is the embedded source.




Either memory or file which correspond to MemoryChannel and FileChannel respectively.




Configuration options for the channel type requested, see MemoryChannel or FileChannel user guide for an exhaustive list.




List of sink names




Property name must match a name in the list of sinks. Value must be avro



Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port.




Either failover or load_balance which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.




Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.

所选接收器处理器的配置选项。有关详细列表,请参阅FailoverSinksProcessor and LoadBalancingSinkProcessor用户指南。



Space-separated list of interceptors




Space-separated list of interceptors


Below is an example of how to use the agent:


Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port",  "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");EmbeddedAgent agent = new EmbeddedAgent("myagent");agent.configure(properties);
agent.start();List<Event> events = Lists.newArrayList();events.add(event);
Transaction interface  【交易接口】

The Transaction interface is the basis of reliability for Flume. All the major components (ie. Sources, Sinks and Channels) must use a Flume Transaction.


A Transaction is implemented within a Channel implementation. Each Source and Sink that is connected to a Channel must obtain a Transaction object. The Sources use a ChannelProcessor to manage the Transactions, the Sinks manage them explicitly via their configured Channel. The operation to stage an Event (put it into a Channel) or extract an Event (take it out of a Channel) is done inside an active Transaction. For example:


Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
try {// This try clause includes whatever Channel operations you want to do// 此try子句包括您想要执行的任何Channel操作Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName("UTF-8"));ch.put(eventToStage);// Event takenEvent = ch.take();  事件takenEvent=ch.take();// ...txn.commit();
} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as needed// 记录异常,根据需要处理个别异常// re-throw all Errors  重新抛出所有错误if (t instanceof Error) {throw (Error)t;}
} finally {txn.close();

Here we get hold of a Transaction from a Channel. After begin() returns, the Transaction is now active/open and the Event is then put into the Channel. If the put is successful, then the Transaction is committed and closed.


Sink 【输出】

The purpose of a Sink to extract Events from the Channel and forward them to the next Flume Agent in the flow or store them in an external repository. A Sink is associated with exactly one Channels, as configured in the Flume properties file. There’s one SinkRunner instance associated with every configured Sink, and when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink (using SinkRunner.PollingRunner as the thread’s Runnable). This thread manages the Sink’s lifecycle. The Sink needs to implement the start() and stop() methods that are part of the LifecycleAware interface. The Sink.start() method should initialize the Sink and bring it to a state where it can forward the Events to its next destination. The Sink.process() method should do the core processing of extracting the Event from the Channel and forwarding it. The Sink.stop() method should do the necessary cleanup (e.g. releasing resources). The Sink implementation also needs to implement the Configurable interface for processing its own configuration settings. For example:


public class MySink extends AbstractSink implements Configurable {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation) // 处理myProp值(例如验证)// Store myProp for later retrieval by process() method // 存储myProp以便稍后通过process()方法检索this.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..// 初始化到此接收器将事件转发到的外部存储库(例如HDFS)的连接。。}@Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..// 断开与外部存储的连接,并进行任何额外的清理(例如释放资源或清空字段值)。。}@Overridepublic Status process() throws EventDeliveryException {Status status = null;// Start transaction   开始交易Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to do// 此try子句包括您想要执行的任何Channel操作Event event = ch.take();// Send the Event to the external repository. 将事件发送到外部存储库。// storeSomeData(e);txn.commit();status = Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as needed // 记录异常,根据需要处理个别异常status = Status.BACKOFF;// re-throw all Errors  重新抛出所有错误if (t instanceof Error) {throw (Error)t;}}return status;}
Source 【来源】

The purpose of a Source is to receive data from an external client and store it into the configured Channels. A Source can get an instance of its own ChannelProcessor to process an Event, commited within a Channel local transaction, in serial. In the case of an exception, required Channels will propagate the exception, all Channels will rollback their transaction, but events processed previously on other Channels will remain committed.


Similar to the SinkRunner.PollingRunner Runnable, there’s a PollingRunner Runnable that executes on a thread created when the Flume framework calls PollableSourceRunner.start(). Each configured PollableSource is associated with its own thread that runs a PollingRunner. This thread manages the PollableSource’s lifecycle, such as starting and stopping. A PollableSource implementation must implement the start() and stop() methods that are declared in the LifecycleAware interface. The runner of a PollableSource invokes that Source‘s process() method. The process() method should check for new data and store it into the Channel as Flume Events.

类似于SinkRunner。PollingRunner Runnable,在Flume框架调用PollableSourceRunner.start()时创建的线程上执行一个PollingRunnerRunnable。每个配置的PollableSource都与自己的线程关联,该线程运行一个Polling Runner。该线程管理PollableSource的生命周期,例如启动和停止。PolableSource实现必须实现在LifecycleAware接口中声明的start()和stop()方法。PollableSource的运行程序调用该Source的process()方法。process()方法应该检查新数据,并将其作为FlumeEvents存储到Channel中。

Note that there are actually two types of Sources. The PollableSource was already mentioned. The other is the EventDrivenSource. The EventDrivenSource, unlike the PollableSource, must have its own callback mechanism that captures the new data and stores it into the Channel. The EventDrivenSources are not each driven by their own thread like the PollableSources are. Below is an example of a custom PollableSource:


public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation, convert to another type, ...)// 处理myProp值(例如验证、转换为其他类型等)// Store myProp for later retrieval by process() method// 存储myProp以便稍后通过process()方法检索this.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external client// 初始化与外部客户端的连接}@Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..// 断开与外部客户端的连接并进行任何额外的清理(例如释放资源或清空字段值)。。}@Overridepublic Status process() throws EventDeliveryException {Status status = null;try {// This try clause includes whatever Channel/Event operations you want to do// 此try子句包括要执行的任何通道/事件操作// Receive new data  接收新数据Event e = getSomeData();// Store the Event into this Source's associated Channel(s)// 将事件存储到此源的关联通道中getChannelProcessor().processEvent(e);status = Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as needed// 记录异常,根据需要处理个别异常status = Status.BACKOFF;// re-throw all Errors  重新抛出所有错误if (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;}
Channel  【渠道】






今日分享的内容是php8中的构造方法和析构方法&#xff0c;我们把构造方法和析构方法这两个方法分开来讲&#xff1a; 1、构造方法 构造方法存在于每个声明的类中&#xff0c;主要作用是执行一些初始化任务。如果类中没有直接声明构造方法&#xff0c;那么类会默认地生成一个没…



2023-9-29 JZ27 二叉树的镜像

题目链接&#xff1a;二叉树的镜像 import java.util.*;/** public class TreeNode {* int val 0;* TreeNode left null;* TreeNode right null;* public TreeNode(int val) {* this.val val;* }* }*/public class Solution {/*** 代码中的类名、方法名、参数…


我认为mitmproxy最强大的地方&#xff0c;就是mitmdump可以结合python代码&#xff0c;灵活拦截和处理数据包。 首先&#xff0c;mitmdump的路径如下&#xff1a;&#xff08;使用pip3 install mitmproxy安装的情况&#xff0c;参考我的文章python使用mitmproxy和mitmdump抓包…

vscode vue html 快捷键

css文件 选择多行 按下ctrl不放 按下鼠标滚轮不放&#xff08;鼠标中键&#xff09; 鼠标向下移动 同时修改多个相同的字符串 <style> .base-goods-item li {width: 304px;height: 404px;background-color: #eef9f4; } .base-goods-item li {display: block; } .base-…


题目 给定节点数为 n 的二叉树的前序遍历和中序遍历结果&#xff0c;请重建出该二叉树并返回它的头结点。 例如输入前序遍历序列{1,2,4,7,3,5,6,8}和中序遍历序列{4,7,2,1,5,3,8,6}&#xff0c;则重建出如下图所示。 提示: 1.vin.length pre.length 2.pre 和 vin 均无重复…


&#x1f4a7; 二本同学的秋招那些事儿 \color{#FF1493}{二本同学的秋招那些事儿} 二本同学的秋招那些事儿&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&#x1f390; &#x1f433; 《数据结构与算…


文章目录 5、 有监督微调5.1、提示学习&语境学习5.2、高效微调5.3、模型上下文窗口扩展5.4、指令数据构建5.5、开源指令数据集 5、 有监督微调 5.1、提示学习&语境学习 提示学习 完成预测的三个阶段&#xff1a;提示添加、答案搜索、答案映射 提示添加 “[X] 我感到…


接上一篇文章http://t.csdn.cn/1ONDq&#xff0c;这次我们继续讲解关于动态内存的相关知识。 一、常见的动态内存错误 1.对NULL指针进行解引用操作 #include<stdio.h> #include<stdlib.h> #include<limits.h> int main() {int* p (int*)malloc(INT_MAX/4);…

Leetcode198. 打家劫舍

https://leetcode.cn/problems/house-robber/description/ 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&…

ubuntu18.04 OpenGL开发(显示YUV)

源码参考&#xff1a;https://download.csdn.net/download/weixin_55163060/88382816 安装opengl库 sudo apt install libglu1-mesa-dev freeglut3-dev mesa-common-dev 安装opengl工具包 sudo apt install mesa-utils 检查opengl版本信息&#xff08;桌面终端执行&#xff09…


一、CEC2013测试函数 CEC2013&#xff0c;该测试集合也是目前高质量论文应用较广泛的测试集&#xff0c;CEC2013测试集函数复杂&#xff0c;非常具有挑战力。 二、多种智能优化算法求解CEC2013 2.1 本文参与求解CEC2013的智能优化算法 本文选取一些经典的智能优化算法参与测…

喜讯 | 怿星科技获评SAE“优秀核心零部件企业”,测试软件平台工具广受赞誉

2023年9月22日-23日&#xff0c;SAE 2023汽车智能与网联技术国际学术会议成功举行。此次学术会议由SAE International与南昌智能新能源汽车研究院联合主办&#xff0c;大会汇聚了来自国内外智能网联领域的顶尖专家和学者。大会同期颁布的奖项旨在向行业推选出更多新时代涌现的杰…


一年一度的国庆节来了,祝大家节日快乐,本文教大家用Python绘制国庆专属头像。 文章目录 一、效果图二、实现代码一、效果图 这是把微信头像和红旗相结合制作出来的效果图:       如需图片和代码进行练习,可到公众号中发送“国庆头像”即可免费获取 二、实现代码 具体实…


在JavaScript中&#xff0c;事件是交互式网页开发中的关键概念之一。了解事件冒泡和事件捕获是成为一名优秀的前端开发者所必需的技能之一。本文将深入探讨这两个概念&#xff0c;解释它们是如何工作的&#xff0c;以及如何在实际应用中使用它们来处理事件。 一.什么是事件冒泡…


编程世界就像一座大城市&#xff0c;前端开发和后端开发就像城市的两个不同街区。在这两个街区&#xff0c;前端和后端开发都有自己的价值和机会。 一、引言 有些人更喜欢在前端创造令人印象深刻的用户界面&#xff0c;而有些人更喜欢处理数据和系统逻辑。在选择时&#xff…

【VUE复习·4】计算属性computed:原理、完整写法(不常用)、与 methods 的区别、简写(最常用)、应用案例!

总览 1.简介计算属性 2.computed 与 methods 的区别 3.computed 的简写&#xff08;不修改计算属性&#xff0c;只显示&#xff09; 4.经典应用场景 一、计算属性 1.为什么需要计算属性&#xff1f; 首先&#xff0c;如果我们要写一个插值语法&#xff0c;而 {{ }} 内的内容…

idea 如何在命令行快速打开项目

背景 在命令行中从git仓库检出项目&#xff0c;如何在该命令行下快速用idea 打开当前项目&#xff0c;类似vscode 可以通过在项目根目录下执行 code . 快速打开当前项目。 步骤 以macos 为例 vim /usr/local/bin/idea 输入如下内容 #!/bin/sh open -na "IntelliJ IDE…

华为OD机试 - 判断字符串子序列(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述1、输入2、输出3、说明 四、Java算法源码五、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&am…


目录 一、动态树 ( 1 ) 数据表 ( 2 ) 后端 ( 2 ) 前端 二、书籍管理 数据表 后端 前端 ElementUI的背景 是一套基于Vue.js的桌面端组件库&#xff0c;由饿了么前端团队开发维护。它提供了丰富的UI组件和交互效果&#xff0c;可以帮助开发者快速构建出美观、易用的We…