简单UDTF的实现
该实现先按换行符切分输入字符串，再按空白字符切分每一行，并据此生成多行两列的数据。
package com.alipay.mbaprod.spark.udtf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
 * Table-generating function that expands a single string argument into rows of
 * two string columns named {@code col1} and {@code col2}.
 *
 * <p>The argument is split into lines on {@code '\n'}; each line is trimmed and
 * split on runs of whitespace. A line with two tokens yields one row, a line
 * with three tokens yields two rows that share the first token, and any other
 * line is dropped.
 */
public class SimpleUDTF extends GenericUDTF {

    /** Token separator inside a line; compiled once instead of once per line. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /** Inspector for the single string argument; assigned in {@link #initialize}. */
    private PrimitiveObjectInspector stringOI = null;

    /**
     * Forwards the context to the superclass. This function reads no job
     * configuration of its own.
     *
     * @param context the execution context handed over by the engine
     */
    @Override
    public void configure(MapredContext context) {
        super.configure(context);
    }

    /**
     * Validates the call signature and declares the output schema.
     *
     * @param args inspectors for the call arguments; exactly one string-typed
     *             primitive is accepted
     * @return a struct inspector with two Java-string fields, {@code col1} and
     *         {@code col2}
     * @throws UDFArgumentException if the argument count is not one or the
     *                              argument is not a string-typed primitive
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException("SimpleUDTF() should have 1 arguments");
        }

        ObjectInspector argOI = args[0];
        boolean isString = argOI.getCategory() == ObjectInspector.Category.PRIMITIVE
                && argOI.getTypeName().equals(serdeConstants.STRING_TYPE_NAME);
        if (!isString) {
            throw new UDFArgumentException("SimpleUDTF()'s arguments have to be string type");
        }
        stringOI = (PrimitiveObjectInspector) argOI;

        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
        fieldNames.add("col1");
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Converts one input string into its output rows.
     *
     * @param id the raw input; may be {@code null} or empty
     * @return the generated rows, each a two-element array; empty when the
     *         input is {@code null}, empty, or contains no line with two or
     *         three tokens
     */
    @VisibleForTesting
    public List<Object[]> processInputRecord(String id) {
        List<Object[]> result = new ArrayList<Object[]>();
        // Null or empty input produces no rows.
        if (id == null || id.isEmpty()) {
            return result;
        }

        for (String line : id.split("\n")) {
            String[] tokens = WHITESPACE.split(line.trim());
            if (tokens.length == 2) {
                result.add(new Object[] { tokens[0], tokens[1] });
            } else if (tokens.length == 3) {
                // Three tokens fan out into two rows keyed by the first token.
                result.add(new Object[] { tokens[0], tokens[1] });
                result.add(new Object[] { tokens[0], tokens[2] });
            }
            // Lines with any other token count are intentionally skipped.
        }
        return result;
    }

    /**
     * Expands one input row and forwards every generated row downstream.
     *
     * @param args the argument values of the current row; only {@code args[0]}
     *             is read
     * @throws HiveException if forwarding a row fails
     */
    @Override
    public void process(Object[] args) throws HiveException {
        // The inspector can hand back null (e.g. for a NULL argument); treat it
        // like empty input instead of failing on toString().
        Object value = stringOI.getPrimitiveJavaObject(args[0]);
        if (value == null) {
            return;
        }

        for (Object[] row : processInputRecord(value.toString())) {
            forward(row);
        }
    }

    /**
     * No-op: this function holds no resources and buffers no rows.
     *
     * @throws HiveException never thrown by this implementation
     */
    @Override
    public void close() throws HiveException {
        // Nothing to release.
    }
}