This is the last article in the Pig system analysis series. It looks at how to extend Pig: it covers not only the UDF extension mechanism that Pig itself provides, but also explores Pig's extensibility from an architectural point of view.
Side note: a few days ago a colleague came across Spork, the Twitter-driven Pig on Spark project; I plan to dig into it.
With UDFs (user-defined functions) you can plug in your own data-processing logic and extend Pig's functionality. In fact, apart from having to be registered or defined (register/define) before use, UDFs behave no differently from built-in functions.
Take the built-in ABS function as an example:
public class ABS extends EvalFunc<Double> {
    /**
     * java level API
     * @param input expects a single numeric value
     * @return output returns a single numeric value, absolute value of the argument
     */
    public Double exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        Double d;
        try {
            d = DataType.toDouble(input.get(0));
        } catch (NumberFormatException nfe) {
            System.err.println("Failed to process input; error - " + nfe.getMessage());
            return null;
        } catch (Exception e) {
            throw new IOException("Caught exception processing input row", e);
        }
        return Math.abs(d);
    }
    ……
    public Schema outputSchema(Schema input);
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException;
}
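The two methods elided above let a UDF declare its result schema and register type-specific overloads with the frontend. Below is a rough sketch of what minimal implementations might look like for a Double-only ABS; this is my own illustration, not Pig's actual source (which also maps int/long/float variants):

// Assumed imports: java.util.ArrayList, java.util.List,
// org.apache.pig.FuncSpec, org.apache.pig.data.DataType,
// org.apache.pig.impl.logicalLayer.schema.Schema,
// org.apache.pig.impl.logicalLayer.FrontendException

public Schema outputSchema(Schema input) {
    // declare that this UDF always emits a single double field
    return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}

public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    // tell the frontend which implementation handles which argument type;
    // here only one mapping: a single double argument -> this class
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
    funcList.add(new FuncSpec(this.getClass().getName(),
            new Schema(new Schema.FieldSchema(null, DataType.DOUBLE))));
    return funcList;
}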
An EvalFunc can also implement an aggregate function. This works because the group operation returns one record per group, each containing a Bag, so the exec method only needs to iterate over the records in that Bag. COUNT's exec method, for example:
public Long exec(Tuple input) throws IOException {
    try {
        DataBag bag = (DataBag) input.get(0);
        if (bag == null)
            return null;
        Iterator it = bag.iterator();
        long cnt = 0;
        while (it.hasNext()) {
            Tuple t = (Tuple) it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                cnt++;
        }
        return cnt;
    } catch (ExecException ee) {
        throw ee;
    } catch (Exception e) {
        int errCode = 2106;
        String msg = "Error while computing count in " + this.getClass().getSimpleName();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}
As mentioned earlier, aggregate functions with the algebraic property can be optimized by the Combiner during the Map-Reduce process. Intuitively, an algebraic function's computation can be split into three phases: initial (process a portion of the input data), intermediate (process the results of the initial phase), and final (process the results of the intermediate phase). For COUNT, the initial phase is a count, while the intermediate and final phases are sums. Going one step further, if a function performs the same operation in all three phases, it is distributive, e.g. SUM.
Pig provides the Algebraic interface:
public interface Algebraic {
    /**
     * Get the initial function.
     * @return A function name of f_init. f_init should be an eval func.
     * The return type of f_init.exec() has to be Tuple.
     */
    public String getInitial();

    /**
     * Get the intermediate function.
     * @return A function name of f_intermed. f_intermed should be an eval func.
     * The return type of f_intermed.exec() has to be Tuple.
     */
    public String getIntermed();

    /**
     * Get the final function.
     * @return A function name of f_final. f_final should be an eval func
     * parametrized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}
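Here is a sketch of how a COUNT-style UDF can implement this interface. It is a simplified version for illustration (the class name MyCount is made up); Pig's real COUNT additionally skips tuples whose first field is null:

// Assumed imports: java.io.IOException, org.apache.pig.EvalFunc,
// org.apache.pig.Algebraic, org.apache.pig.data.DataBag,
// org.apache.pig.data.Tuple, org.apache.pig.data.TupleFactory,
// org.apache.pig.backend.executionengine.ExecException

public class MyCount extends EvalFunc<Long> implements Algebraic {

    public String getInitial()  { return Initial.class.getName(); }
    public String getIntermed() { return Intermed.class.getName(); }
    public String getFinal()    { return Final.class.getName(); }

    // initial: runs map-side, counts one input bag and emits a partial count
    public static class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            DataBag bag = (DataBag) input.get(0);
            long cnt = (bag == null) ? 0L : bag.size();
            return TupleFactory.getInstance().newTuple(cnt);
        }
    }

    // intermediate: runs in the combiner, folds partial counts into a new partial count
    public static class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sumPartials(input));
        }
    }

    // final: runs reduce-side, folds the remaining partial counts into the result
    public static class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sumPartials(input);
        }
    }

    private static long sumPartials(Tuple input) throws ExecException {
        long cnt = 0;
        for (Tuple t : (DataBag) input.get(0))
            cnt += (Long) t.get(0);
        return cnt;
    }

    // exec is still required for the non-combinable execution path
    public Long exec(Tuple input) throws IOException {
        DataBag bag = (DataBag) input.get(0);
        if (bag == null)
            return null;
        return bag.size();
    }
}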
Take the following script:

input = load 'data' as (x, y);
grpd  = group input by x;
cnt   = foreach grpd generate group, COUNT(input);
store cnt into 'result';

Pig rewrites its MR execution plan as:
Map      load, foreach(group, COUNT.Initial)
Combine  foreach(group, COUNT.Intermediate)
Reduce   foreach(group, COUNT.Final), store

The Algebraic interface reduces the volume of data transferred, via the Combiner optimization; the Accumulator interface, by contrast, is concerned with memory usage. Once a UDF implements Accumulator, Pig guarantees that all records sharing a key (brought together by the Shuffle) are handed to the UDF incrementally, in batches (pig.accumulative.batchsize, 20000 by default). COUNT implements the Accumulator interface as well:
/* Accumulator interface implementation */
private long intermediateCount = 0L;

@Override
public void accumulate(Tuple b) throws IOException {
    try {
        DataBag bag = (DataBag) b.get(0);
        Iterator it = bag.iterator();
        while (it.hasNext()) {
            Tuple t = (Tuple) it.next();
            if (t != null && t.size() > 0 && t.get(0) != null) {
                intermediateCount += 1;
            }
        }
    } catch (ExecException ee) {
        throw ee;
    } catch (Exception e) {
        int errCode = 2106;
        String msg = "Error while computing count in " + this.getClass().getSimpleName();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}

@Override
public void cleanup() {
    intermediateCount = 0L;
}

/* called once all records for the current key have been processed */
@Override
public Long getValue() {
    return intermediateCount;
}
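For reference, the Accumulator interface itself is small. The following is reproduced from memory of org.apache.pig.Accumulator (the generic parameter is the UDF's return type), so treat it as a sketch rather than the verbatim definition:

public interface Accumulator<T> {
    // called repeatedly, each time with a bag holding the next batch of tuples for the current key
    public void accumulate(Tuple b) throws IOException;
    // called once all batches for the current key have been accumulated
    public T getValue();
    // called after getValue(), to reset state before the next key
    public void cleanup();
}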
Passing data through the UDF's constructor is the simplest approach: the define statement that creates the UDF instance specifies the constructor arguments. But in some situations, for example when the data is only produced at runtime, or cannot be expressed as a String, you need UDFContext. A UDF obtains the UDFContext instance, which is kept in a ThreadLocal, via the getUDFContext method. UDFContext chiefly carries the Hadoop JobConf and a per-UDF Properties object; Pig serializes them on the frontend and makes them available again on the backend.
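A minimal sketch of the pattern, assuming we want to capture a value that only exists at plan time. The class name RuntimeConfUDF and the property key run.stamp are made up for illustration; UDFContext.getUDFContext() and getUDFProperties() are the actual API:

// Assumed imports: java.io.IOException, org.apache.pig.EvalFunc,
// org.apache.pig.data.DataType, org.apache.pig.data.Tuple,
// org.apache.pig.impl.logicalLayer.schema.Schema,
// org.apache.pig.impl.util.UDFContext

public class RuntimeConfUDF extends EvalFunc<String> {
    private String stamp;

    @Override
    public Schema outputSchema(Schema input) {
        // outputSchema runs only on the frontend: compute a runtime-only value
        // and stash it in this UDF's Properties, which Pig ships to the backend
        UDFContext.getUDFContext()
                  .getUDFProperties(this.getClass())
                  .setProperty("run.stamp", String.valueOf(System.currentTimeMillis()));
        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
    }

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        if (stamp == null) {
            // exec runs on the backend: read the value back from UDFContext
            stamp = UDFContext.getUDFContext()
                              .getUDFProperties(this.getClass())
                              .getProperty("run.stamp");
        }
        return stamp + ":" + input.get(0);
    }
}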
Pig philosophy #3: Pigs Live Anywhere. In principle, Pig is not tied to the Hadoop framework, and there are several implementations and proposals worth looking at.
References:
Pig website: http://pig.apache.org/
Pig paper at VLDB 2009: Building a High-Level Dataflow System on top of Map-Reduce: The Pig Experience
Programming Pig: Dataflow Scripting with Hadoop, Alan Gates, O'Reilly, September 2011
Original post: http://blog.csdn.net/idontwantobe/article/details/25045871