Pig's built-in PigStorage cannot specify the record (row) delimiter, so I wrote a simple UDF class that lets you specify both the field delimiter and the record delimiter. I had worked through a simpler version of this before:
http://blog.csdn.net/ruishenh/article/details/12048067
but it had serious drawbacks, so this time I have rewritten it.
Steps: build the jar, upload it to the server, and register it from the Grunt shell:
grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar
grunt> cat student;
1,xiaohouzi,25/2,xiaohouzi2,24/3,xiaohouzi3,23
grunt> a = load 'student' using com.hcr.hadoop.pig.MyStorage(',','/');
grunt> dump a;
(1,xiaohouzi,25)
(2,xiaohouzi2,24)
(3,xiaohouzi3,23)
grunt> store a into 'myStorageOut' using com.hcr.hadoop.pig.MyStorage(',','/');
After the store reports success, check the output:
grunt> cat myStorageOut
1,xiaohouzi,25/2,xiaohouzi2,24/3,xiaohouzi3,23/
Source code:
package com.hcr.hadoop.pig;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.StorageUtil;

/**
 * Load/store UDF that lets the caller choose both the field delimiter and the
 * record delimiter.
 */
public class MyStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata {

    private static final Log LOG = LogFactory.getLog(MyStorage.class);

    private static final String utf8 = "UTF-8";

    private static String fieldDel = "\t";

    private static String recordDel = "\n";

    protected RecordReader recordReader = null;

    protected RecordWriter writer = null;

    public MyStorage() {
    }

    public MyStorage(String fieldDel) {
        this(fieldDel, "\n");
    }

    public MyStorage(String fieldDel, String recordDel) {
        this.fieldDel = fieldDel;
        this.recordDel = recordDel;
    }

    @Override
    public void setLocation(String s, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, s);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // Read records split on the user-supplied record delimiter.
        return new MyStorageInputFormat(recordDel);
    }

    @Override
    public void prepareToRead(RecordReader recordReader, PigSplit pigSplit)
            throws IOException {
        this.recordReader = recordReader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            boolean flag = recordReader.nextKeyValue();
            if (!flag) {
                return null;
            }
            // Split the current record on the field delimiter and wrap the
            // parts in a tuple.
            Text value = (Text) recordReader.getCurrentValue();
            String[] strArray = value.toString().split(fieldDel);
            List<String> lst = new ArrayList<String>();
            int i = 0;
            for (String singleItem : strArray) {
                lst.add(i++, singleItem);
            }
            return TupleFactory.getInstance().newTuple(lst);
        } catch (InterruptedException e) {
            throw new ExecException("Read data error",
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir)
            throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStorageOutputFormat(StorageUtil.parseFieldDel(fieldDel),
                this.recordDel);
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        job.getConfiguration().set("mapred.textoutputformat.separator", "");
        FileOutputFormat.setOutputPath(job, new Path(location));
        if ("true".equals(job.getConfiguration().get(
                "output.compression.enabled"))) {
            FileOutputFormat.setCompressOutput(job, true);
            String codec = job.getConfiguration().get(
                    "output.compression.codec");
            try {
                FileOutputFormat.setOutputCompressorClass(job,
                        (Class<? extends CompressionCodec>) Class
                                .forName(codec));
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Class not found: " + codec);
            }
        } else {
            // This makes it so that storing to a directory ending with ".gz"
            // or ".bz2" works.
            setCompression(new Path(location), job);
        }
    }

    private void setCompression(Path path, Job job) {
        String location = path.getName();
        if (location.endsWith(".bz2") || location.endsWith(".bz")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        } else if (location.endsWith(".gz")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        } else {
            FileOutputFormat.setCompressOutput(job, false);
        }
    }

    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // no schema validation needed
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        try {
            writer.write(null, t);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // not used
    }

    @Override
    public void cleanupOnFailure(String location, Job job) throws IOException {
        StoreFunc.cleanupOnFailureImpl(location, job);
    }

    @Override
    public void cleanupOnSuccess(String location, Job job) throws IOException {
        // nothing to clean up
    }

    @Override
    public ResourceSchema getSchema(String location, Job job)
            throws IOException {
        // Supply a fixed schema so the load does not need an AS (...) clause.
        ResourceSchema rs = new ResourceSchema();
        FieldSchema c1 = new FieldSchema("c1", DataType.INTEGER);
        FieldSchema c2 = new FieldSchema("c2", DataType.INTEGER);
        FieldSchema c3 = new FieldSchema("c3", DataType.DOUBLE);
        ResourceFieldSchema fs1 = new ResourceFieldSchema(c1);
        ResourceFieldSchema fs2 = new ResourceFieldSchema(c2);
        ResourceFieldSchema fs3 = new ResourceFieldSchema(c3);
        rs.setFields(new ResourceFieldSchema[] { fs1, fs2, fs3 });
        return rs;
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job)
            throws IOException {
        return null;
    }

    @Override
    public String[] getPartitionKeys(String location, Job job)
            throws IOException {
        return null;
    }

    @Override
    public void setPartitionFilter(Expression partitionFilter)
            throws IOException {
        // partitioning not supported
    }
}

class MyStorageInputFormat extends TextInputFormat {

    private final String recordDel;

    public MyStorageInputFormat(String recordDel) {
        this.recordDel = recordDel;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        if (recordDel != null) {
            delimiter = recordDel;
        }
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            try {
                recordDelimiterBytes = decode(delimiter).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return new LineRecordReader(recordDelimiterBytes);
    }

    /**
     * The delimiter passed in from the workflow may be a special character
     * written as an escape sequence in octal or hexadecimal (e.g. "\001"),
     * so decode it into the actual bytes here.
     *
     * @throws IOException
     */
    public static String decode(String str) throws IOException {
        String re = str;
        if (str != null && str.startsWith("\\")) {
            str = str.substring(1, str.length());
            String[] chars = str.split("\\\\");
            byte[] bytes = new byte[chars.length];
            for (int i = 0; i < chars.length; i++) {
                if (chars[i].equals("t")) {
                    bytes[i] = 9;
                } else if (chars[i].equals("r")) {
                    bytes[i] = 13;
                } else if (chars[i].equals("n")) {
                    bytes[i] = 10;
                } else if (chars[i].equals("b")) {
                    bytes[i] = 8;
                } else {
                    bytes[i] = Byte.decode(chars[i]);
                }
            }
            try {
                re = new String(bytes, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IOException(str, e);
            }
        }
        return re;
    }
}

class MyStorageOutputFormat extends TextOutputFormat<WritableComparable, Tuple> {

    private final byte fieldDel;

    private final String recordDel;

    public MyStorageOutputFormat(byte delimiter) {
        this(delimiter, "\n");
    }

    public MyStorageOutputFormat(byte delimiter, String recordDel) {
        this.fieldDel = delimiter;
        this.recordDel = recordDel;
    }

    protected static class MyRecordWriter extends
            TextOutputFormat.LineRecordWriter<WritableComparable, Tuple> {

        private static byte[] newline;

        private final byte fieldDel;

        public MyRecordWriter(DataOutputStream out, byte fieldDel)
                throws UnsupportedEncodingException {
            this(out, fieldDel, "\n".getBytes("UTF-8"));
        }

        public MyRecordWriter(DataOutputStream out, byte fieldDel, byte[] record) {
            super(out);
            this.fieldDel = fieldDel;
            this.newline = record;
        }

        @Override
        public synchronized void write(WritableComparable key, Tuple value)
                throws IOException {
            // Write the tuple fields separated by fieldDel and terminate the
            // record with the custom record delimiter instead of '\n'.
            int sz = value.size();
            for (int i = 0; i < sz; i++) {
                StorageUtil.putField(out, value.get(i));
                if (i != sz - 1) {
                    out.writeByte(fieldDel);
                }
            }
            out.write(newline);
        }
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                    job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
                    conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new MyRecordWriter(fileOut, fieldDel,
                    this.recordDel.getBytes());
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new MyRecordWriter(new DataOutputStream(
                    codec.createOutputStream(fileOut)), fieldDel,
                    this.recordDel.getBytes());
        }
    }
}
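Before deploying the jar it can help to sanity-check the delimiter handling locally. The snippet below is a small sketch of my own (the DecodeDemo class is not part of the UDF) that calls MyStorageInputFormat.decode() to confirm that escape sequences such as \001 and \t are turned into the actual control bytes before they reach LineRecordReader:

package com.hcr.hadoop.pig;

import java.io.IOException;

// Hypothetical helper class, only for exercising MyStorageInputFormat.decode() locally.
public class DecodeDemo {
    public static void main(String[] args) throws IOException {
        // "\\001" is decoded as octal 001, i.e. the single byte 0x01 (the ^A separator).
        String ctrlA = MyStorageInputFormat.decode("\\001");
        System.out.println((int) ctrlA.charAt(0));   // prints 1

        // "\\t" maps to the tab character (byte 9).
        String tab = MyStorageInputFormat.decode("\\t");
        System.out.println((int) tab.charAt(0));     // prints 9

        // A plain delimiter without a leading backslash is returned unchanged.
        System.out.println(MyStorageInputFormat.decode(","));  // prints ,
    }
}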
The UDF also handles non-printable delimiters given as escape sequences, for example \001 as the field delimiter and \002 as the record delimiter (the control characters themselves are invisible in the cat output below):
grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar
grunt> cat X;
keyDataKNZKCZY:ZDKJS:616150:AFS:3842708d_20131219194420-642464756keyDataKNZKCZY:ZDKJS:616614:AFS:3843920d_20131219194420-642464756keyDataKNZKCZY:ZDKJS:616661:AFS:3844040d_20131219194420-642464756
grunt> a = load 'X' using com.hcr.hadoop.pig.MyStorage('\\001','\\002');
grunt> dump a;
(keyData,KNZKCZY:ZDKJS:616150:AFS:3842708,d_20131219194420-642464756)
(keyData,KNZKCZY:ZDKJS:616614:AFS:3843920,d_20131219194420-642464756)
(keyData,KNZKCZY:ZDKJS:616661:AFS:3844040,d_20131219194420-642464756)
grunt>
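For reference, a file like X above can be produced with a few lines of plain Java. This is only an illustrative sketch (the WriteCtrlDelimited class name and the choice of sample rows are mine); it writes \u0001 between fields and \u0002 after each record, matching the MyStorage('\\001','\\002') load:

import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;

// Hypothetical helper, only to create sample data with control-character delimiters.
public class WriteCtrlDelimited {
    public static void main(String[] args) throws Exception {
        String[][] rows = {
            { "keyData", "KNZKCZY:ZDKJS:616150:AFS:3842708", "d_20131219194420-642464756" },
            { "keyData", "KNZKCZY:ZDKJS:616614:AFS:3843920", "d_20131219194420-642464756" },
            { "keyData", "KNZKCZY:ZDKJS:616661:AFS:3844040", "d_20131219194420-642464756" }
        };
        try (Writer w = new OutputStreamWriter(new FileOutputStream("X"), "UTF-8")) {
            for (String[] row : rows) {
                // \u0001 (^A) separates fields, \u0002 (^B) terminates each record.
                w.write(String.join("\u0001", row));
                w.write('\u0002');
            }
        }
    }
}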
Sometimes you do not want to spell out a concrete schema in the LOAD statement (for example, there are too many fields, or the schema is not meant to be exposed there) and would rather have the loader supply an existing schema. To do this, implement the LoadMetadata interface and override:
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    ResourceSchema rs = new ResourceSchema();
    FieldSchema c1 = new FieldSchema("c1", DataType.INTEGER);
    FieldSchema c2 = new FieldSchema("c2", DataType.INTEGER);
    FieldSchema c3 = new FieldSchema("c3", DataType.DOUBLE);
    ResourceFieldSchema fs1 = new ResourceFieldSchema(c1);
    ResourceFieldSchema fs2 = new ResourceFieldSchema(c2);
    ResourceFieldSchema fs3 = new ResourceFieldSchema(c3);
    rs.setFields(new ResourceFieldSchema[] { fs1, fs2, fs3 });
    return rs;
}
In this simple example the schema is hard-coded and returned directly, so the load can use it without an AS (...) clause:
grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar
grunt> a = load 'student' using com.hcr.hadoop.pig.MyStorage(',','/');
grunt> describe a;
a: {c1: int,c2: int,c3: double}
grunt> b = foreach a generate c1,c2,c3;
grunt> describe b;
b: {c1: int,c2: int,c3: double}
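If listing FieldSchema objects by hand becomes tedious, the same ResourceSchema can also be built from a schema string. This variant is a sketch of my own, not part of the original post; it assumes Pig's org.apache.pig.impl.util.Utils.getSchemaFromString(...) helper, which parses the same syntax as an AS (...) clause:

// Hedged alternative to the hard-coded getSchema() above (sketch only).
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    // getSchemaFromString throws a ParserException, which is an IOException,
    // so it can simply propagate out of this method.
    org.apache.pig.impl.logicalLayer.schema.Schema schema =
            org.apache.pig.impl.util.Utils.getSchemaFromString("c1:int, c2:int, c3:double");
    return new ResourceSchema(schema);
}

With this variant, describe a should report the same c1/c2/c3 schema as above.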
Excerpted from: http://blog.csdn.net/ruishenh/article/details/12192391
Original: http://www.cnblogs.com/anny-1980/p/3673569.html