Flink中,测试时,会用到自定义的source。
下为一例。。 该例使用温度传感器的格式演示fake日志数据源。
代码用Scala写的。
传感器 - 样例类
SensorReads.scala
:
package sr
?
/**
*
*/
case class SensorReads(id:String,
timestap:Long,
tempture:Double)
传感器 - 数据源模拟
SnsorSrc_4096T.scala
:
package sr
?
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
?
/**
* period, is 4096 millis.
*/
case class SnsorSrc_4096T extends SourceFunction[SensorReads] {
var isInRunning: Boolean = true
////
override def run(sourceContext: SourceFunction.SourceContext[
SensorReads]): Unit = {
val rand: Random = new Random
var tptNow4 =
(1 to 4).map(
"snsor_" + _.toString -> (23 + 16 * rand.nextGaussian))
?
while (isInRunning) {
tptNow4 = tptNow4.map(
t => t._1 -> (t._2 + rand.nextGaussian))
val timeStampNow: Long = System.currentTimeMillis
tptNow4.foreach{
t =>
sourceContext.collect( // O.U.T
SensorReads(t._1, timeStampNow, t._2) )
Thread.sleep(512) }
//not set, is stm
Thread.sleep(2048) }
}
override def cancel(): Unit = isInRunning = false
}
SnsrSrcAappli.scala
:
package applis
?
import org.apache.flink.streaming.api.scala._
import sr._
?
object SnsrSrcAappli extends App{
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(SnsorSrc_4096T() )
.print("aaa")
env.execute()
}
数据源模拟用case-class,此处使用则可以不写new。
IDEA控制台上run:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
aaa:3> SensorReads(snsor_1,1573556705508,30.383394411578916)
aaa:4> SensorReads(snsor_2,1573556705508,21.397405872448672)
aaa:5> SensorReads(snsor_3,1573556705508,20.598086139457727)
aaa:6> SensorReads(snsor_4,1573556705508,18.30066983735531)
aaa:7> SensorReads(snsor_1,1573556709627,30.120955223032546)
aaa:8> SensorReads(snsor_2,1573556709627,22.38746867201145)
aaa:1> SensorReads(snsor_3,1573556709627,20.45357507067989)
aaa:2> SensorReads(snsor_4,1573556709627,17.18467261133715)
aaa:3> SensorReads(snsor_1,1573556713729,31.686487593592904)
aaa:4> SensorReads(snsor_2,1573556713729,20.67106361911623)
aaa:5> SensorReads(snsor_3,1573556713729,21.27724215221553)
aaa:6> SensorReads(snsor_4,1573556713729,16.84273306583804)
?
Process finished with exit code -1
如果SnsorSrc_4096T.scala
中,「当前温度」.foreach
这样写:
tptNow4.foreach{
t =>
sourceContext.collect( // O.U.T
SensorReads(t._1, System.currentTimeMillis, t._2) )
Thread.sleep(512) }
那么结果就会是:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
aaa:5> SensorReads(snsor_1,1573561932216,20.427373784204445)
aaa:6> SensorReads(snsor_2,1573561932739,19.043151948599565)
aaa:7> SensorReads(snsor_3,1573561933251,16.506314894849734)
aaa:8> SensorReads(snsor_4,1573561933764,42.18791135873409)
aaa:1> SensorReads(snsor_1,1573561936326,20.216273863226476)
aaa:2> SensorReads(snsor_2,1573561936838,19.77488458362011)
aaa:3> SensorReads(snsor_3,1573561937351,17.49661332626548)
aaa:4> SensorReads(snsor_4,1573561937864,42.37076203420432)
aaa:5> SensorReads(snsor_1,1573561940425,19.582646754534526)
aaa:6> SensorReads(snsor_2,1573561940938,18.148182987020572)
aaa:7> SensorReads(snsor_3,1573561941451,17.028248074961432)
aaa:8> SensorReads(snsor_4,1573561941963,42.969281620777075)
aaa:1> SensorReads(snsor_1,1573561944525,20.659855873131406)
aaa:2> SensorReads(snsor_2,1573561945038,19.437515708059177)
aaa:3> SensorReads(snsor_3,1573561945550,18.336847248220565)
aaa:4> SensorReads(snsor_4,1573561946063,43.58727112744526)
aaa:5> SensorReads(snsor_1,1573561948625,19.317498008380674)
aaa:6> SensorReads(snsor_2,1573561949137,21.86602577501872)
aaa:7> SensorReads(snsor_3,1573561949650,19.109322091177216)
aaa:8> SensorReads(snsor_4,1573561950163,43.48043890977487)
?
Process finished with exit code -1
这样一来事件时间就都不一样了。 可根据需要模拟的情况改动....
原文:https://www.cnblogs.com/senwren/p/fake-snsr-Rd-src.html