附上Maven依赖
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> </dependencies>
MySource extends AbstractSource implements Configurable, PollableSource
主要实现逻辑是在process()里,将封装好的对象传给ChannelProcessor,ChannelProcessor自己通过事务逻辑传递参数
public class MySource extends AbstractSource implements Configurable, PollableSource { private String name; // 最核心的方法,读取数据,封装为event,写入到channel // 如果读到数据,封装为event,返回ready,否则如果当前没有读到数据,返回backoff // 每间隔5s,自动封装10个event,10个event的内容为{atguigu:i} @Override public Status process() throws EventDeliveryException { //声明默认返回的状态 Status status= Status.READY; //封装event List<Event> events=new ArrayList<>(); for (int i = 0; i < 10 ; i++) { SimpleEvent e = new SimpleEvent(); //封装数据 e.setBody((name+i).getBytes()); events.add(e); } try { // 获取当前source对应的channel的channelProcessor ChannelProcessor channelProcessor = getChannelProcessor(); //由ChannelProcessor将event放入到channel channelProcessor.processEventBatch(events); //间隔5s Thread.sleep(5000); }catch (Exception e){ status=Status.BACKOFF; e.printStackTrace(); } return status; } // 当source无法读到新的数据时,此时可以让Source所在的PollableSourceRunner线程休息会 // 休息的时间取决于getBackOffSleepIncrement() 和 getMaxBackOffSleepInterval() @Override public long getBackOffSleepIncrement() { return 1000; } @Override public long getMaxBackOffSleepInterval() { return 5000; } // 从agent的配置文件中读取指定的参数的值 @Override public void configure(Context context) { name=context.getString("name","atguigu:"); } }
MySink extends AbstractSink implements Configurable
在process()里实现逻辑,先获取Channel,再从中得到event值,与Source不同的是,这里要手动完成事务的逻辑
public class MySink extends AbstractSink implements Configurable { private String prefix; private String suffix; private Logger logger= LoggerFactory.getLogger(MySink.class); //最核心的方法,这个方法负责从channel中获取event,将event写到指定的设备 // 如果成功传输了一个或多个event,就返回ready,否则如果从channel中获取不到event,返回backoff @Override public Status process() throws EventDeliveryException { //自定义默认的返回值 Status status=Status.READY; //获取sink对应的channel Channel c = getChannel(); Event e=null; //从channel中获取take事务 Transaction transaction = c.getTransaction(); try { //开启事务 transaction.begin(); //从channel 获取一个event e = c.take(); //如果成功获取,e就指向event对象,如果没有成功获取,此时e为null,说明channel里面没有event了! if (e==null){ status=Status.BACKOFF; }else{ //取到数据,将数据写到控制台 logger.info("Header:"+e.getHeaders()+prefix+new String(e.getBody())+suffix); } //提交事务 transaction.commit(); }catch (Exception ex){ //回滚事务 transaction.rollback(); status=Status.BACKOFF; ex.printStackTrace(); }finally { //关闭事务 transaction.close(); } return status; } //从agent的配置文件中获取配置 @Override public void configure(Context context) { prefix=context.getString("prefix","===>atguigu:"); suffix=context.getString("suffix",":go!"); } }
MyInterceptor implements Interceptor
在拦截events组 的intercept()方法里调用拦截单个event的intercept()方法,这样可以省去很多代码,,,关键一点在于要创建一个内部类Builder,通过他来返回一个拦截器的实例
这里记录下我遇到的问题,如果我只定义了拦截单个event的intercept()方法,那么通过拦截器进入Channel的event就全是空值
public class MyInterceptor implements Interceptor { // 初始化,会在拦截器创建完成后,调用一次 @Override public void initialize() { } //拦截单个event,真正实现拦截的逻辑 @Override public Event intercept(Event event) { //为event的header中添加 timestamp=时间戳 Map<String, String> headers = event.getHeaders(); headers.put("timestamp",System.currentTimeMillis()+""); return event; } //拦截一组event @Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } //拦截器在关闭时调用 @Override public void close() { } // 通过实现Builder来返回拦截器的一个实例 public static class Builder implements Interceptor.Builder { //返回拦截器的实例 @Override public Interceptor build() { return new MyInterceptor(); } //从agent的配置文件中读取参数 @Override public void configure(Context context) { } } }
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 自定义source,type必须是类的全类名 a1.sources.r1.type = com.atguigu.flume.custom.MySource a1.sources.r1.name = atguigu: #为source添加拦截器 a1.sources.r1.interceptors = i1 #type必须写Bulider的全类名(因为是内部类,所以需用$符) a1.sources.r1.interceptors.i1.type = com.atguigu.flume.custom.MyInterceptor$Builder # 配置sink a1.sinks.k1.type = com.atguigu.flume.custom.MySink a1.sinks.k1.prefix = ***atguigu: a1.sinks.k1.suffix = :go! # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 # 绑定和连接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
原文:https://www.cnblogs.com/yangxusun9/p/12489852.html