1: Register a file
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
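Besides HDFS paths, registerCachedFile also accepts local files, and there is an overload with an executable flag that marks the file as executable on the worker nodes. A minimal registration sketch (the local path and the "localExecFile" cache name are illustrative, not from the original post):

val env = ExecutionEnvironment.getExecutionEnvironment
// register an HDFS file under the cache name "hdfsFile"
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local file and mark it as executable on the TaskManagers (illustrative path/name)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)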
2: Access the data (from the runtime context of a rich function)
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object BatchDemoDisCacheScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // 1: register the file under the cache name "b.txt"
    env.registerCachedFile("d:\\data\\file\\a.txt", "b.txt")

    val data = env.fromElements("a", "b", "c", "d")

    val result = data.map(new RichMapFunction[String, String] {
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        // 2: fetch the cached file from the distributed cache and print its lines
        val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")
        val lines = FileUtils.readLines(myFile)
        val it = lines.iterator()
        while (it.hasNext) {
          val line = it.next()
          println("line:" + line)
        }
      }

      override def map(value: String): String = value
    })

    result.print()
  }
}
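As a small variation (a sketch, not part of the original post), the cached file can also be read with scala.io.Source instead of commons-io, which drops the extra dependency. The CachedFileMapper class name is hypothetical; the "b.txt" cache name is the one registered above.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch: same access pattern as above, but reading the cached file
// with scala.io.Source instead of commons-io FileUtils.
class CachedFileMapper extends RichMapFunction[String, String] {
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")
    val source = scala.io.Source.fromFile(myFile)
    try source.getLines().foreach(line => println("line:" + line))
    finally source.close()
  }

  override def map(value: String): String = value
}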
References:
https://blog.csdn.net/shenshouniu/article/details/84499655
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/
Original post: https://www.cnblogs.com/linkmust/p/10902050.html