首页 > Web开发 > 详细

Hadoop生态圈-Flume的组件之自定义Sink

时间:2018-06-21 10:10:04      阅读:199      评论:0      收藏:0      [点我收藏+]

                    Hadoop生态圈-Flume的组件之自定义Sink

                                           作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

 

 

 

一.自定义Sink的步骤

1>.编写自定义sink

 

2>.打包并将其发送到 /soft/flume/lib下

[yinzhengjie@s101 ~]$ cd /soft/flume/lib/
[yinzhengjie@s101 lib]$ rz

[yinzhengjie@s101 lib]$ 
[yinzhengjie@s101 lib]$ ll | grep MyFlume
-rw-r--r--  1 yinzhengjie yinzhengjie    3398 Jun 20 18:12 MyFlume-1.0-SNAPSHOT.jar
[yinzhengjie@s101 lib]$ 

3>.编写agent的配置文件

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_mysink.conf 
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Describe the sink
a1.sinks.k1.type = cn.org.yinzhengjie.sink.MySink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[yinzhengjie@s101 ~]$ 

4>.启动flume并测试

   a>.启动agent进程

技术分享图片
[yinzhengjie@s101 ~]$ flume-ng agent -f /soft/flume/conf/yinzhengjie_mysink.conf -n a1
Warning: No configuration directory set! Use --conf <dir> to override.
Warning: JAVA_HOME is not set!
Info: Including Hadoop libraries found via (/soft/hadoop/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/soft/hbase/bin/hbase) for HBASE access
Info: Including Hive libraries found via () for Hive access
+ exec /soft/jdk/bin/java -Xmx20m -cp /soft/flume/lib/*:/soft/hadoop-2.7.3/etc/hadoop:/soft/hadoop-2.7.3/share/hadoop/common/lib/*:/soft/hadoop-2.7.3/share/hadoop/common/*:/soft/hadoop-2.7.3/share/hadoop/hdfs:/soft/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/soft/hadoop-2.7.3/share/hadoop/hdfs/*:/soft/hadoop-2.7.3/share/hadoop/yarn/lib/*:/soft/hadoop-2.7.3/share/hadoop/yarn/*:/soft/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/soft/hadoop-2.7.3/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:/soft/hbase/bin/../conf:/soft/jdk//lib/tools.jar:/soft/hbase/bin/..:/soft/hbase/bin/../lib/activation-1.1.jar:/soft/hbase/bin/../lib/aopalliance-1.0.jar:/soft/hbase/bin/../lib/apacheds-i18n-2.0.0-M15.jar:/soft/hbase/bin/../lib/apacheds-kerberos-codec-2.0.0-M15.jar:/soft/hbase/bin/../lib/api-asn1-api-1.0.0-M20.jar:/soft/hbase/bin/../lib/api-util-1.0.0-M20.jar:/soft/hbase/bin/../lib/asm-3.1.jar:/soft/hbase/bin/../lib/avro-1.7.4.jar:/soft/hbase/bin/../lib/commons-beanutils-1.7.0.jar:/soft/hbase/bin/../lib/commons-beanutils-core-1.8.0.jar:/soft/hbase/bin/../lib/commons-cli-1.2.jar:/soft/hbase/bin/../lib/commons-codec-1.9.jar:/soft/hbase/bin/../lib/commons-collections-3.2.2.jar:/soft/hbase/bin/../lib/commons-compress-1.4.1.jar:/soft/hbase/bin/../lib/commons-configuration-1.6.jar:/soft/hbase/bin/../lib/commons-daemon-1.0.13.jar:/soft/hbase/bin/../lib/commons-digester-1.8.jar:/soft/hbase/bin/../lib/commons-el-1.0.jar:/soft/hbase/bin/../lib/commons-httpclient-3.1.jar:/soft/hbase/bin/../lib/commons-io-2.4.jar:/soft/hbase/bin/../lib/commons-lang-2.6.jar:/soft/hbase/bin/../lib/commons-logging-1.2.jar:/soft/hbase/bin/../lib/commons-math-2.2.jar:/soft/hbase/bin/../lib/commons-math3-3.1.1.jar:/soft/hbase/bin/../lib/commons-net-3.1.jar:/soft/hbase/bin/../lib/disruptor-3.3.0.jar:/soft/hbase/bin/../lib/findbugs-annotations-1.3.9-1.jar:/soft/hbase/bin/../lib/guava-12.0.1.jar:/soft/hbase/bin/../lib/guice-3.0.jar:/soft/hbase/bin/../lib/guice-servlet-3.0.jar:/soft/hbase/bin/../lib/hadoop-annotations-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-auth-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-client-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-common-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-hdfs-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-mapreduce-client-app-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-mapreduce-client-common-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-mapreduce-client-core-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-mapreduce-client-jobclient-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-mapreduce-client-shuffle-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-yarn-api-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-yarn-client-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-yarn-common-2.5.1.jar:/soft/hbase/bin/../lib/hadoop-yarn-server-common-2.5.1.jar:/soft/hbase/bin/../lib/hbase-annotations-1.2.6.jar:/soft/hbase/bin/../lib/hbase-annotations-1.2.6-tests.jar:/soft/hbase/bin/../lib/hbase-client-1.2.6.jar:/soft/hbase/bin/../lib/hbase-common-1.2.6.jar:/soft/hbase/bin/../lib/hbase-common-1.2.6-tests.jar:/soft/hbase/bin/../lib/hbase-examples-1.2.6.jar:/soft/hbase/bin/../lib/hbase-external-blockcache-1.2.6.jar:/soft/hbase/bin/../lib/hbase-hadoop2-compat-1.2.6.jar:/soft/hbase/bin/../lib/hbase-hadoop-compat-1.2.6.jar:/soft/hbase/bin/../lib/hbase-it-1.2.6.jar:/soft/hbase/bin/../lib/hbase-it-1.2.6-tests.jar:/soft/hbase/bin/../lib/hbase-prefix-tree-1.2.6.jar:/soft/hbase/bin/../lib/hbase-procedure-1.2.6.jar:/soft/hbase/bin/../lib/hbase-protocol-1.2.6.jar:/soft/hbase/bin/../lib/hbase-resource-bundle-1.2.6.jar:/soft/hbase/bin/../lib/hbase-rest-1.2.6.jar:/soft/hbase/bin/../lib/hbase-server-1.2.6.jar:/soft/hbase/bin/../lib/hbase-server-1.2.6-tests.jar:/soft/hbase/bin/../lib/hbase-shell-1.2.6.jar:/soft/hbase/bin/../lib/hbase-thrift-1.2.6.jar:/soft/hbase/bin/../lib/htrace-core-3.1.0-incubating.jar:/soft/hbase/bin/../lib/httpclient-4.2.5.jar:/soft/hbase/bin/../lib/httpcore-4.4.1.jar:/soft/hbase/bin/../lib/jackson-core-asl-1.9.13.jar:/soft/hbase/bin/../lib/jackson-jaxrs-1.9.13.jar:/soft/hbase/bin/../lib/jackson-mapper-asl-1.9.13.jar:/soft/hbase/bin/../lib/jackson-xc-1.9.13.jar:/soft/hbase/bin/../lib/jamon-runtime-2.4.1.jar:/soft/hbase/bin/../lib/jasper-compiler-5.5.23.jar:/soft/hbase/bin/../lib/jasper-runtime-5.5.23.jar:/soft/hbase/bin/../lib/javax.inject-1.jar:/soft/hbase/bin/../lib/java-xmlbuilder-0.4.jar:/soft/hbase/bin/../lib/jaxb-api-2.2.2.jar:/soft/hbase/bin/../lib/jaxb-impl-2.2.3-1.jar:/soft/hbase/bin/../lib/jcodings-1.0.8.jar:/soft/hbase/bin/../lib/jersey-client-1.9.jar:/soft/hbase/bin/../lib/jersey-core-1.9.jar:/soft/hbase/bin/../lib/jersey-guice-1.9.jar:/soft/hbase/bin/../lib/jersey-json-1.9.jar:/soft/hbase/bin/../lib/jersey-server-1.9.jar:/soft/hbase/bin/../lib/jets3t-0.9.0.jar:/soft/hbase/bin/../lib/jettison-1.3.3.jar:/soft/hbase/bin/../lib/jetty-6.1.26.jar:/soft/hbase/bin/../lib/jetty-sslengine-6.1.26.jar:/soft/hbase/bin/../lib/jetty-util-6.1.26.jar:/soft/hbase/bin/../lib/joni-2.1.2.jar:/soft/hbase/bin/../lib/jruby-complete-1.6.8.jar:/soft/hbase/bin/../lib/jsch-0.1.42.jar:/soft/hbase/bin/../lib/jsp-2.1-6.1.14.jar:/soft/hbase/bin/../lib/jsp-api-2.1-6.1.14.jar:/soft/hbase/bin/../lib/junit-4.12.jar:/soft/hbase/bin/../lib/leveldbjni-all-1.8.jar:/soft/hbase/bin/../lib/libthrift-0.9.3.jar:/soft/hbase/bin/../lib/log4j-1.2.17.jar:/soft/hbase/bin/../lib/metrics-core-2.2.0.jar:/soft/hbase/bin/../lib/MyHbase-1.0-SNAPSHOT.jar:/soft/hbase/bin/../lib/netty-all-4.0.23.Final.jar:/soft/hbase/bin/../lib/paranamer-2.3.jar:/soft/hbase/bin/../lib/phoenix-4.10.0-HBase-1.2-client.jar:/soft/hbase/bin/../lib/protobuf-java-2.5.0.jar:/soft/hbase/bin/../lib/servlet-api-2.5-6.1.14.jar:/soft/hbase/bin/../lib/servlet-api-2.5.jar:/soft/hbase/bin/../lib/slf4j-api-1.7.7.jar:/soft/hbase/bin/../lib/slf4j-log4j12-1.7.5.jar:/soft/hbase/bin/../lib/snappy-java-1.0.4.1.jar:/soft/hbase/bin/../lib/spymemcached-2.11.6.jar:/soft/hbase/bin/../lib/xmlenc-0.52.jar:/soft/hbase/bin/../lib/xz-1.0.jar:/soft/hbase/bin/../lib/zookeeper-3.4.6.jar:/soft/hadoop-2.7.3/etc/hadoop:/soft/hadoop-2.7.3/share/hadoop/common/lib/*:/soft/hadoop-2.7.3/share/hadoop/common/*:/soft/hadoop-2.7.3/share/hadoop/hdfs:/soft/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/soft/hadoop-2.7.3/share/hadoop/hdfs/*:/soft/hadoop-2.7.3/share/hadoop/yarn/lib/*:/soft/hadoop-2.7.3/share/hadoop/yarn/*:/soft/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/soft/hadoop-2.7.3/share/hadoop/mapreduce/*::/soft/hive/lib/*:/contrib/capacity-scheduler/*.jar:/conf:/lib/* -Djava.library.path=:/soft/hadoop-2.7.3/lib/native:/soft/hadoop-2.7.3/lib/native org.apache.flume.node.Application -f /soft/flume/conf/yinzhengjie_mysink.conf -n a1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/soft/apache-flume-1.8.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/soft/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/soft/hbase-1.2.6/lib/phoenix-4.10.0-HBase-1.2-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/soft/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/soft/apache-hive-2.1.1-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
18/06/20 18:34:24 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/06/20 18:34:24 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/soft/flume/conf/yinzhengjie_mysink.conf
18/06/20 18:34:24 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
18/06/20 18:34:24 INFO conf.FlumeConfiguration: Processing:k1
18/06/20 18:34:24 INFO conf.FlumeConfiguration: Processing:k1
18/06/20 18:34:24 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/06/20 18:34:24 INFO node.AbstractConfigurationProvider: Creating channels
18/06/20 18:34:24 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/06/20 18:34:24 INFO node.AbstractConfigurationProvider: Created channel c1
18/06/20 18:34:24 INFO source.DefaultSourceFactory: Creating instance of source r1, type netcat
18/06/20 18:34:24 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: cn.org.yinzhengjie.sink.MySink
18/06/20 18:34:24 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
18/06/20 18:34:24 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@618c8387 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/06/20 18:34:24 INFO node.Application: Starting Channel c1
18/06/20 18:34:24 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/06/20 18:34:24 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/06/20 18:34:24 INFO node.Application: Starting Sink k1
18/06/20 18:34:24 INFO node.Application: Starting Source r1
18/06/20 18:34:24 INFO source.NetcatSource: Source starting
18/06/20 18:34:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:8888]
[yinzhengjie@s101 ~]$ flume-ng agent -f /soft/flume/conf/yinzhengjie_mysink.conf -n a1

  b>.客户端连接(nc)

技术分享图片

  c>.检查服务端

技术分享图片

 

 

二.自定义hdfs的sink

1>.编写自定义sink

 

2>.打包并将其发送到 /soft/flume/lib下

 

3>.编写agent的配置文件

 

4>.启动flume并测试

 

 

三.

Hadoop生态圈-Flume的组件之自定义Sink

原文:https://www.cnblogs.com/yinzhengjie/p/9207067.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!