
Flume Official Documentation Translation: Flume 1.7.0 User Guide (unreleased version)


Flume 1.7.0 User Guide

 

  • Introduction
    • Overview
    • System Requirements
    • Architecture
      • Data flow model
      • Complex flows
      • Reliability
      • Recoverability
  • Setup
    • Setting up an agent
      • Configuring individual components
      • Wiring the pieces together
      • Starting an agent
      • A simple example
      • Logging raw data
      • Zookeeper based Configuration
      • Installing third-party plugins
        • The plugins.d directory
        • Directory layout for plugins
    • Data ingestion
      • RPC
      • Executing commands
      • Network streams
    • Setting multi-agent flow
    • Consolidation
    • Multiplexing the flow
  • Configuration
    • Defining the flow
    • Configuring individual components
    • Adding multiple flows in an agent
    • Configuring a multi agent flow
    • Fan out flow
    • Flume Sources
      • Avro Source
      • Thrift Source
      • Exec Source
      • JMS Source
        • Converter
      • Spooling Directory Source
        • Event Deserializers
          • LINE
          • AVRO
          • BlobDeserializer
      • Taildir Source
      • Twitter 1% firehose Source (experimental)
      • Kafka Source
      • NetCat Source
      • Sequence Generator Source
      • Syslog Sources
        • Syslog TCP Source
        • Multiport Syslog TCP Source
        • Syslog UDP Source
      • HTTP Source
        • JSONHandler
        • BlobHandler
      • Stress Source
      • Legacy Sources
        • Avro Legacy Source
        • Thrift Legacy Source
      • Custom Source
      • Scribe Source
    • Flume Sinks
      • HDFS Sink
      • Hive Sink
      • Logger Sink
      • Avro Sink
      • Thrift Sink
      • IRC Sink
      • File Roll Sink
      • Null Sink
      • HBaseSinks
        • HBaseSink
        • AsyncHBaseSink
      • MorphlineSolrSink
      • ElasticSearchSink
      • Kite Dataset Sink
      • Kafka Sink
      • Custom Sink
    • Flume Channels
      • Memory Channel
      • JDBC Channel
      • Kafka Channel
      • File Channel
      • Spillable Memory Channel
      • Pseudo Transaction Channel
      • Custom Channel
    • Flume Channel Selectors
      • Replicating Channel Selector (default)
      • Multiplexing Channel Selector
      • Custom Channel Selector
    • Flume Sink Processors
      • Default Sink Processor
      • Failover Sink Processor
      • Load balancing Sink Processor
      • Custom Sink Processor
    • Event Serializers
      • Body Text Serializer
      • "Flume Event" Avro Event Serializer
      • Avro Event Serializer
    • Flume Interceptors
      • Timestamp Interceptor
      • Host Interceptor
      • Static Interceptor
      • UUID Interceptor
      • Morphline Interceptor
      • Search and Replace Interceptor
      • Regex Filtering Interceptor
      • Regex Extractor Interceptor
      • Example 1:
      • Example 2:
    • Flume Properties
      • Property: flume.called.from.service
  • Log4J Appender
  • Load Balancing Log4J Appender
  • Security
  • Monitoring
    • JMX Reporting
    • Ganglia Reporting
    • JSON Reporting
    • Custom Reporting
    • Reporting metrics from custom components
  • Tools
    • File Channel Integrity Tool
    • Event Validator Tool
  • Topology Design Considerations
    • Is Flume a good fit for your problem?
    • Flow reliability in Flume
    • Flume topology design
    • Sizing a Flume deployment
  • Troubleshooting
    • Handling agent failures
    • Compatibility
      • HDFS
      • AVRO
      • Additional version requirements
    • Tracing
    • More Sample Configs
  • Component Summary
  • Alias Conventions

 

Introduction

Overview

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

The use of Apache Flume is not only restricted to log data aggregation. Since data sources are customizable, Flume can be used to transport massive quantities of event data including but not limited to network traffic data, social-media-generated data, email messages and pretty much any data source possible.

Apache Flume is a top level project at the Apache Software Foundation.

There are currently two release code lines available, versions 0.9.x and 1.x.

Documentation for the 0.9.x track is available at the Flume 0.9.x User Guide.

This documentation applies to the 1.7.x track.

New and existing users are encouraged to use the 1.x releases so as to leverage the performance improvements and configuration flexibilities available in the latest architecture.


System Requirements

    1. Java Runtime Environment – Java 1.7 or later
    2. Memory – Sufficient memory for configurations used by sources, channels or sinks
    3. Disk Space – Sufficient disk space for configurations used by channels or sinks
    4. Directory Permissions – Read/Write permissions for directories used by agent

Architecture

Data flow model

A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).


[Figure: Flume agent data flow model – an external source delivers events to the agent's source, which stages them in a channel for the sink to deliver to HDFS]

A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume Source to receive events from a Thrift Sink or a Flume Thrift Rpc Client or Thrift clients written in any language generated from the Flume thrift protocol. When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it's consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel.


Reliability

The events are staged in a channel on each agent. The events are then delivered to the next agent or terminal repository (like HDFS) in the flow. The events are removed from a channel only after they are stored in the channel of the next agent or in the terminal repository. This is how the single-hop message delivery semantics in Flume provide end-to-end reliability of the flow.

Flume uses a transactional approach to guarantee the reliable delivery of the events. The sources and sinks encapsulate in a transaction the storage/retrieval, respectively, of the events placed in or provided by a transaction provided by the channel. This ensures that the set of events are reliably passed from point to point in the flow. In the case of a multi-hop flow, the sink from the previous hop and the source from the next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop.


Setup

Setting up an agent

Flume agent configuration is stored in a local configuration file. This is a text file that follows the Java properties file format. Configurations for one or more agents can be specified in the same configuration file. The configuration file includes properties of each source, sink and channel in an agent and how they are wired together to form data flows.


Wiring the pieces together

The agent needs to know what individual components to load and how they are connected in order to constitute the flow. This is done by listing the names of each of the sources, sinks and channels in the agent, and then specifying the connecting channel for each sink and source. For example, an agent flows events from an Avro source called avroWeb to HDFS sink hdfs-cluster1 via a file channel called file-channel. The configuration file will contain names of these components and file-channel as a shared channel for both avroWeb source and hdfs-cluster1 sink.

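A minimal sketch of that wiring in the configuration file (the agent name weblog-agent is illustrative and not from the original; component types and properties are covered in later sections):

weblog-agent.sources = avroWeb
weblog-agent.sinks = hdfs-cluster1
weblog-agent.channels = file-channel

# avroWeb and hdfs-cluster1 share file-channel
weblog-agent.sources.avroWeb.channels = file-channel
weblog-agent.sinks.hdfs-cluster1.channel = file-channel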

Starting an agent

An agent is started using a shell script called flume-ng which is located in the bin directory of the Flume distribution. You need to specify the agent name, the config directory, and the config file on the command line:


$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

Now the agent will start running source and sinks configured in the given properties file.


A simple example

Here, we give an example configuration file, describing a single-node Flume deployment. This configuration lets a user generate events and subsequently logs them to the console.


# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest.

Given this configuration file, we can start Flume as follows:

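For instance, a single file could define a second, independent agent next to a1, with each process selecting one by name at launch (a sketch; agent a2 and its components are hypothetical):

# agent a1 as above
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# a second, independent agent in the same file
a2.sources = r2
a2.channels = c2
a2.sinks = k2

Launching the same file with --name a1 or --name a2 then manifests one agent or the other.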

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Note that in a full deployment we would typically include one more option: --conf=<conf-dir>. The <conf-dir> directory would include a shell script flume-env.sh and potentially a log4j properties file. In this example, we pass a Java option to force Flume to log to the console and we go without a custom environment script.


From a separate terminal, we can then telnet port 44444 and send Flume an event:


$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

The original Flume terminal will output the event in a log message.


12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: {headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

Congratulations – you’ve successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail.



Logging raw data

Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in many production environments because this may result in leaking sensitive data or security related configurations, such as secret keys, to Flume log files. By default, Flume will not log such information. On the other hand, if the data pipeline is broken, Flume will attempt to provide clues for debugging the problem.

One way to debug problems with event pipelines is to set up an additional Memory Channel connected to a Logger Sink, which will output all event data to the Flume logs. In some situations, however, this approach is insufficient.

In order to enable logging of event- and configuration-related data, some Java system properties must be set in addition to log4j properties.

To enable configuration-related logging, set the Java system property -Dorg.apache.flume.log.printconfig=true. This can either be passed on the command line or by setting this in the JAVA_OPTS variable in flume-env.sh.

To enable data logging, set the Java system property -Dorg.apache.flume.log.rawdata=true in the same way described above. For most components, the log4j logging level must also be set to DEBUG or TRACE to make event-specific logging appear in the Flume logs.

Here is an example of enabling both configuration logging and raw data logging while also setting the Log4j loglevel to DEBUG for console output:


$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
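Alternatively, as noted above, the same system properties can be set once in the JAVA_OPTS variable of flume-env.sh under the --conf directory (a sketch):

JAVA_OPTS="$JAVA_OPTS -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true"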

Zookeeper based Configuration

Flume supports Agent configurations via Zookeeper. This is an experimental feature. The configuration file needs to be uploaded in the Zookeeper, under a configurable prefix. The configuration file is stored in Zookeeper Node data. Following is how the Zookeeper Node tree would look like for agents a1 and a2


- /flume
 |- /a1 [Agent config file]
 |- /a2 [Agent config file]

Once the configuration file is uploaded, start the agent with following options


$ bin/flume-ng agent --conf conf -z zkhost:2181,zkhost1:2181 -p /flume --name a1 -Dflume.root.logger=INFO,console

Argument Name | Default | Description
z | – | Zookeeper connection string. Comma separated list of hostname:port
p | /flume | Base Path in Zookeeper to store Agent configurations
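The guide does not show the upload step itself; one possibility is the stock ZooKeeper CLI, writing the file contents into the node for agent a1 (a hedged sketch; zkhost and flume.conf are placeholders):

$ zkCli.sh -server zkhost:2181 create /flume ""
$ zkCli.sh -server zkhost:2181 create /flume/a1 "$(cat flume.conf)"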

Installing third-party plugins

Flume has a fully plugin-based architecture. While Flume ships with many out-of-the-box sources, channels, sinks, serializers, and the like, many implementations exist which ship separately from Flume.

While it has always been possible to include custom Flume components by adding their jars to the FLUME_CLASSPATH variable in the flume-env.sh file, Flume now supports a special directory called plugins.d which automatically picks up plugins that are packaged in a specific format. This allows for easier management of plugin packaging issues as well as simpler debugging and troubleshooting of several classes of issues, especially library dependency conflicts.


The plugins.d directory

The plugins.d directory is located at $FLUME_HOME/plugins.d. At startup time, the flume-ng start script looks in the plugins.d directory for plugins that conform to the below format and includes them in proper paths when starting up java.


Directory layout for plugins

Each plugin (subdirectory) within plugins.d can have up to three sub-directories:

    1. lib – the plugin’s jar(s)
    2. libext – the plugin’s dependency jar(s)
    3. native – any required native libraries, such as .so files

Example of two plugins within the plugins.d directory:


plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so
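Installing a plugin is then just a matter of recreating that layout under $FLUME_HOME, for example (a sketch; the jar names mirror the listing above):

$ mkdir -p $FLUME_HOME/plugins.d/custom-source-1/lib $FLUME_HOME/plugins.d/custom-source-1/libext
$ cp my-source.jar $FLUME_HOME/plugins.d/custom-source-1/lib/
$ cp spring-core-2.5.6.jar $FLUME_HOME/plugins.d/custom-source-1/libext/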

Data ingestion

Flume supports a number of mechanisms to ingest data from external sources.


RPC

An Avro client included in the Flume distribution can send a given file to Flume Avro source using avro RPC mechanism:


$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

The above command will send the contents of /usr/logs/log.10 to the Flume source listening on that port.


Executing commands

There's an exec source that executes a given command and consumes the output. A single 'line' of output is text followed by carriage return ('\r') or line feed ('\n') or both together.


Note : Flume does not support tail as a source. One can wrap the tail command in an exec source to stream the file.

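A sketch of wrapping the tail command in an exec source (the agent, channel and log path are illustrative):

a1.sources = r1
a1.channels = c1

# stream new lines of the file as events; tail -F survives log rotation
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app.log
a1.sources.r1.channels = c1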

Network streams

Flume supports the following mechanisms to read data from popular log stream types, such as:


    1. Avro
    2. Thrift
    3. Syslog
    4. Netcat

Setting multi-agent flow

[Figure: multi-agent flow – the Avro sink of one agent points at the Avro source of the next agent]

In order to flow the data across multiple agents or hops, the sink of the previous agent and source of the current hop need to be avro type with the sink pointing to the hostname (or IP address) and port of the source.


Consolidation

A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster.


[Figure: consolidation – many first-tier agents with Avro sinks all feeding the Avro source of a single second-tier agent]

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent (Again you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

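A sketch of the second-tier agent's side of such a topology (the agent name collector, the bind address, port and HDFS path are illustrative):

collector.sources = avro-in
collector.channels = ch1
collector.sinks = hdfs-out

# single avro source that all first-tier avro sinks point to
collector.sources.avro-in.type = avro
collector.sources.avro-in.bind = 0.0.0.0
collector.sources.avro-in.port = 41414
collector.sources.avro-in.channels = ch1

collector.channels.ch1.type = memory

collector.sinks.hdfs-out.type = hdfs
collector.sinks.hdfs-out.hdfs.path = hdfs://namenode/flume/consolidated
collector.sinks.hdfs-out.channel = ch1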

Multiplexing the flow

Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.


[Figure: a source on agent "foo" fanning out events to three channels]

The above example shows a source from agent "foo" fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event's attribute matches a preconfigured value. For example, if an event attribute called "txnType" is set to "customer", then it should go to channel1 and channel3, if it's "vendor" then it should go to channel2, otherwise channel3. The mapping can be set in the agent's configuration file.


Configuration

As mentioned in the earlier section, Flume agent configuration is read from a file that resembles a Java property file format with hierarchical property settings.


Defining the flow

To define the flow within a single agent, you need to link the sources and sinks via a channel. You need to list the sources, sinks and channels for the given agent, and then point the source and sink to a channel. A source instance can specify multiple channels, but a sink instance can only specify one channel. The format is as follows:


# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

For example, an agent named agent_foo is reading data from an external avro client and sending it to HDFS via a memory channel. The config file weblog.config could look like:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

This will make the events flow from avro-AppSrv-source to hdfs-Cluster1-sink through the memory channel mem-channel-1. When the agent is started with the weblog.config as its config file, it will instantiate that flow.

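Starting it follows the same pattern as the earlier example (a sketch):

$ bin/flume-ng agent --conf conf --conf-file weblog.config --name agent_foo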

Configuring individual components

After defining the flow, you need to set properties of each source, sink and channel. This is done in the same hierarchical namespace fashion where you set the component type and other values for the properties specific to each component:


# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>

The property "type" needs to be set for each component for Flume to understand what kind of object it needs to be. Each source, sink and channel type has its own set of properties required for it to function as intended. All those need to be set as needed. In the previous example, we have a flow from avro-AppSrv-source to hdfs-Cluster1-sink through the memory channel mem-channel-1. Here's an example that shows configuration of each of those components:


agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

#...

Adding multiple flows in an agent

A single Flume agent can contain several independent flows. You can list multiple sources, sinks and channels in a config. These components can be linked to form multiple flows:


# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

Then you can link the sources and sinks to their corresponding channels (for sources) or channel (for sinks) to setup two different flows. For example, if you need to setup two flows in an agent, one going from an external avro client to external HDFS and another from the output of a tail to an avro sink, then here's a config to do that:


# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

Configuring a multi agent flow

To setup a multi-tier flow, you need to have an avro/thrift sink of the first hop pointing to the avro/thrift source of the next hop. This will result in the first Flume agent forwarding events to the next Flume agent. For example, if you are periodically sending files (1 file per event) using an avro client to a local Flume agent, then this local agent can forward it to another agent that has the storage mounted.


Weblog agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...

HDFS agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

Here we link the avro-forward-sink from the weblog agent to the avro-collection-source of the hdfs agent. This will result in the events coming from the external appserver source eventually getting stored in HDFS.


Fan out flow

As discussed in the previous section, Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In case of multiplexing, the event is sent to only a subset of qualifying channels. To fan out the flow, one needs to specify a list of channels for a source and the policy for fanning it out. This is done by adding a channel "selector" that can be replicating or multiplexing. Then further specify the selection rules if it's a multiplexer. If you don't specify a selector, then by default it's replicating:


# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

The multiplexing selector has a further set of properties to bifurcate the flow. This requires specifying a mapping of an event attribute to a set of channels. The selector checks for each configured attribute in the event header. If it matches the specified value, then that event is sent to all the channels mapped to that value. If there's no match, then the event is sent to the set of channels configured as default:


# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

The mapping allows overlapping the channels for each value.


The following example has a single flow that is multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:


# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks for a header called "State". If the value is "CA" then it's sent to mem-channel-1, if it's "AZ" then it goes to file-channel-2, or if it's "NY" then both. If the "State" header is not set or doesn't match any of the three, then it goes to mem-channel-1 which is designated as 'default'.


The selector also supports optional channels. To specify optional channels for a header, the config parameter 'optional' is used in the following way:


# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector will attempt to write to the required channels first and will fail the transaction if even one of these channels fails to consume the events. The transaction is reattempted on all of the channels. Once all required channels have consumed the events, then the selector will attempt to write to the optional channels. A failure by any of the optional channels to consume the event is simply ignored and not retried.

If there is an overlap between the optional channels and required channels for a specific header, the channel is considered to be required, and a failure in the channel will cause the entire set of required channels to be retried. For instance, in the above example, for the header "CA" mem-channel-1 is considered to be a required channel even though it is marked both as required and optional, and a failure to write to this channel will cause that event to be retried on all channels configured for the selector.

Note that if a header does not have any required channels, then the event will be written to the default channels and will be attempted to be written to the optional channels for that header. Specifying optional channels will still cause the event to be written to the default channels, if no required channels are specified. If no channels are designated as default and there are no required, the selector will attempt to write the events to the optional channels. Any failures are simply ignored in that case.



Flume Properties

Property Name | Default | Description
flume.called.from.service | – | If this property is specified then the Flume agent will continue polling for the config file even if the config file is not found at the expected location. Otherwise, the Flume agent will terminate if the config doesn't exist at the expected location. No property value is needed when setting this property (eg, just specifying -Dflume.called.from.service is enough)

Property: flume.called.from.service

Flume periodically polls, every 30 seconds, for changes to the specified config file. A Flume agent loads a new configuration from the config file if either an existing file is polled for the first time, or if an existing file's modification date has changed since the last time it was polled. Renaming or moving a file does not change its modification time. When a Flume agent polls a non-existent file then one of two things happens:

1. When the agent polls a non-existent config file for the first time, the agent behaves according to the flume.called.from.service property. If the property is set, the agent will continue polling (always at the same period – every 30 seconds). If the property is not set, the agent immediately terminates.
2. When the agent polls a non-existent config file and this is not the first time the file is polled, the agent makes no config changes for this polling period. The agent continues polling rather than terminating.

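In practice the flag is appended to the startup command with no value, for example (a sketch reusing the earlier single-node example):

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.called.from.service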

Log4J Appender

Appends Log4j events to a flume agent's avro source. A client using this appender must have the flume-ng-sdk in the classpath (eg, flume-ng-sdk-1.8.0-SNAPSHOT.jar). Required properties are marked (required) in the table below.


Property Name | Default | Description
Hostname (required) | – | The hostname on which a remote Flume agent is running with an avro source.
Port (required) | – | The port at which the remote Flume agent's avro source is listening.
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events.
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events. (Do not use when users log strings)
AvroSchemaUrl | – | A URL from which the Avro schema can be retrieved.

Sample log4j.properties file:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

By default each event is converted to a string by calling toString(), or by using the Log4j layout, if specified.

If the event is an instance of org.apache.avro.generic.GenericRecord, org.apache.avro.specific.SpecificRecord, or if the property AvroReflectionEnabled is set to true then the event will be serialized using Avro serialization.

Serializing every event with its Avro schema is inefficient, so it is good practice to provide a schema URL from which the schema can be retrieved by the downstream sink, typically the HDFS sink. If AvroSchemaUrl is not specified, then the schema will be included as a Flume header.

Sample log4j.properties file configured to use Avro serialization:


#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Load Balancing Log4J Appender

Appends Log4j events to a list of flume agents' avro sources. A client using this appender must have the flume-ng-sdk in the classpath (eg, flume-ng-sdk-1.8.0-SNAPSHOT.jar). This appender supports a round-robin and random scheme for performing the load balancing. It also supports a configurable backoff timeout so that down agents are removed temporarily from the set of hosts. Required properties are marked (required) in the table below.


Property Name | Default | Description
Hosts (required) | – | A space-separated list of host:port at which Flume (through an AvroSource) is listening for events
Selector | ROUND_ROBIN | Selection mechanism. Must be either ROUND_ROBIN, RANDOM or a custom FQDN of a class that inherits from LoadBalancingSelector.
MaxBackoff | – | A long value representing the maximum amount of time in milliseconds the Load balancing client will backoff from a node that has failed to consume an event. Defaults to no backoff.
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events.
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events.
AvroSchemaUrl | – | A URL from which the Avro schema can be retrieved.

Sample log4j.properties file configured using defaults:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,out2
#...

Sample log4j.properties file configured using RANDOM load balancing:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,out2
#...

Sample log4j.properties file configured using backoff:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,out2
#...

Security

The HDFS sink, HBase sink, Thrift source, Thrift sink and Kite Dataset sink all support Kerberos authentication. Please refer to the corresponding sections for configuring the Kerberos-related options.

Flume agent will authenticate to the kerberos KDC as a single principal, which will be used by different components that require kerberos authentication. The principal and keytab configured for Thrift source, Thrift sink, HDFS sink, HBase sink and DataSet sink should be the same, otherwise the component will fail to start.

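For example, the HDFS sink takes its credentials through the hdfs.kerberosPrincipal and hdfs.kerberosKeytab properties; a hedged sketch (the principal, keytab path and HDFS path are illustrative):

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/secure
a1.sinks.k1.hdfs.kerberosPrincipal = flume/_HOST@EXAMPLE.COM
a1.sinks.k1.hdfs.kerberosKeytab = /etc/flume/flume.keytab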
