Property Name | Default | Description |
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be default, failover or load_balance |
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
Property Name | Default | Description |
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be failover |
processor.priority.<sinkName> | – | Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A sink with a higher priority value is activated earlier; a larger absolute value indicates a higher priority. |
processor.maxpenalty | 30000 | The maximum backoff period for a failed sink (in milliseconds) |
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
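The failover rules in the table can be pictured with a small model. This is a hypothetical sketch: the class `FailoverSketch` and its methods are illustrative and are not Flume's `FailoverSinkProcessor`. It chooses the live sink with the highest priority value and parks a failed sink for a penalty that doubles with each consecutive failure, capped at `maxpenalty`. The 1000 ms starting penalty is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of failover selection; not Flume's FailoverSinkProcessor.
final class FailoverSketch {
    private final Map<String, Integer> priority = new HashMap<>();
    private final Map<String, Long> penalizedUntil = new HashMap<>();
    private final Map<String, Integer> failures = new HashMap<>();
    private final long maxPenalty;

    FailoverSketch(long maxPenalty) { this.maxPenalty = maxPenalty; }

    void addSink(String name, int prio) { priority.put(name, prio); }

    /** Returns the non-penalized sink with the highest priority value, or null if none is available. */
    String select(long nowMillis) {
        String best = null;
        for (Map.Entry<String, Integer> e : priority.entrySet()) {
            if (penalizedUntil.getOrDefault(e.getKey(), 0L) > nowMillis) continue; // still backing off
            if (best == null || e.getValue() > priority.get(best)) best = e.getKey();
        }
        return best;
    }

    /** Penalizes a failed sink: 1000 ms (assumed) doubled per consecutive failure, capped at maxPenalty. */
    void fail(String sink, long nowMillis) {
        int n = failures.merge(sink, 1, Integer::sum);
        long penalty = Math.min(maxPenalty, 1000L << Math.min(n - 1, 30));
        penalizedUntil.put(sink, nowMillis + penalty);
    }

    /** A successful delivery clears the sink's failure history. */
    void succeed(String sink) { failures.remove(sink); penalizedUntil.remove(sink); }
}
```

With the configuration above, `k2` (priority 10) is tried first. `k1` takes over only while `k2` is serving its penalty.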
Property Name | Default | Description |
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be load_balance |
processor.backoff | false | Should failed sinks be backed off exponentially. |
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
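The sketch below models the default `round_robin` selector with `backoff` enabled, rather than the `random` selector used in the example. It is a hypothetical model (`RoundRobinSketch` is not Flume's `LoadBalancingSinkProcessor`). Sinks are handed out in rotation, and a failed sink is skipped for a period that doubles per consecutive failure and is capped at `selector.maxTimeOut`. The 1000 ms base delay is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical round-robin selector with optional exponential backoff; not Flume's implementation.
final class RoundRobinSketch {
    private final List<String> sinks;
    private final boolean backoff;
    private final long maxTimeOut;
    private final Map<String, Long> blockedUntil = new HashMap<>();
    private final Map<String, Integer> failures = new HashMap<>();
    private int next = 0;

    RoundRobinSketch(List<String> sinks, boolean backoff, long maxTimeOut) {
        this.sinks = new ArrayList<>(sinks);
        this.backoff = backoff;
        this.maxTimeOut = maxTimeOut;
    }

    /** Returns the next sink in rotation that is not backed off, or null if none is available. */
    String select(long nowMillis) {
        for (int i = 0; i < sinks.size(); i++) {
            String candidate = sinks.get((next + i) % sinks.size());
            if (blockedUntil.getOrDefault(candidate, 0L) <= nowMillis) {
                next = (next + i + 1) % sinks.size(); // continue the rotation after this sink
                return candidate;
            }
        }
        return null;
    }

    /** With backoff enabled, a failed sink is skipped for 1000 ms (assumed) doubled per failure, capped. */
    void fail(String sink, long nowMillis) {
        if (!backoff) return;
        int n = failures.merge(sink, 1, Integer::sum);
        long delay = Math.min(maxTimeOut, 1000L << Math.min(n - 1, 30));
        blockedUntil.put(sink, nowMillis + delay);
    }
}
```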
Property Name | Default | Description |
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons. |
Example for agent named a1:
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
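The `appendNewline` switch only affects the bytes written after each body. The following hypothetical sketch (`BodyTextSketch` is not Flume's serializer class) writes the event body unchanged and then a `\n` when the flag is true. It shows why `appendNewline = false` suits bodies that already end in a newline.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical model of a body-only text serializer; event headers are not written.
final class BodyTextSketch {
    static byte[] serialize(byte[] body, boolean appendNewline) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(body, 0, body.length); // body bytes are written as-is
        if (appendNewline) out.write('\n');
        return out.toByteArray();
    }
}
```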
This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism.
Property Name | Default | Description |
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
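To pass the record schema in an event header, specify either the event header flume.avro.schema.literal, containing a JSON-format representation of the schema, or flume.avro.schema.url, with a URL where the schema can be found (hdfs:/… URIs are supported).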
Property Name | Default | Description |
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes. |
compressionCodec | null | Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
schemaURL | null | Avro schema URL. Schemas specified in the header override this option. |
Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
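The schema can come from three places, and header-supplied schemas override `schemaURL`. The hypothetical helper below (`SchemaSourceSketch`, not part of Flume) only reports which source would be used. The URL header is checked before the literal header here; that relative order is an assumption of this sketch.

```java
import java.util.Map;

// Hypothetical helper: reports which schema source applies. Event headers win over the schemaURL property.
final class SchemaSourceSketch {
    static final String LITERAL_HEADER = "flume.avro.schema.literal";
    static final String URL_HEADER = "flume.avro.schema.url";

    static String resolve(Map<String, String> headers, String schemaUrlProperty) {
        if (headers.containsKey(URL_HEADER)) return "url:" + headers.get(URL_HEADER); // assumed to be checked first
        if (headers.containsKey(LITERAL_HEADER)) return "literal:" + headers.get(LITERAL_HEADER);
        if (schemaUrlProperty != null) return "url:" + schemaUrlProperty; // fallback: serializer.schemaURL
        throw new IllegalStateException("no Avro schema available for this event");
    }
}
```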
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
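Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like any other configurable component. In the above example, events are passed to the HostInterceptor first, and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, you could also use the HostInterceptor.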
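The chaining described above can be modelled as folding a list of header transformations. This is a hypothetical sketch that covers headers only (`ChainSketch` is illustrative; the real API is `org.apache.flume.interceptor.Interceptor`). Each stage receives what the previous stage returned, and a stage that returns `null` drops the event.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of an interceptor chain acting on event headers only.
final class ChainSketch {
    /** Applies stages in configured order; a stage returning null drops the event. */
    static Map<String, String> run(Map<String, String> headers, List<UnaryOperator<Map<String, String>>> chain) {
        Map<String, String> current = new HashMap<>(headers);
        for (UnaryOperator<Map<String, String>> stage : chain) {
            current = stage.apply(current);
            if (current == null) return null;
        }
        return current;
    }

    /** Like i1 above: writes the host under the configured header (preserveExisting = false). */
    static UnaryOperator<Map<String, String>> host(String headerName, String hostValue) {
        return h -> { h.put(headerName, hostValue); return h; };
    }

    /** Like i2 above: writes the given time, in milliseconds, under "timestamp". */
    static UnaryOperator<Map<String, String>> timestamp(long nowMillis) {
        return h -> { h.put("timestamp", Long.toString(nowMillis)); return h; };
    }
}
```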
Property Name | Default | Description |
type | – | The component type name, has to be timestamp or the FQCN |
header | timestamp | The name of the header in which to place the generated timestamp. |
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
Property Name | Default | Description |
type | – | The component type name, has to be host |
preserveExisting | false | If the host header already exists, should it be preserved - true or false |
useIP | true | Use the IP Address if true, else use hostname. |
hostHeader | host | The header key to be used. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
Property Name | Default | Description |
type | – | The component type name, has to be static |
preserveExisting | true | If the configured header already exists, should it be preserved - true or false |
key | key | Name of the header that should be created |
value | value | Static value that should be created |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
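The `preserveExisting` flag decides whether an incoming value survives. Here is a hypothetical sketch of that header logic (`StaticHeaderSketch` is not Flume's class), using the `datacenter`/`NEW_YORK` values from the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the static interceptor's header logic.
final class StaticHeaderSketch {
    static Map<String, String> apply(Map<String, String> headers, String key, String value,
                                     boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers);
        if (preserveExisting && out.containsKey(key)) return out; // keep the value already on the event
        out.put(key, value);
        return out;
    }
}
```

With the default `preserveExisting = true`, an event that already carries `datacenter=LONDON` keeps that value. With `false`, the value is overwritten with `NEW_YORK`.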
Property Name | Default | Description |
type | – | The component type name has to be remove_header |
withName | – | Name of the header to remove |
fromList | – | List of headers to remove, separated with the separator specified by fromListSeparator |
fromListSeparator | \s*,\s* | Regular expression used to separate multiple header names in the list specified by fromList. Default is a comma surrounded by any number of whitespace characters |
matching | – | All the headers whose names match this regular expression are removed |
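The three removal options can be combined. The hypothetical sketch below (`RemoveHeaderSketch`, not Flume's class) applies them in turn. It splits `fromList` with the documented default separator `\s*,\s*` and removes headers whose whole name matches `matching`. Treating `matching` as a full match is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical model of remove_header's withName / fromList / matching options.
final class RemoveHeaderSketch {
    static Map<String, String> apply(Map<String, String> headers, String withName,
                                     String fromList, String fromListSeparator, String matching) {
        Map<String, String> out = new HashMap<>(headers);
        if (withName != null) out.remove(withName);
        if (fromList != null) {
            String sep = fromListSeparator != null ? fromListSeparator : "\\s*,\\s*"; // documented default
            Set<String> names = new HashSet<>(Arrays.asList(fromList.split(sep)));
            out.keySet().removeAll(names);
        }
        if (matching != null) {
            Pattern p = Pattern.compile(matching);
            out.keySet().removeIf(name -> p.matcher(name).matches()); // full-name match assumed
        }
        return out;
    }
}
```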
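This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.
Consider using UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network, that is, in the first Flume source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, it is preferable to an auto-generated UUID, because it enables subsequent updates and deletes of the event in data stores using that well-known application-level key.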
Property Name | Default | Description |
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName | id | The name of the Flume header to modify |
preserveExisting | true | If the UUID header already exists, should it be preserved - true or false |
prefix | "" | The prefix string constant to prepend to each generated UUID |
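The three UUID interceptor options map onto a few lines of header logic. This hypothetical sketch (`UuidHeaderSketch`, not Flume's `UUIDInterceptor`) uses `java.util.UUID.randomUUID()` as the generator, which is an assumption. An existing ID header survives when `preserveExisting` is true, which keeps deduplication stable if an event passes through more than one source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the UUID interceptor's header logic (headerName, preserveExisting, prefix).
final class UuidHeaderSketch {
    static Map<String, String> apply(Map<String, String> headers, String headerName,
                                     boolean preserveExisting, String prefix) {
        Map<String, String> out = new HashMap<>(headers);
        if (preserveExisting && out.containsKey(headerName)) return out; // keep the upstream ID
        out.put(headerName, prefix + UUID.randomUUID()); // random 128-bit UUID, assumed generator
        return out;
    }
}
```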
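This interceptor filters events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another. For example, the morphline can ignore certain events, alter or insert certain event headers via regular-expression-based pattern matching, or auto-detect and set a MIME type via Apache Tika on intercepted events. This kind of packet sniffing can be used, for example, for content-based dynamic routing in a Flume topology. MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).
Currently, there is a restriction: the morphline of an interceptor must not generate more than one output record for each input event. This interceptor is not intended for heavy-duty ETL processing. If you need that, consider moving ETL processing from the Flume source to a Flume sink, e.g. to a MorphlineSolrSink.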
Property Name | Default | Description |
type | – | The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder |
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
Sample flume.conf file:
a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
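This interceptor provides simple string-based search-and-replace functionality based on Java regular expressions. Backtracking/group capture is also available. This interceptor uses the same rules as the Java Matcher.replaceAll() method.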
Property Name | Default | Description |
type | – | The component type name has to be search_replace |
searchPattern | – | The pattern to search for and replace. |
replaceString | – | The replacement string. |
charset | UTF-8 | The charset of the event body. Assumed by default to be UTF-8. |
Example configuration:
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =
Another example:
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
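Because the rules are those of `Matcher.replaceAll()`, both examples can be checked in plain Java. In the hypothetical helper below (`SearchReplaceSketch`), the body is decoded with the configured `charset`, rewritten, and re-encoded. `$1` and `$2` refer to the capture groups.

```java
import java.nio.charset.Charset;
import java.util.regex.Pattern;

// Hypothetical helper mirroring search_replace semantics with java.util.regex.
final class SearchReplaceSketch {
    static byte[] apply(byte[] body, String searchPattern, String replaceString, Charset charset) {
        String text = new String(body, charset); // decode with the configured charset
        String replaced = Pattern.compile(searchPattern).matcher(text).replaceAll(replaceString);
        return replaced.getBytes(charset);
    }
}
```

For instance, the second configuration turns `The quick brown fox jumped over the lazy dog` into `The hungry dog ate the careless fox`.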
Property Name | Default | Description |
type | – | The component type name has to be regex_filter |
regex | ".*" | Regular expression for matching against events |
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include. |
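`excludeEvents` simply inverts the meaning of a match. In the hypothetical sketch below (`RegexFilterSketch`, not Flume's class), the body is decoded as UTF-8 and matched with `find()`, so a pattern may match anywhere in the body. Both the charset and the partial-match behaviour are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical model of regex_filter include/exclude logic (UTF-8 body, find()-style match assumed).
final class RegexFilterSketch {
    /** Returns true if the event should be kept. */
    static boolean keep(byte[] body, Pattern regex, boolean excludeEvents) {
        boolean matched = regex.matcher(new String(body, StandardCharsets.UTF_8)).find();
        return excludeEvents ? !matched : matched;
    }
}
```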
Property Name | Default | Description |
type | – | The component type name has to be regex_extractor |
regex | – | Regular expression for matching against events |
serializers | – | Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer, org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer |
serializers.<s1>.type | default | Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer |
serializers.<s1>.name | – | Name of the header to which the matched group is mapped |
serializers.* | – | Serializer-specific properties |
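The serializers are used to map the matches to a header name and a formatted header value. By default, you only need to specify the header name, and the default org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer will be used. This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches in any way you like.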
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body, but the following headers will have been added: one=>1, two=>2, three=>3
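The pass-through mapping pairs capture group *i* with the *i*-th serializer's `name`. The hypothetical sketch below (`RegexExtractorSketch`, not Flume's class) illustrates that pairing on a made-up body `1:2:3.4foobar5`. Note that `\\d` in the properties file is unescaped to `\d` when the file is loaded, which is why the Java string literal below also reads `\\d`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of regex_extractor with pass-through serializers: group i -> headerNames.get(i - 1).
final class RegexExtractorSketch {
    static Map<String, String> extract(String body, String regex, List<String> headerNames) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find()) {
            for (int group = 1; group <= m.groupCount() && group <= headerNames.size(); group++) {
                headers.put(headerNames.get(group - 1), m.group(group)); // value passed through unchanged
            }
        }
        return headers;
    }
}
```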
If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used:
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
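The millis serializer turns the captured text into epoch milliseconds using `pattern`. The hypothetical sketch below (`MillisExtractSketch`, not Flume's serializer) does the same with `java.time`. The time zone is an explicit parameter here, which is an assumption: in a real agent, the zone used to interpret `2012-10-18 18:47` determines the resulting number. Interpreted as UTC, the body above yields 1350586020000.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of the millis serializer step; the time zone is passed in explicitly (assumption).
final class MillisExtractSketch {
    /** Returns epoch milliseconds for capture group 1, or null if the regex does not match. */
    static Long extractMillis(String body, String regex, String pattern, ZoneId zone) {
        Matcher m = Pattern.compile(regex).matcher(body);
        if (!m.find()) return null;
        LocalDateTime t = LocalDateTime.parse(m.group(1), DateTimeFormatter.ofPattern(pattern));
        return t.atZone(zone).toInstant().toEpochMilli();
    }
}
```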
[Translation] Flume 1.8.0 User Guide: Processors