接收器组允许用户将多个接收器分组到一个实体中。接收器处理器可用于在组内的所有接收器上提供负载平衡功能,或在出现暂时故障时实现从一个接收器到另一个接收器的故障转移。
所需属性以粗体显示。
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be default, failover or load_balance |
Example for agent named a1:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance
默认接收器处理器只接受单个接收器。用户不必为单个接收器创建处理器(接收器组)。相反,用户可以遵循本用户指南中前面解释的源-通道-接收器模式。
故障转移接收器处理器维护一个优先级较高的接收器列表,确保只要有一个可用的接收器,就会处理(交付)事件。
故障转移机制的工作方式是将失败的接收器降级到池中,在池中为它们分配一个冷却期,在重试之前随着顺序故障的增加而增加。一旦接收器成功发送事件,它将被恢复到活动池。接收器有一个与之相关的优先级,越大,优先级越高。如果一个接收器在发送事件时失败,那么下一个具有最高优先级的接收器将在下一次发送事件时尝试。例如,优先级为100的接收器在优先级为80的接收器之前被激活。如果没有指定优先级,则根据配置中指定接收器的顺序确定thr优先级。
若要配置,请设置接收器组处理器进行故障转移,并为所有单个接收器设置优先级。所有指定的优先级必须是唯一的。此外,可以使用maxpenalty属性设置故障转移时间的上限(以毫秒为单位)。
所需属性以粗体显示。
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be failover |
processor.priority.<sinkName> | – |
Priority value. <sinkName> must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority |
processor.maxpenalty | 30000 | The maximum backoff period for the failed Sink (in millis) |
Example for agent named a1:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
负载平衡接收器处理器提供了跨多个接收器的负载平衡流的能力。它维护一个活动接收器的索引列表,其中必须分布负载。实现支持通过round_robin或随机选择机制分配负载。选择机制的选择默认为round_robin类型,但是可以通过配置覆盖。通过继承AbstractSinkSelector的自定义类支持自定义选择机制。
调用时,此选择器使用其配置的选择机制选择下一个接收器并调用它。对于round_robin和random,如果所选的接收器无法交付事件,处理器将通过其配置的选择机制选择下一个可用的接收器。此实现不会将失败的接收器列入黑名单,而是继续乐观地尝试所有可用的接收器。如果所有接收器调用都导致失败,则选择器将失败传播到接收器运行器。
如果启用了backoff,接收器处理器将把失败的接收器列入黑名单,在给定的超时中删除它们。当超时结束时,如果接收仍然是无响应的,则以指数方式增加超时,以避免在无响应接收上陷入长时间等待。禁用此功能后,在循环中,所有失败的接收器负载将被传递到行中的下一个接收器,因此不会均衡
所需属性以粗体显示。
Property Name | Default | Description |
---|---|---|
processor.sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be load_balance |
processor.backoff | false | Should failed sinks be backed off exponentially. |
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |
Example for agent named a1:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
目前不支持自定义接收器处理器。
file_roll接收器和hdfs接收器都支持EventSerializer接口。下面提供了带有Flume的eventserializer的详细信息。
别名:text。此拦截器将事件体写入输出流,而不进行任何转换或修改。忽略事件标题。配置选项如下:
Property Name | Default | Description |
---|---|---|
appendNewline | true |
Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons. |
Example for agent named a1:
a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false
别名:avro_event。
这个拦截器将Flume事件序列化到Avro容器文件中。使用的模式与Avro RPC机制中Flume事件使用的模式相同。
这个序列化器继承了AbstractAvroEventSerializer类。
配置选项如下:
Property Name | Default | Description |
---|---|---|
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
Example for agent named a1:
a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = avro_event a1.sinks.k1.serializer.compressionCodec = snappy
别名:此序列化器没有别名,必须使用全限定类名类名指定。
这将Flume事件序列化到Avro容器文件中,如“Flume事件”Avro事件序列化器,但是记录模式是可配置的。记录模式可以指定为Flume配置属性,也可以在事件头中传递。
要将记录模式作为Flume配置的一部分传递,请使用下面列出的属性schemaURL。
要在事件标头中传递记录模式,请指定事件标头flume.avro.schema。包含模式或flume.av .schema的json格式表示的文本。一个可以找到模式的url (hdfs:/…支持uri)。
这个序列化器继承了AbstractAvroEventSerializer类。
配置选项如下:
Property Name | Default | Description |
---|---|---|
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
schemaURL | null | Avro schema URL. Schemas specified in the header ovverride this option. |
Example for agent named a1:
a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder a1.sinks.k1.serializer.compressionCodec = snappy a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
Flume能够修改/删除飞行中的事件。这是在拦截器的帮助下完成的。拦截器是实现org.apache.flume.interceptor.Interceptor接口的类。拦截器可以根据开发人员选择的任何标准修改甚至删除事件。Flume支持拦截器的链接。这可以通过在配置中指定拦截器构建器类名的列表来实现。拦截器在源配置中指定为空格分隔的列表。指定拦截器的顺序就是调用它们的顺序。一个拦截器返回的事件列表传递给链中的下一个拦截器。拦截器可以修改或删除事件。如果拦截器需要删除事件,它只是在返回的列表中不返回该事件。如果要删除所有事件,那么它只返回一个空列表。拦截器是命名组件,下面是一个通过配置创建拦截器的例子:
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder a1.sources.r1.interceptors.i1.preserveExisting = false a1.sources.r1.interceptors.i1.hostHeader = hostname a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d a1.sinks.k1.channel = c1
注意,拦截器构建器被传递给type config参数。拦截器本身是可配置的,可以像传递给任何其他可配置组件一样传递配置值。在上面的示例中,首先将事件传递给HostInterceptor,然后将HostInterceptor返回的事件传递给TimestampInterceptor。可以指定完全限定类名(FQCN)或别名时间戳。如果有多个收集器写入相同的HDFS路径,那么还可以使用HostInterceptor。
此拦截器将插入事件标头,即它处理事件的millis时间。这个拦截器插入一个带有键时间戳(或由header属性指定)的消息头,其值是相关的时间戳。如果配置中已有时间戳,则此拦截器可以保留该时间戳。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be timestamp or the FQCN |
header | timestamp | The name of the header in which to place the generated timestamp. |
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
Example for agent named a1:
a1.sources = r1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp
这个拦截器插入代理运行的主机的主机名或IP地址。它插入一个带有密钥主机的头或一个已配置密钥,该密钥的值是基于配置的主机名或主机的IP地址。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be host |
preserveExisting | false | If the host header already exists, should it be preserved - true or false |
useIP | true | Use the IP Address if true, else use hostname. |
hostHeader | host | The header key to be used. |
Example for agent named a1:
a1.sources = r1 a1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host
静态拦截器允许用户向所有事件附加一个具有静态值的静态标题。
当前实现不允许同时指定多个标题。相反,用户可以链接多个静态拦截器,每个拦截器定义一个静态头。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, has to be static |
preserveExisting | true | If configured header already exists, should it be preserved - true or false |
key | key | Name of header that should be created |
value | value | Static value that should be created |
Example for agent named a1:
a1.sources = r1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK
这个拦截器通过删除一个或多个header来操纵Flume事件header。它可以删除静态定义的头、基于正则表达式的头或列表中的头。如果这些都没有定义,或者没有标题与标准匹配,就不会修改Flume事件。
注意,如果只需要删除一个头,那么通过名称指定它会比其他两个方法提供更好的性能。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be remove_header |
withName | – | Name of the header to remove |
fromList | – | List of headers to remove, separated with the separator specified by fromListSeparator |
fromListSeparator | \s*,\s* |
Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters |
matching | – | All the headers which names match this regular expression are removed |
这个拦截器在所有被拦截的事件上设置一个统一的唯一标识符。一个示例UUID是b5755073-77a9-43c1-8fa -b7a586fc1b97,它表示128位值。
如果没有事件的应用程序级惟一键可用,可以考虑使用UUIDInterceptor自动为事件分配UUID。当事件进入Flume网络时,为它们分配uuid是非常重要的;也就是说,在第一个Flume source 的流中。这使得在为高可用性和高性能而设计的Flume网络中,面对复制和重新交付时,可以对事件进行后续重复数据删除。如果应用程序级密钥可用,这比自动生成的UUID更可取,因为它使用已知的应用程序级密钥支持数据存储中事件的后续更新和删除。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName | id | The name of the Flume header to modify |
preserveExisting | true | If the UUID header already exists, should it be preserved - true or false |
prefix | “” | The prefix string constant to prepend to each generated UUID |
这个拦截器通过一个形态线配置文件过滤事件,该文件定义了一个转换命令链,将记录从一个命令传输到另一个命令。例如,morphline可以忽略某些事件,或者通过基于正则表达式的模式匹配更改或插入某些事件头部,或者可以通过Apache Tika自动检测并在被截获的事件上设置MIME类型。例如,这种包嗅探可以用于Flume拓扑中基于内容的动态路由。MorphlineInterceptor还可以帮助实现到多个Apache Solr集合的动态路由(例如,对于多租户)。
目前,有一个限制,拦截器的形态线不能为每个输入事件生成多个输出记录。这个拦截器不是为繁重的ETL处理而设计的——如果你需要的话,可以考虑将ETL处理从Flume源转移到Flume Sink,例如到MorphlineSolrSink。
所需属性以粗体显示。
Property Name | Default | Description |
---|---|---|
type | – |
The component type name has to be org.apache.flume.sink.solr. morphline.MorphlineInterceptor$Builder |
morphlineFile | – |
The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null |
Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
Sample flume.conf file:
a1.sources.avroSrc.interceptors = morphlineinterceptor a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
这个拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。也可以使用回溯/组捕获。这个拦截器使用与Java Matcher.replaceAll()方法中相同的规则。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be search_replace |
searchPattern | – | The pattern to search for and replace. |
replaceString | – | The replacement string. |
charset | UTF-8 | The charset of the event body. Assumed by default to be UTF-8. |
Example configuration:
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Remove leading alphanumeric characters in an event body. a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+ a1.sources.avroSrc.interceptors.search-replace.replaceString =
Another example:
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Use grouping operators to reorder and munge words on a line. a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+) a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
这个拦截器通过将事件体解释为文本并根据配置的正则表达式匹配文本来选择性地过滤事件。提供的正则表达式可用于包含事件或排除事件。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be regex_filter |
regex | ”.*” | Regular expression for matching against events |
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include. |
这个拦截器使用指定的正则表达式提取regex匹配组,并将匹配组追加为事件的头部。它还支持可插入的序列化器,用于在将匹配组添加为事件头之前对其进行格式化。
Property Name | Default | Description |
---|---|---|
type | – | The component type name has to be regex_extractor |
regex | – | Regular expression for matching against events |
serializers | – |
Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializerorg. apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer |
serializers.<s1>.type | default |
Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer |
serializers.<s1>.name | – | |
serializers.* | – | Serializer-specific properties |
序列化器用于将匹配映射到标题名称和格式化的标题值;默认情况下,您只需要指定标题名称,并使用默认的org.apache.flume.interceptor. regexextractorinterceptorpassthrough序列化器。这个序列化器只是将匹配映射到指定的头名称,并在regex提取值时传递该值。您可以使用完全限定类名(FQCN)将自定义序列化器实现插入提取器中,以按照您喜欢的方式格式化匹配。
如果水槽事件体包含1:2:3.4foobar5,则使用以下配置
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
提取的事件将包含相同的主体,但是添加了以下头部:1 =>1,2 =>2,3 =>3
如果水槽事件体包含2012-10-18 18:47:57,614,则使用一些日志行,并使用以下配置
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d) a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
提取的事件将包含相同的主体,但是添加了以下标头的时间戳=>1350611220000
-----------------未完待续-----------
【翻译】Flume 1.8.0 User Guide(用户指南) Processors
原文:https://www.cnblogs.com/Springmoon-venn/p/10371412.html