首页 > Windows开发 > 详细

Flink入门 - API

时间:2019-11-19 10:53:00      阅读:101      评论:0      收藏:0      [点我收藏+]
final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

/*
 * Filter: forward only the elements for which the predicate returns true.
 * Here the sequence generated from -5 to 5 is reduced to its non-negative values.
 */
DataStream<Long> numbers = streamExecutionEnvironment.generateSequence(-5, 5);

FilterFunction<Long> isNonNegative = new FilterFunction<Long>() {

    @Override
    public boolean filter(Long value) {
        // Same condition as "value >= 0", written as the negated complement.
        return !(value < 0);
    }
};

numbers.filter(isNonNegative).print();

// Run the pipeline defined above.
streamExecutionEnvironment.execute();

/*
 * Connect: join two streams of different element types into one ConnectedStreams
 * and handle them with a single co-function that has one callback per input.
 */

DataStream<Long> someStream = streamExecutionEnvironment.generateSequence(0, 10);

DataStream<String> otherStream = streamExecutionEnvironment.fromElements(WordCountData.WORDS);

ConnectedStreams<Long, String> connectedStreams = someStream.connect(otherStream);

DataStream<String> result = connectedStreams.flatMap(new CoFlatMapFunction<Long, String, String>() {

    /** Called for elements of the first (Long) input; emits the number as text. */
    @Override
    public void flatMap1(Long value, Collector<String> out) throws Exception {
        out.collect(value.toString());
    }

    /**
     * Called for elements of the second (String) input; emits one record per word.
     *
     * <p>{@code String.split} produces a leading empty string when the input starts
     * with a delimiter, so empty tokens are dropped here, matching the plain FlatMap
     * word-splitting example.
     */
    @Override
    public void flatMap2(String value, Collector<String> out) throws Exception {
        Arrays.stream(value.split("\\W+"))
                .filter(token -> !token.isEmpty())
                .forEachOrdered(out::collect);
    }
});

result.print();

streamExecutionEnvironment.execute();

/*
 * KeyBy: partition the stream by tuple field "f0", then print both the keyed
 * records and the per-key result of maxBy on field "f3".
 */

DataStream<Tuple4<String, String, String, Integer>> transcript =
        streamExecutionEnvironment.fromElements(TRANSCRIPT);

KeyedStream<Tuple4<String, String, String, Integer>, Tuple> byFirstField = transcript.keyBy("f0");

// Every record, as routed by key.
byFirstField.print();

// For each key, the record holding the largest "f3" seen so far.
DataStream<Tuple4<String, String, String, Integer>> maxByLastField = byFirstField.maxBy("f3");
maxByLastField.print();

streamExecutionEnvironment.execute();

/**
 * Sample records used as the source for the keyed examples.
 * Each tuple appears to be (class, student name, subject, score) - confirm against the consumer.
 *
 * <p>The element type is declared explicitly so that {@code fromElements(TRANSCRIPT)}
 * is type-checked. Java cannot create a generic array directly, so the raw array
 * creation is kept and its warnings are suppressed for this field only.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static final Tuple4<String, String, String, Integer>[] TRANSCRIPT = new Tuple4[] {
    Tuple4.of("class1", "张三", "语文", 100),
    Tuple4.of("class1", "李四", "语文", 78),
    Tuple4.of("class1", "王五", "语文", 99),
    Tuple4.of("class2", "赵六", "语文", 81),
    Tuple4.of("class2", "钱七", "语文", 59),
    Tuple4.of("class2", "马二", "语文", 97)
};

/*
 * Map: produce exactly one output element per input element.
 * Each value of the generated sequence is incremented by one.
 */
DataStream<Long> numbers = streamExecutionEnvironment.generateSequence(0, 10);

MapFunction<Long, Long> increment = new MapFunction<Long, Long>() {

    @Override
    public Long map(Long value) {
        return value + 1L;
    }
};

DataStream<Long> plusOne = numbers.map(increment);
plusOne.print();

streamExecutionEnvironment.execute();

/*
 * Fold: combine the records of each key into a running String, starting from
 * the initial value "Start" and appending field f1 of every record.
 */
DataStream<Tuple4<String, String, String, Integer>> transcript =
        streamExecutionEnvironment.fromElements(TRANSCRIPT);

FoldFunction<Tuple4<String, String, String, Integer>, String> appendSecondField =
        new FoldFunction<Tuple4<String, String, String, Integer>, String>() {

            @Override
            public String fold(String accumulator, Tuple4<String, String, String, Integer> record) {
                // The accumulator grows by " = <f1> " for every record of the key.
                return accumulator + " = " + record.f1 + " ";
            }
        };

DataStream<String> folded = transcript.keyBy(0).fold("Start", appendSecondField);
folded.print();

streamExecutionEnvironment.execute();

/**
 * Sample records used as the source for the fold example.
 * Each tuple appears to be (class, student name, subject, score) - confirm against the consumer.
 *
 * <p>The element type is declared explicitly so that {@code fromElements(TRANSCRIPT)}
 * is type-checked. Java cannot create a generic array directly, so the raw array
 * creation is kept and its warnings are suppressed for this field only.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static final Tuple4<String, String, String, Integer>[] TRANSCRIPT = new Tuple4[] {
    Tuple4.of("class1", "张三", "语文", 100),
    Tuple4.of("class1", "李四", "语文", 78),
    Tuple4.of("class1", "王五", "语文", 99),
    Tuple4.of("class2", "赵六", "语文", 81),
    Tuple4.of("class2", "钱七", "语文", 59),
    Tuple4.of("class2", "马二", "语文", 97)
};

/** 
1> Start = 赵六 
1> Start = 赵六 = 钱七 
1> Start = 赵六 = 钱七 = 马二 

2> Start = 张三 
2> Start = 张三 = 李四 
2> Start = 张三 = 李四 = 王五 
*/ 

/*
 * Reduce: rolling aggregation per key. The stream is keyed by field 0 and the
 * values of field f3 are summed; fields f0..f2 are taken from the first argument.
 */
DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT);

KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy(0);

keyedStream.reduce(new ReduceFunction<Tuple4<String, String, String, Integer>>() {

    /**
     * Combines two records of the same key.
     *
     * @param value1 first record to combine; supplies f0, f1 and f2 of the result
     * @param value2 second record to combine; only its f3 is used
     * @return a new tuple whose f3 is the sum of both inputs' f3
     */
    @Override
    public Tuple4<String, String, String, Integer> reduce(Tuple4<String, String, String, Integer> value1,
            Tuple4<String, String, String, Integer> value2) throws Exception {
        // Build a fresh tuple instead of writing into value1: the inputs are owned
        // by the caller and must not be modified as a side effect of the reduction.
        return Tuple4.of(value1.f0, value1.f1, value1.f2, value1.f3 + value2.f3);
    }
}).print();

streamExecutionEnvironment.execute();

/** 
2> (class1,张三,语文,100) 
2> (class1,张三,语文,178) 
2> (class1,张三,语文,277) 
1> (class2,赵六,语文,81) 
1> (class2,赵六,语文,140) 
1> (class2,赵六,语文,237) 
*/ 

/*
 * Project: keep only the selected tuple fields, in the given order.
 * Fields 1 and 3 of each Tuple4 become a Tuple2<String, Integer>.
 */
DataStream<Tuple4<String, String, String, Integer>> transcript =
        streamExecutionEnvironment.fromElements(TRANSCRIPT);

DataStream<Tuple2<String, Integer>> secondAndFourthField = transcript.project(1, 3);
secondAndFourthField.print();

streamExecutionEnvironment.execute();

/** 
4> (张三,100) 
4> (钱七,59) 
2> (王五,99) 
3> (赵六,81) 
1> (李四,78) 
1> (马二,97) 
*/ 

/*
 * SplitAndSelect: tag every element with an output name, then pick sub-streams
 * by name. Even values are tagged EVEN, all others ODD.
 */
DataStream<Long> numbers = streamExecutionEnvironment.generateSequence(0, 10);

OutputSelector<Long> parityTagger = new OutputSelector<Long>() {

    @Override
    public Iterable<String> select(Long value) {
        // Exactly one tag per element.
        List<String> tags = new ArrayList<>();
        tags.add(value % 2 == 0 ? EVEN : ODD);
        return tags;
    }
};

SplitStream<Long> splitStream = numbers.split(parityTagger);

DataStream<Long> even = splitStream.select(EVEN);

DataStream<Long> odd = splitStream.select(ODD);

// select accepts several names at once.
DataStream<Long> all = splitStream.select(EVEN, ODD);

// Only the ODD sub-stream is printed in this example.
odd.print();

streamExecutionEnvironment.execute();

/*
 * FlatMap: produce zero or more output elements per input element.
 * Each line is lower-cased and split on runs of non-word characters;
 * every non-empty token is emitted as its own record.
 */
DataStream<String> lines = streamExecutionEnvironment.fromElements(WordCountData.WORDS);

FlatMapFunction<String, String> tokenizer = new FlatMapFunction<String, String>() {

    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String token : value.toLowerCase().split("\\W+")) {
            // Skip the empty token that split yields for a leading delimiter.
            if (token.length() > 0) {
                out.collect(token);
            }
        }
    }
};

DataStream<String> wordStream = lines.flatMap(tokenizer);
wordStream.print();

streamExecutionEnvironment.execute();

Flink入门 - API

原文:https://www.cnblogs.com/fangpengchengbupter/p/11887688.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!