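This article follows the code to trace the overall flow from a batch of Pig Latin input to the physical execution plan that comes out (which depends on the launcher engine: usually an MR execution plan, but it can also be Spark RDD operators).
It does not go into how the AST is parsed, how ANTLR is used, how the logical plan is mapped, how the logical plan is optimized, or how the MR plan is split into MR jobs. Instead it covers the key transitions (methods and classes) between a batch of Pig DSL input and the actual plan that is ready to execute.
The entry point is the main function of the Main class: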
/**
 * The Main-Class for the Pig Jar that will provide a shell and setup a classpath appropriate
 * for executing Jar files. Warning, this method calls System.exit().
 *
 * @param args
 *            -jar can be used to add additional jar files (colon separated). - will start a
 *            shell. -e will execute the rest of the command line as if it was input to the
 *            shell.
 * @throws IOException
 */
public static void main(String args[]) {
    // add win HADOOP_HOME
    // make sure you have "winutils.exe" under /bin
    // if not, download one from https://github.com/srccodes/hadoop-common-2.2.0-bin/tree/master/bin
    // entrance for local debug
    // better try: -x spark -e cmds
    System.exit(run(args, null));
}
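Main -> GruntParser: this is the first step.
Main first initializes a number of parameters (launch mode, input type detection, class initialization, and so on), then uses GruntParser to parse the input Pig Latin script.
After grunt.exec(), the next key step is the parsing done inside GruntParser.
parse() in GruntParser relies on the PigScriptParser.jj grammar file (a JavaCC grammar), so the parser code itself cannot be stepped into; what it ultimately produces is a syntax tree.
When a Dump operation appears, control enters GruntParser's processDump method.
Once this step ends, parsing at the syntax level is complete.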
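The shape of this step can be illustrated with a small toy model: ordinary statements are only recorded, and a dump statement is what triggers further processing. Everything here is hypothetical (`ToyGrunt`, its methods, and the naive split on `;`); it is a sketch of the control flow described above, not Pig's actual GruntParser, which is generated from a grammar file.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyGrunt is a hypothetical stand-in, not Pig's GruntParser.
class ToyGrunt {
    private final List<String> registered = new ArrayList<>();
    private final List<String> executedAliases = new ArrayList<>();

    // Split the script on ';' (naive, ignores quoting) and dispatch each statement.
    void parse(String script) {
        for (String raw : script.split(";")) {
            String stmt = raw.trim();
            if (stmt.isEmpty()) {
                continue;
            }
            if (stmt.toLowerCase().startsWith("dump ")) {
                // A dump statement is the trigger for the later planning stages.
                processDump(stmt.substring(5).trim());
            } else {
                // Any other statement is just remembered for later.
                registered.add(stmt);
            }
        }
    }

    void processDump(String alias) {
        executedAliases.add(alias);
    }

    List<String> registered() {
        return registered;
    }

    List<String> executedAliases() {
        return executedAliases;
    }

    public static void main(String[] args) {
        ToyGrunt grunt = new ToyGrunt();
        grunt.parse("raw = LOAD 'in.log'; clean = FILTER raw BY $0 is not null; dump clean;");
        System.out.println("registered: " + grunt.registered());
        System.out.println("executed:   " + grunt.executedAliases());
    }
}
```

The point of the sketch is only the dispatch: nothing is planned or executed until a statement such as dump asks for output.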
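PigServer -> QueryParserDriver: PigServer's parseQuery method calls QueryParserDriver's parse(query) method, which returns the logical execution plan.
QueryParserDriver -> LogicalPlanGenerator: the actual generation of the logical plan is done by LogicalPlanGenerator.
At this point the logical execution plan has been generated.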
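PigServer -> HExecutionEngine: next come optimizing the logical plan and generating the physical plan.
HExecutionEngine.compile(LogicalPlan, Properties) first optimizes the logical plan.
When HExecutionEngine is initialized, it assembles a different combination of optimization strategies depending on the situation (disabling certain rules).
The PlanOptimizer optimization process itself is covered in detail in my earlier post on logical plan optimization.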
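The idea of "combining strategies by disabling some rules" can be sketched as a simple filter over a rule list. This is a minimal illustration under assumptions: `ToyOptimizer` and `activeRules` are made up for this example, and the rule names are used purely as labels, not as a claim about which rules Pig enables in any given configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: ToyOptimizer is hypothetical and does not mirror Pig's optimizer classes.
class ToyOptimizer {
    // Keep every rule that is not in the disabled set, preserving the original order.
    static List<String> activeRules(List<String> allRules, Set<String> disabled) {
        List<String> active = new ArrayList<>();
        for (String rule : allRules) {
            if (!disabled.contains(rule)) {
                active.add(rule);
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // Rule names below are illustrative labels only.
        List<String> all = Arrays.asList("MergeFilter", "PushUpFilter", "ColumnMapKeyPrune");
        Set<String> disabled = new HashSet<>(Arrays.asList("PushUpFilter"));
        System.out.println(activeRules(all, disabled));
    }
}
```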
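It then generates the physical execution plan. This is done mainly by LogToPhyTranslationVisitor: as it walks the nodes of the logical plan, each Op's accept() call triggers the matching overloaded visit(Op) method on LogToPhyTranslationVisitor, which maps each logical plan step to its physical plan step.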
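The accept()/visit() mechanism described here is the classic visitor pattern with double dispatch. Below is a minimal, self-contained sketch of it. The `Toy*` types and the interfaces are hypothetical and far simpler than Pig's real operators; the output strings borrow physical operator names only as labels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical, not Pig's logical operator classes.
interface LogicalOp {
    void accept(LogicalVisitor visitor);
}

interface LogicalVisitor {
    void visit(ToyLoad op);
    void visit(ToyFilter op);
    void visit(ToySort op);
}

class ToyLoad implements LogicalOp {
    public void accept(LogicalVisitor visitor) { visitor.visit(this); }
}

class ToyFilter implements LogicalOp {
    public void accept(LogicalVisitor visitor) { visitor.visit(this); }
}

class ToySort implements LogicalOp {
    public void accept(LogicalVisitor visitor) { visitor.visit(this); }
}

// Each visit overload emits the physical counterpart of one logical operator.
class ToyTranslator implements LogicalVisitor {
    final List<String> physical = new ArrayList<>();

    public void visit(ToyLoad op) { physical.add("POLoad"); }
    public void visit(ToyFilter op) { physical.add("POFilter"); }
    public void visit(ToySort op) { physical.add("POSort"); }

    // Walk the (here: linear) logical plan and let each node pick its own overload.
    static List<String> translate(List<LogicalOp> plan) {
        ToyTranslator translator = new ToyTranslator();
        for (LogicalOp op : plan) {
            op.accept(translator);
        }
        return translator.physical;
    }

    public static void main(String[] args) {
        List<LogicalOp> plan = Arrays.asList(new ToyLoad(), new ToyFilter(), new ToySort());
        System.out.println(translate(plan));
    }
}
```

Because `accept` is resolved on the node's runtime type and `visit(this)` is resolved on its static type inside that node, the walker never needs an `instanceof` chain.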
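Next comes launchPlan, which starts a Launcher according to the configuration; in my run, it was SparkLauncher that got started.
SparkLauncher translates each step of the physical plan into RDD operations (mapping directly to operators). After execution the result is returned as SparkStats, which carries OutputInfo (including information such as the location of the result files).
For how Pig on Spark implements SparkLauncher and translates physical plan operators, you can read the code with the help of my GitHub and this blog post.
The other path is to start MapReduceLauncher, where MRCompiler translates the physical plan into an MR plan. The main translation logic is in the compile(PO) method.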
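The choice between the two launchers can be pictured as a small factory keyed on configuration. The sketch below is an assumption-laden toy: `ToyLauncher`, `ToyEngine`, and the property key `toy.exectype` are all invented for illustration, and the real launchers do far more than return a string.

```java
import java.util.Properties;

// Toy model only: these launchers are hypothetical stand-ins for the real ones.
interface ToyLauncher {
    String launch(String physicalPlan);
}

class ToySparkLauncher implements ToyLauncher {
    public String launch(String physicalPlan) {
        return "spark:" + physicalPlan;
    }
}

class ToyMRLauncher implements ToyLauncher {
    public String launch(String physicalPlan) {
        return "mapreduce:" + physicalPlan;
    }
}

class ToyEngine {
    // "toy.exectype" is a made-up key; it plays the role of the configured execution mode.
    static ToyLauncher chooseLauncher(Properties conf) {
        String mode = conf.getProperty("toy.exectype", "mapreduce");
        return "spark".equalsIgnoreCase(mode) ? new ToySparkLauncher() : new ToyMRLauncher();
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("toy.exectype", "spark");
        System.out.println(chooseLauncher(conf).launch("physical-plan"));
    }
}
```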
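Translating to an MR execution plan takes two main steps:
First, MRCompiler.compile() translates the physical plan into a MapReduce plan.
Second, JobControlCompiler.compile() takes the MROperPlan as input and returns a JobControl; this step manages the queue of MR jobs.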
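The two steps form a pipeline in which the output of the first compile is the input of the second. The toy below shows only that data flow. All names are hypothetical, and the splitting rule used here (close a job after every shuffle operator) is a deliberate simplification chosen for illustration, not the logic MRCompiler actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: a simplified stand-in for "physical plan -> MR plan".
class ToyMRCompiler {
    // Step 1: cut the operator list into jobs; a job is closed after each shuffle operator.
    static List<List<String>> compile(List<String> physicalOps, Set<String> shuffleOps) {
        List<List<String>> jobs = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : physicalOps) {
            current.add(op);
            if (shuffleOps.contains(op)) {
                jobs.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            jobs.add(current);
        }
        return jobs;
    }
}

// Toy model only: a simplified stand-in for "MR plan -> job queue".
class ToyJobControlCompiler {
    // Step 2: order the jobs so that each one depends on its predecessor.
    static List<String> compile(List<List<String>> mrPlan) {
        List<String> queue = new ArrayList<>();
        for (int i = 0; i < mrPlan.size(); i++) {
            String dependsOn = (i == 0) ? "none" : "job-" + (i - 1);
            queue.add("job-" + i + " ops=" + mrPlan.get(i) + " dependsOn=" + dependsOn);
        }
        return queue;
    }

    public static void main(String[] args) {
        List<String> ops = Arrays.asList("Load", "Filter", "Distinct", "Group", "ForEach");
        Set<String> shuffleOps = new HashSet<>(Arrays.asList("Distinct", "Group"));
        for (String job : compile(ToyMRCompiler.compile(ops, shuffleOps))) {
            System.out.println(job);
        }
    }
}
```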
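That is the overall flow from an input script to the output execution plan. It is a rough outline, but the key steps and stages are all there.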
To make this concrete, here is an example I ran myself, including the Pig Latin, the logical plan, the physical plan, and the MR plan.
Pig Latin
REGISTER D:/tutorial.jar;
raw = LOAD 'D:/github/flare-spork/tutorial/data/excite-small.log' USING PigStorage('\t') AS (user, time, query);
clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;
houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;
ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;
ngramed2 = DISTINCT ngramed1;
hour_frequency1 = GROUP ngramed2 BY (ngram, hour);
hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;
uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;
uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));
uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;
filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;
ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;
dump ordered_uniq_frequency;
#----------------------------------------------- # New Logical Plan: #----------------------------------------------- ordered_uniq_frequency: (Name: LOSort Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double) | | | hour:(Name: Project Type: chararray Uid: 22 Input: 0 Column: hour) | | | score:(Name: Project Type: double Uid: 23 Input: 0 Column: score) | |---filtered_uniq_frequency: (Name: LOFilter Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double) | | | (Name: GreaterThan Type: boolean Uid: 29) | | | |---score:(Name: Project Type: double Uid: 23 Input: 0 Column: score) | | | |---(Name: Constant Type: double Uid: 28) | |---uniq_frequency3: (Name: LOForEach Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double) | | | (Name: LOGenerate[false,false,false,false,false] Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double) | | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour:(Name: Project Type: chararray Uid: 22 Input: 0 Column: (*)) | | | | | group:(Name: Project Type: chararray Uid: 9 Input: 1 Column: (*)) | | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score:(Name: Project Type: double Uid: 23 Input: 2 Column: (*)) | | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count:(Name: Project Type: long Uid: 24 Input: 3 Column: (*)) | | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean:(Name: Project Type: double Uid: 25 Input: 4 Column: (*)) | | | |---(Name: LOInnerLoad[1] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray) | | | |---(Name: LOInnerLoad[0] Schema: group#9:chararray) | | | |---(Name: LOInnerLoad[2] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double) | | | |---(Name: LOInnerLoad[3] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long) | | | 
|---(Name: LOInnerLoad[4] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double) | |---uniq_frequency2: (Name: LOForEach Schema: group#9:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double) | | | (Name: LOGenerate[true,true] Schema: group#9:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double) | | | | | group:(Name: Project Type: chararray Uid: 9 Input: 0 Column: (*)) | | | | | (Name: UserFunc(org.apache.pig.tutorial.ScoreGenerator) Type: bag Uid: 20) | | | | | |---hour_frequency2:(Name: Project Type: bag Uid: 16 Input: 1 Column: (*)) | | | |---(Name: LOInnerLoad[0] Schema: group#9:chararray) | | | |---hour_frequency2: (Name: LOInnerLoad[1] Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long) | |---uniq_frequency1: (Name: LOCogroup Schema: group#9:chararray,hour_frequency2#16:bag{#31:tuple(group::ngram#9:chararray,group::hour#6:chararray,count#14:long)}) | | | group::ngram:(Name: Project Type: chararray Uid: 9 Input: 0 Column: group::ngram) | |---hour_frequency2: (Name: LOForEach Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long) | | | (Name: LOGenerate[true,false] Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long) | | | | | group:(Name: Project Type: tuple Uid: 10 Input: 0 Column: (*)) | | | | | (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 14) | | | | | |---ngramed2:(Name: Project Type: bag Uid: 11 Input: 1 Column: (*)) | | | |---(Name: LOInnerLoad[0] 
Schema: group#10:tuple(ngram#9:chararray,hour#6:chararray)) | | | |---ngramed2: (Name: LOInnerLoad[1] Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray) | |---hour_frequency1: (Name: LOCogroup Schema: group#10:tuple(ngram#9:chararray,hour#6:chararray),ngramed2#11:bag{#30:tuple(user#1:bytearray,hour#6:chararray,ngram#9:chararray)}) | | | ngram:(Name: Project Type: chararray Uid: 9 Input: 0 Column: ngram) | | | hour:(Name: Project Type: chararray Uid: 6 Input: 0 Column: hour) | |---ngramed2: (Name: LODistinct Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray) | |---ngramed1: (Name: LOForEach Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray) | | | (Name: LOGenerate[false,false,true] Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray) | | | | | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*)) | | | | | hour:(Name: Project Type: chararray Uid: 6 Input: 1 Column: (*)) | | | | | (Name: UserFunc(org.apache.pig.tutorial.NGramGenerator) Type: bag Uid: 7) | | | | | |---query:(Name: Project Type: chararray Uid: 5 Input: 2 Column: (*)) | | | |---(Name: LOInnerLoad[user] Schema: user#1:bytearray) | | | |---(Name: LOInnerLoad[hour] Schema: hour#6:chararray) | | | |---(Name: LOInnerLoad[query] Schema: query#5:chararray) | |---houred: (Name: LOForEach Schema: user#1:bytearray,hour#6:chararray,query#5:chararray) | | | (Name: LOGenerate[false,false,false] Schema: user#1:bytearray,hour#6:chararray,query#5:chararray) | | | | | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*)) | | | | | (Name: UserFunc(org.apache.pig.tutorial.ExtractHour) Type: chararray Uid: 6) | | | | | |---time:(Name: Project Type: bytearray Uid: 2 Input: 1 Column: (*)) | | | | | query:(Name: Project Type: chararray Uid: 5 Input: 2 Column: (*)) | | | |---(Name: LOInnerLoad[user] Schema: user#1:bytearray) | | | |---(Name: LOInnerLoad[time] Schema: time#2:bytearray) | | | |---(Name: LOInnerLoad[query] Schema: query#5:chararray) | |---clean2: (Name: 
LOForEach Schema: user#1:bytearray,time#2:bytearray,query#5:chararray) | | | (Name: LOGenerate[false,false,false] Schema: user#1:bytearray,time#2:bytearray,query#5:chararray) | | | | | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*)) | | | | | time:(Name: Project Type: bytearray Uid: 2 Input: 1 Column: (*)) | | | | | (Name: UserFunc(org.apache.pig.tutorial.ToLower) Type: chararray Uid: 5) | | | | | |---query:(Name: Project Type: bytearray Uid: 3 Input: 2 Column: (*)) | | | |---(Name: LOInnerLoad[user] Schema: user#1:bytearray) | | | |---(Name: LOInnerLoad[time] Schema: time#2:bytearray) | | | |---(Name: LOInnerLoad[query] Schema: query#3:bytearray) | |---clean1: (Name: LOFilter Schema: user#1:bytearray,time#2:bytearray,query#3:bytearray) | | ...
Physical execution plan
| |---ordered_uniq_frequency: POSort[bag]() - scope-70 | | | Project[chararray][0] - scope-68 | | | Project[double][2] - scope-69 | |---uniq_frequency3: New For Each(false,false,false,false,false)[bag] - scope-67 | | | Project[chararray][1] - scope-57 | | | Project[chararray][0] - scope-59 | | | Project[double][2] - scope-61 | | | Project[long][3] - scope-63 | | | Project[double][4] - scope-65 | |---filtered_uniq_frequency: Filter[bag] - scope-53 | | | Greater Than[boolean] - scope-56 | | | |---Project[double][2] - scope-54 | | | |---Constant(2.0) - scope-55 | |---uniq_frequency2: New For Each(true,true)[bag] - scope-52 | | | Project[chararray][0] - scope-47 | | | POUserFunc(org.apache.pig.tutorial.ScoreGenerator)[bag] - scope-50 | | | |---Project[bag][1] - scope-49 | |---uniq_frequency1: Package[tuple]{chararray} - scope-44 | |---uniq_frequency1: Global Rearrange[tuple] - scope-43 | |---uniq_frequency1: Local Rearrange[tuple]{chararray}(false) - scope-45 | | | Project[chararray][0] - scope-46 | |---hour_frequency2: New For Each(true,false)[bag] - scope-42 | | | Project[tuple][0] - scope-37 | | | POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-40 | | | |---Project[bag][1] - scope-39 | |---hour_frequency1: Package[tuple]{tuple} - scope-33 | |---hour_frequency1: Global Rearrange[tuple] - scope-32 | |---hour_frequency1: Local Rearrange[tuple]{tuple}(false) - scope-34 | | | Project[chararray][2] - scope-35 | | | Project[chararray][1] - scope-36 | |---ngramed2: PODistinct[bag] - scope-31 | |---ngramed1: New For Each(false,false,true)[bag] - scope-30 | | | Project[bytearray][0] - scope-23 | | | Project[chararray][1] - scope-25 | | | POUserFunc(org.apache.pig.tutorial.NGramGenerator)[bag] - scope-28 | | | |---Project[chararray][2] - scope-27 | |---houred: New For Each(false,false,false)[bag] - scope-22 | | | Project[bytearray][0] - scope-14 | | | POUserFunc(org.apache.pig.tutorial.ExtractHour)[chararray] - scope-18 | | | |---Cast[chararray] - scope-17 | | | 
|---Project[bytearray][1] - scope-16 | | | Project[chararray][2] - scope-20 | |---clean2: New For Each(false,false,false)[bag] - scope-13 | | | Project[bytearray][0] - scope-5 | | | Project[bytearray][1] - scope-7 | | | POUserFunc(org.apache.pig.tutorial.ToLower)[chararray] - scope-11 | | | |---Cast[chararray] - scope-10 | | | |---Project[bytearray][2] - scope-9 | |---clean1: Filter[bag] - scope-1 | | | POUserFunc(org.apache.pig.tutorial.NonURLDetector)[boolean] - scope-4 | | | |---Cast[chararray] - scope-3 | | | |---Project[bytearray][2] - scope-2 | |---raw: Load(D:/github/flare-spork/tutorial/data/excite-small.log:PigStorage(‘ ‘)) - scope-0
MR execution plan
# Map Reduce Plan #-------------------------------------------------- MapReduce node scope-72 Map Plan Local Rearrange[tuple]{tuple}(true) - scope-74 | | | Project[tuple][*] - scope-73 | |---ngramed1: New For Each(false,false,true)[bag] - scope-30 | | | Project[bytearray][0] - scope-23 | | | Project[chararray][1] - scope-25 | | | POUserFunc(org.apache.pig.tutorial.NGramGenerator)[bag] - scope-28 | | | |---Project[chararray][2] - scope-27 | |---houred: New For Each(false,false,false)[bag] - scope-22 | | | Project[bytearray][0] - scope-14 | | | POUserFunc(org.apache.pig.tutorial.ExtractHour)[chararray] - scope-18 | | | |---Cast[chararray] - scope-17 | | | |---Project[bytearray][1] - scope-16 | | | Project[chararray][2] - scope-20 | |---clean2: New For Each(false,false,false)[bag] - scope-13 | | | Project[bytearray][0] - scope-5 | | | Project[bytearray][1] - scope-7 | | | POUserFunc(org.apache.pig.tutorial.ToLower)[chararray] - scope-11 | | | |---Cast[chararray] - scope-10 | | | |---Project[bytearray][2] - scope-9 | |---clean1: Filter[bag] - scope-1 | | | POUserFunc(org.apache.pig.tutorial.NonURLDetector)[boolean] - scope-4 | | | |---Cast[chararray] - scope-3 | | | |---Project[bytearray][2] - scope-2 | |---raw: Load(D:/github/flare-spork/tutorial/data/excite-small.log:PigStorage(‘ ‘)) - scope-0-------- Reduce Plan Store(file:/tmp/temp1620254926/tmp-1456742965:org.apache.pig.impl.io.InterStorage) - scope-78 | |---New For Each(true)[bag] - scope-77 | | | Project[tuple][0] - scope-76 | |---Package[tuple]{tuple} - scope-75-------- Global sort: false ---------------- MapReduce node scope-80 Map Plan hour_frequency1: Local Rearrange[tuple]{tuple}(false) - scope-34 | | | Project[chararray][2] - scope-35 | | | Project[chararray][1] - scope-36 | |---Load(file:/tmp/temp1620254926/tmp-1456742965:org.apache.pig.impl.io.InterStorage) - scope-79-------- Reduce Plan Store(file:/tmp/temp1620254926/tmp2077335416:org.apache.pig.impl.io.InterStorage) - scope-81 | |---hour_frequency2: 
New For Each(true,false)[bag] - scope-42 | | | Project[tuple][0] - scope-37 | | | POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-40 | | | |---Project[bag][1] - scope-39 | |---hour_frequency1: Package[tuple]{tuple} - scope-33-------- Global sort: false ---------------- MapReduce node scope-83 Map Plan uniq_frequency1: Local Rearrange[tuple]{chararray}(false) - scope-45 | | | Project[chararray][0] - scope-46 | |---Load(file:/tmp/temp1620254926/tmp2077335416:org.apache.pig.impl.io.InterStorage) - scope-82-------- Reduce Plan Store(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.io.InterStorage) - scope-84 | |---uniq_frequency3: New For Each(false,false,false,false,false)[bag] - scope-67 | | | Project[chararray][1] - scope-57 | | | Project[chararray][0] - scope-59 | | | Project[double][2] - scope-61 | | | Project[long][3] - scope-63 | | | Project[double][4] - scope-65 | |---filtered_uniq_frequency: Filter[bag] - scope-53 | | | Greater Than[boolean] - scope-56 | | | |---Project[double][2] - scope-54 | | | |---Constant(2.0) - scope-55 | |---uniq_frequency2: New For Each(true,true)[bag] - scope-52 | | | Project[chararray][0] - scope-47 | | | POUserFunc(org.apache.pig.tutorial.ScoreGenerator)[bag] - scope-50 | | | |---Project[bag][1] - scope-49 | |---uniq_frequency1: Package[tuple]{chararray} - scope-44-------- Global sort: false ---------------- MapReduce node scope-86 Map Plan ordered_uniq_frequency: Local Rearrange[tuple]{chararray}(false) - scope-91 | | | Constant(all) - scope-90 | |---New For Each(false,false)[tuple] - scope-89 | | | Project[chararray][0] - scope-87 | | | Project[double][2] - scope-88 | |---Load(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.builtin.RandomSampleLoader(‘org.apache.pig.impl.io.InterStorage‘,‘100‘)) - scope-85-------- Reduce Plan Store(file:/tmp/temp1620254926/tmp-586682361:org.apache.pig.impl.io.InterStorage) - scope-101 | |---New For Each(false)[tuple] - scope-100 | | | 
POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-99 | | | |---Project[tuple][*] - scope-98 | |---New For Each(false,false)[tuple] - scope-97 | | | Constant(-1) - scope-96 | | | ordered_uniq_frequency: POSort[bag]() - scope-70 | | | | | Project[chararray][0] - scope-94 | | | | | Project[double][1] - scope-95 | | | |---Project[bag][1] - scope-93 | |---Package[tuple]{chararray} - scope-92-------- Global sort: false ---------------- MapReduce node scope-103 Map Plan ordered_uniq_frequency: Local Rearrange[tuple]{tuple}(false) - scope-104 | | | Project[chararray][0] - scope-68 | | | Project[double][2] - scope-69 | |---Load(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.io.InterStorage) - scope-102-------- Reduce Plan ordered_uniq_frequency: Store(file:/tmp/temp1620254926/tmp-225116343:org.apache.pig.impl.io.InterStorage) - scope-71 | |---New For Each(true)[tuple] - scope-107 | | | Project[bag][1] - scope-106 | |---PackageLite[tuple]{tuple} - scope-105-------- Global sort: true Quantile file: file:/tmp/temp1620254926/tmp-586682361 ----------------
The end :)
Original post: http://blog.csdn.net/pelick/article/details/25242231