For the past few days I have been working on turning the IP addresses in our logs into specific provinces and cities, and I wanted to do it with a Pig UDF.
The IP database used is the Chunzhen (QQWry) IP database file, qqwry.dat.
The tricky part is how to read this file from inside the UDF; it cost me two days, so I am recording the code here for anyone who runs into the same problem.
Pig script:

register /usr/local/pig/mypigudf.jar;
define ip2address my.pig.func.IP2Address('/user/anny/qqwry.dat');
a = load '/user/anny/hdfs/logtestdata/ipdata.log' as (ip:chararray);
b = foreach a generate ip, ip2address(ip) as cc:map[chararray];
c = foreach b generate ip, cc#'province' as province, cc#'city' as city, cc#'region' as region;
dump c;
The Pig UDF, written in Java:

package my.pig.func;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import my.pig.func.IPConvertCity.IPSeeker;
import my.pig.func.IPConvertCity.IPUtil;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class IP2Address extends EvalFunc<Map<String, Object>> {

    private String lookupFile = "";
    private RandomAccessFile objFile = null;

    public IP2Address(String file) {
        // HDFS path of qqwry.dat, passed in from the DEFINE statement in the Pig script
        this.lookupFile = file;
    }

    @Override
    public Map<String, Object> exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null)
            return null;
        Map<String, Object> output = new HashMap<String, Object>();
        String str = (String) input.get(0);
        try {
            if (str.length() == 0)
                return output;
            if (objFile == null) {
                try {
                    // Open the local symlink created by the distributed cache:
                    // getCacheFiles() below registers lookupFile as "qqwry.dat",
                    // so it shows up as ./qqwry.dat in the task's working directory.
                    objFile = new RandomAccessFile("./qqwry.dat", "r");
                } catch (FileNotFoundException e1) {
                    System.out.println("IP database file not found: " + lookupFile);
                    return null;
                }
            }
            IPSeeker seeker = new IPSeeker(objFile);
            String country = seeker.getCountry(str);
            output = IPUtil.splitCountry(country);
            return output;
        } catch (Exception e) {
            // On any lookup error, return the empty map rather than failing the task
            return output;
        }
    }

    @Override
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.MAP));
    }

    @Override
    public List<String> getCacheFiles() {
        // Ship the HDFS file to every node via the distributed cache;
        // the "#qqwry.dat" suffix makes it available as ./qqwry.dat on each task node.
        List<String> list = new ArrayList<String>(1);
        list.add(lookupFile + "#qqwry.dat");
        return list;
    }
}
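The UDF above depends on two helper classes that are not shown in the post: IPSeeker, which reads the qqwry.dat format, and IPUtil.splitCountry(), which breaks the location string returned by the database into the map keys the Pig script dereferences. As a rough idea only, here is a minimal sketch of what splitCountry() might look like, assuming the usual "…省…市…" layout of qqwry.dat location strings; the key names province/city/region come from the Pig script, everything else is my assumption, not code from the original post.

package my.pig.func.IPConvertCity;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the IPUtil helper used by IP2Address;
// the real implementation is not shown in the original post.
public class IPUtil {

    // Split a qqwry.dat location string such as "广东省深圳市" into the
    // province/city/region keys that the Pig script reads with cc#'...'.
    public static Map<String, Object> splitCountry(String country) {
        Map<String, Object> result = new HashMap<String, Object>();
        if (country == null) {
            return result;
        }
        String province = "";
        String city = "";
        String region = "";
        int p = country.indexOf("省");          // end of the province part
        if (p >= 0) {
            province = country.substring(0, p + 1);
            int c = country.indexOf("市", p);   // end of the city part
            if (c >= 0) {
                city = country.substring(p + 1, c + 1);
                region = country.substring(c + 1).trim();
            } else {
                region = country.substring(p + 1).trim();
            }
        } else {
            // Municipalities, autonomous regions, foreign addresses, etc.
            region = country.trim();
        }
        result.put("province", province);
        result.put("city", city);
        result.put("region", region);
        return result;
    }
}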
Search for "Distributed Cache" in this page of the Pig docs: http://pig.apache.org/docs/r0.11.0/udf.html
The example there overrides the getCacheFiles() method, which is what ensures the file is shipped to every node in the cluster and can be opened locally (as ./qqwry.dat above).
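The Stack Overflow threads listed below cover the same file-loading problem and also mention the other common route: skipping the distributed cache and pulling the file from HDFS yourself with Hadoop's FileSystem API, typically once per task. A minimal sketch of that alternative (the class name, temp-file handling and configuration defaults are assumptions on my part; qqwry.dat still has to be copied to local disk because IPSeeker wants a RandomAccessFile):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Sketch: fetch a lookup file straight from HDFS inside the UDF instead of
// relying on getCacheFiles(). Copies the file to a local temp file so it can
// then be opened with RandomAccessFile.
public class HdfsLookupLoader {

    public static File fetchToLocal(String hdfsPath) throws IOException {
        Configuration conf = new Configuration();           // picks up the cluster config on task nodes
        FileSystem fs = FileSystem.get(conf);
        File local = File.createTempFile("qqwry", ".dat");  // hypothetical temp-file name
        InputStream in = fs.open(new Path(hdfsPath));
        try {
            OutputStream out = new FileOutputStream(local);
            try {
                IOUtils.copyBytes(in, out, 4096, false);     // stream HDFS -> local disk
            } finally {
                out.close();
            }
        } finally {
            in.close();
        }
        return local;
    }
}

Compared with the distributed-cache route, this does an HDFS read per task rather than letting the framework localize the file once per node, which is why the getCacheFiles() approach above is usually preferred.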
References:
http://stackoverflow.com/questions/17514022/access-hdfs-file-from-udf
http://stackoverflow.com/questions/19149839/pig-udf-maxmind-geoip-database-data-file-loading-issue
Original post: http://www.cnblogs.com/anny-1980/p/3673419.html