pipe(command, [envVars])
对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD
1
|
|
有同学问我,怎么用Spark来调用外部程序,我想到了pipe可以做这个事情。文章封面图就是PySpark的实现方案,其中就用到了pipe这个机制。
同学的需求和问题如下:
我灵机一动 想到了Spark Pipe 应该可以完成。查了一下资料,归纳和整理如下:
总的来说就是Spark有一个pipe的编程接口,用的是Unix的标准输入和输出,类似于 Unix的 |
管道,例如: ls | grep ^d
这一个步骤主要是罗列输入的任务,即,包含哪些文件。
// 此处文件的List可以从另一个HDFS上的文件读取过来
val data = List("hdfs://xxx/xxx/xxx/1.txt","hdfs://xxx/xxx/xxx/2.txt",...)
val dataRDD = sc.makeRDD(data) //sc 是你的 SparkContext
我们已经有了RDD了,那么接下来写一个启动launch.sh
脚本来启动我们的分析程序
#!/bin/sh
echo "Running launch.sh shell script..."
while read LINE; do
echo "启动分析任务, 待分析文件路径为: ${LINE}"
bash hdfs://xxx/xxx/xx/analysis_program.sh ${LINE}
done
下面的步骤就是整合步骤了
val scriptPath = "hdfs://xxx/xxx/launch.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()
总结一下,
dataRDD
里面包含了我们要分析的文件列表,这个列表会被分发到spark集群,launch.sh
脚本,接受文件列表作为输入参数,launch.sh
脚本的循环体用这些文件列表启动具体的分析任务这样之后的好处是:
analysis_program.sh
不需要任何修改,做到了重用,这是最大的好处
附:如何用ansible 搭建一个standalone的spark集群 https://github.com/lresende/ansible-spark-cluster#deploying-spark-standalone
原文:https://www.cnblogs.com/bonelee/p/13089297.html