Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用的系统。
Apache Flume的用途不仅限于日志数据聚合。由于数据源是可定制的,Flume可用于传输大量事物数据包括但不限于网络流量数据,社交媒体产生的数据,Email消息和很多其它类型的数据源。
Apache Flume是Apache软件基金会的顶级项目之一。
现在有两个版本可用(版本0.9.x和 1.x)
0.9.x版本的文档在the Flume 0.9.x User Guide。
此文档适用于1.4.x以后的版本。
新用户和现有用户鼓励使用1.x版本,这样可以利用最新的架构提高性能和灵活配置。
一个Flume事件定义为,一个Flume agent是一个JVM包含这些从源到目标的所有组建的进程。
Flume source消耗外部数据源(如web服务器)发送给它的事件。外部数据源以Flume source理解的格式发送给Flume事件。例如,一个avro flume源可以 接受从avro客户端或者其他avro sink的 flume agent发送的avro事件。类似的流程也可以使用Thrift Flume Source接受来自Thrift Sink或者Flume Thrift Rpc客户端或者以任何语言编写的符合Flume thrift协议的Thrift客户端。当Flume source接收到一个时间,会把事件存储到一个或多个channels中。channel是一个被动存储(保存事件直到有Flume sink消耗)。举个例子文件类型的channel——受本地系统支持。sink从channel中移除事件并把它推送到外部存储如HDFS(通过Flume HDFS sink)或者把他传到流的下一个Flume agent source中。agent的source和sink异步操作存储在channel中的事件。
Flume允许用户建立多节点流,这种流的事件通过多个agent到达目的地。它也支持扇入和扇出流,上下文路由和为失败的节点备份路由。
事件存储在agent的channel中,然后被传到流的下一个agent或终端存储(如HDFS)。只有在他们被存储在下一个agent的channel中或这终端存储时才会被移除。这是Flume单个流的消息传递机制提供流端到端的可靠性。
Flume使用事务保证事件的可靠传输。sources和sinks分别封装在storage/retrieval,channel提供事件。这保证事件集合在流中点到点之间可靠传输。在多节点流的情况下,sink有着上一点的契约source有下一节点的契约,保证数据安全存储到下一个节点的channel中。
事件存储在channel中,channel负责灾难回复。Flume支持由本地文件系统提供的持久文件channel。内存channel只是简单的把事件存储在内存队列中,内存channel更快,但在agent进程挂掉时仍在内存的事件不能恢复。
Flume agent的配置存在一个本地的配置文件中。这是一个遵循Java properties文件格式的文本文件。一个或多个agent配置可放在同一个配置文件里。配置文件包含agent的source,sink和channel的各个属性以及他们的数据流连接。
每个流组件(source,sink或者channel)都有一个name,type和一系列的基于其type或实例的属性。例如,一个avro source需要有个hostname(或者ip地址)一个端口号来接受数据。一个内存channel有最大队列长度的属性(capacity),一个HDFS sink需要知晓文件系统的URI地址创建文件,文件访问频率(“hdfs.rollInterval”)等等。所有的这些组件属性都需要在Flume配置文件中设置。
agent需要知道加载什么组件,以及这些组件在流中的连接顺序。通过列出在agent中的source,sink和channel名称,定义每个sink和source的channel来完成。例如,一个从avroWeb的avro source来的agent流事件通过文件channel到HDFS sink集群。这个配置文件会包含三个组建,文件channel作为avroweb source 和hdfs sink的共享channel。
使用bin目录中的flume-ng脚本启动agent,你需要在命令行指定agent名称,配置文件目录,和配置文件。
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
现在agent将运行配置脚本中的source和sink。
再次,我们给一个简单的配置文件,描述一个单一节点的Flume开发。此配置文件使用户生成事件并把日志输出到控制台。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
此配置定义了一个名为a1的agent。a1有一个source监听44444端口,一个channel在内存中缓存事件数据,一个sink把日志输出到控制台。配置文件给各个组建命名描述他们的类型和配置参数。一个配置文件可能定义多个命名agent,当启动Flume进程是传递标志告诉它运行哪些agent。
给出这个配置文件,我们可以如下启动Flume:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
注意在完整的部署中,我们通常包含多个--conf=
从另一个接口,我们可以telnet 44444端口发送一个Flume事件:
$ telnet localhost 44444Trying 127.0.0.1...Connected to localhost.localdomain (127.0.0.1).Escape character is ‘^]‘.Hello world! <ENTER>OK
原始的Flume终端将输出事件日志消息。
12/06/19 15:32:19 INFO source.NetcatSource: Source starting12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
祝贺你——你已经成功配置并部署了一个Flume agent!随后的章节将涉及agent 配置的更多细节。
从管道里记录原始的流数据不是许多生产环境所关心的行为,因为可能导致敏感信息泄露或安全相关的配置,如密钥输出到Flume日志。默认情况下,Flume不记录这么多信息。另一方面,如果数据管道损坏,FLume会尝试提供调试错误的线索。
一个调试事件管道错误的方法是设置额外的内存管道连接到日志sink,它会输出所有的事件数据到Flume日志。有些情况,这种方法还不足够。
为了记录事件和配置相关数据,必须设置一些java系统属性在log4j属性文件中。
为了记录配置相关日志,设置-Dorg.apache.flume.log.printconfig=trueJava系统属性。此属性可放在命令行中或者设置在flume-env.sh的JAVA_OPTS变量中。
为了记录数据,如上设置-Dorg.apache.flume.log.rawdata=trueJava系统属性。对于大多数组件,log4j日志级别必须设置为DEBUG或TRACE似的event-specific日志出现在Flume记录中。
这是一个设置配置日志和原始数据日志的例子。同时设置了Log4j的记录级别为DEBUG:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
Flume支持使用Zookeeper配置agent。这个是一个实验特性。配置文件需要上传到zookeeper中,在一个可配置前缀下。配置文件存储在Zookeeper节点数据里。下面是a1 和 a2 agent在Zookeeper节点树的配置情况。
- /flume
|- /a1 [Agent config file]
|- /a2 [Agent config file]
一旦上传完配置文件,使用下面参数启动agent。
$ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
Argument Name |
Default |
Description |
z |
- |
Zookeeper连接字符串.以逗号分割的hostname:port |
p |
/flume |
Zookeeper中存储agent配置的根目录 |
Flume有完整的插件架构。当Flume通过source,channel,sink,serializer等组件,存在许多实现分割Flume。
始终可以使用flume-env.sh文件中的FLUME_CLASSPATH变量路径添加自定义的Flume组件,Flume现在支持一个特殊的文件夹pluguins.d自动获得组件。这允许更简单的插件包管理问题,更简单的调试和错误定位,特别是依赖包的冲突。
plugins.d文件夹在$FLUME_HOME/plugins.d。在启动时,flume-ng启动脚本查看plugins.d目录文件,检查符合一下格式的插件把它们导入到java路径中。
plugins.d中的每个插件都可以有三个子目录:
native - 任何需要的本地库文件,如.so文件。
下面是plugins.d目录中包含两个插件的例子
plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so
Flume支持很多从外部数据源提取数据的机制。
包含在Flume分布式系统的一个Avro客户端可以使用avro远程方法调用发送一个给定的文件给Flume Avro source:
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上面的命令将把/usr/logs/log.10文件内容传递到Flume source的监听端口。
有一个exec类型的 source执行一个给定的命令并消耗输出(例如一个以\r、\n或\r\n分隔的单独line)。
注意:Flume不支持tail作数据源,可以把tail封装在exec source中传输文件
Flume支持一下流行的日志流类型读取数据机制,如:
Netcat
为了在多个agent之间流动数据,前一个agent的sink和当前的source需要都是avro类型设置相同的hostname和port。
日志收集的常见场景是大量的日志生成客户端发送数据到少量的消费者agent存储子系统。例如,日志从几百个web服务器发送数据到十几个agents然后存储到HDFS集群。
这可以通过Flume配置许多avro sink第一层agent,
Flume支持多路复用事件流到一个或多个目的地。这是通过定义一个flow multiplexer来实现的,它可以复制或选择事件流到一个或多个channels。
上面的示例展示了从一个foo agent扇出流到多个channels中。这种扇出可以复制或选择。在复制流的情况下,每个事件都被发送到所有的channels中。在选择的情况下,如果一个事件被发送到一些channels中,当一个事件的属性匹配一个预先配置的值。例如,如果一个事件的属性txnType设置为customer,name他会去channel1和channel3,如果设置为vendor,会去channel2,否则去channel3。这种映射可以在agent的配置文件中设置。
之前章节提到,Flume agent的配置是从一个类似Java多层次属性文件格式的文件中提取的。
使用单一的agent定义流,你需要使用channel连接source和sink。需要列出给定agent的source,sink和channel,然后指定source和sink到一个channel。一个source可以指定多个channels而一个sink只能指定一个channel。格式如下:
# list the sources, sinks and channels for the agent<Agent>.sources = <Source><Agent>.sinks = <Sink><Agent>.channels = <Channel1> <Channel2>
# set channel for source<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink<Agent>.sinks.<Sink>.channel = <Channel1>
例如,一个名为agent_foo的agent从外部avro客户端读取数据然后通过内存channel发送到HDFS。配置文件weblog.config可能如下所示:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
这可以使事件流通过mem-channel-1从avro-AppSrv-source到hdfs-Cluster1-sink。当agent从weblog.config配置文件开始,将会实例化这个流。
定义流后,你需要每个source,sink和channel的属性。你设置组件类型和其他的指定组件的属性,是使用相同的层次命名格式完成的。
# properties for sources<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks<Agent>.sources.<Sink>.<someProperty> = <someValue>
每个组件的·type·属性都要设置为Flume能理解的类型。每个source,sink和channel类型都有它们自己需要的属性和功能集合。在需要的情况这些都需要设置,前一个例子中,我们有一个使用内存channel mem-channel-1从avro-AppSrv-source到hdfs-Cluster1-sink的流。这是展示每个组件的配置的例子:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
一个Flume agent可以包含多个独立流。你可以在一个配置文件列出多个sources,sinks和channels。这些组件可以同多个流连接。
# list the sources, sinks and channels for the agent<Agent>.sources = <Source1> <Source2><Agent>.sinks = <Sink1> <Sink2><Agent>.channels = <Channel1> <Channel2>
然后你可以使用相应的channels连接sources和links设置两个不同的流。例如,你需要在一个agent中设置两个流,一个从外部的avro客户端到外部的HDFS系统另外一个从外部的tail命令输出到avro sink,有一个配置文件可以做到:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
设置一个多层流,你需要有一个avro/thrift sink到一个avro/thrift source。这会把第一个flume agent结果转发下一个flume agent中。例如,如果你定期的使用avro客户端传递文件到本地的flume agent中,那么这个本地的agent可以转发到另一个agent中存储。
Weblog agent 配置:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000
# configure other pieces#...
HDFS agent配置:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces#...
此处我们连接weblogagent的the avro-forward-sink到hdfs agent的avro-collection-source。这将会使来自外部appserver的事件最终存储在HDFS系统。
前一节讨论过,Flume支持从一个source到多个channels的扇出流。有两种模式的扇出方式:重复和多路选择。在重复流中,事件被发送到配置的channels中。在多路选择中,事件被发送到匹配的channels。为了实现扇出流,需要指定一个source的channels列表和定义扇出规则。这是通过添加一个selector的channel,其支持重复和多路选择。如果是多路选择还要指定选择的规则。如果你不定义一个selector,默认是重复。
# List the sources, sinks and channels for the agent<Agent>.sources = <Source1><Agent>.sinks = <Sink1> <Sink2><Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks<Agent>.sinks.<Sink1>.channel = <Channel1><Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
多路选择有更多的分叉流属性。需要为channel设置映射事件属性。选择器检查每个事件头的配置属性。如果陪陪指定的值,就会把事件发送给所有匹配的channel。如果没有匹配的,事件会被发送到默认配置的channel。
# Mapping for multiplexing selector<Agent>.sources.<Source1>.selector.type = multiplexing<Agent>.sources.<Source1>.selector.header = <someHeader><Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1><Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2><Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
映射允许每个channel值的覆盖。
下面的例子有一个流选择到两条路径。名为agent_too的agent有一个avro source和两个连接到两个sinks的channels。
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
选择器检查每个State的头。如果值是CA就发送到mem-channel-1,如果是AZ就到file-channel-2如果是NY就两个都发。如果State头没有设置或者都不匹配,将会发送到默认的mem-channel-1。
选择器也支持可选的channel。为了指定可选channel,配置参数optional如下使用:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
选择器首相尝试发送到必须的channel中,如果有channel消费事件失败,事务失败。食物将再次尝试发送到所有的channels中。如果所有的必须channels都消费了事物。那么选择器试图发送给可选的channel。可选channel消费事件的失败简单忽略不会导致重发。
原文:https://www.cnblogs.com/frankdeng/p/9067102.html