首页 > 其他 > 详细

Flink扫描重复数据的分布式文件缓存

时间:2021-07-28 18:15:37      阅读:28      评论:0      收藏:0      [点我收藏+]

Flink提供了一个分布式缓存,类似于Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。
缓存的工作原理如下。程序在其作为缓存文件的特定名称下注册本地或远程文件系统(如HDFS或S3)的文件或目录ExecutionEnvironment。执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。用户函数可以查找指定名称下的文件或目录,并从worker的本地文件系统访问它。
其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件.
分布式缓存使用如下:
注册中的文件或目录ExecutionEnvironment。

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

访问用户函数中的缓存文件或目录(此处为MapFunction)。该函数必须扩展RichFunction类,因为它需要访问RuntimeContext。


// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

下面给出一个完整的demo,计算存在于缓存文件中的单词出现的次数,看下面的代码


package flink.cache

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source
import org.apache.flink.api.scala._

object FlinkCacheDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    // 注册缓存的文件,里面有数据hello jason
    env.registerCachedFile("D:/test.txt", "testfile")
    val stream = env.fromElements("hello", "jason", "hello", "jim")
    val result = stream
      .flatMap(_.split(","))
      .map(new RichMapFunction[String, String] {
        var list: List[(String)] = _
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          // 获取缓存的数据
          val file = getRuntimeContext.getDistributedCache.getFile("testfile")
          val lines = Source.fromFile(file.getAbsoluteFile).getLines()
          list = lines.toList
        }
        override def map(value: String): String = {
          var middle: String = ""
          if(list(0).contains(value)) {
            middle = value
          }
          middle
        }
      })
      .map((_,1L))
      .filter(_._1.nonEmpty)
      .groupBy(0)
      .sum(1)
      .print()
  }
}

运行代码输出的结果是,因为jim不在缓存的文件中,被过滤掉了
(hello,2)
(jason,1)

 

Flink扫描重复数据的分布式文件缓存

原文:https://www.cnblogs.com/luobing2211/p/15070387.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!