(The UDF can be debugged from the IDEA IDE; Part II below walks through the setup.)
I. Writing the custom UDF
1. The pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cn.hive.mongo</groupId>
    <artifactId>hive-mongodb</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>hive-mongodb</name>
    <description>load data from hive to mongodb</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.7</java.version>
    </properties>
    <dependencies>
        <!-- provided: hive-exec is already on the classpath of the Hive runtime -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>0.14.0</version>
            <type>jar</type>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>calcite-core</artifactId>
                    <groupId>org.apache.calcite</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>calcite-avatica</artifactId>
                    <groupId>org.apache.calcite</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.205.0</version>
            <type>jar</type>
            <scope>provided</scope>
        </dependency>
        <!-- note: mongo-java-driver is an uber jar that already bundles bson,
             so this older bson 3.2.2 is redundant and may shadow the 3.4.1 classes -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>bson</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.cn.udf.MongoStorageHandler</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. The MongoDB helper class
package com.cn.udf;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;

/**
 * Created by Administrator on 2017/1/13 11:30.
 * Copyright: Copyright (c) 2016
 * Description: thin wrapper around the MongoDB Java driver; the client and the
 * database handle are cached in static fields so one connection is reused
 * across all rows the UDF processes in the same JVM.
 */
public class MongodbHelper {
    private static MongoClient mongoClient = null;
    private static MongoDatabase database = null;

    public MongodbHelper() {
    }

    /**
     * Opens the connection on first use; later calls return the cached handle.
     * The database name is taken from the last path segment of the URI.
     */
    public MongoDatabase init(String dbUri) {
        String dbName = dbUri.substring(dbUri.lastIndexOf("/") + 1);
        if (database == null) {
            MongoClientURI uri = new MongoClientURI(dbUri,
                    MongoClientOptions.builder().cursorFinalizerEnabled(false));
            mongoClient = new MongoClient(uri);
            database = mongoClient.getDatabase(dbName);
        }
        return database;
    }

    public MongoCollection<Document> getCollection(String dbCollection) {
        return database.getCollection(dbCollection);
    }

    public void close() {
        mongoClient.close();
    }

    /**
     * Upserts one document keyed by "Id".
     * argument order: [0]=Id, [1]=count, [2]=last sign time from Hive, [3]=phone
     */
    public void updateOneDocument(MongoCollection<Document> collection, String[] arguments) {
        FindIterable<Document> documents = collection.find(Filters.eq("Id", arguments[0]));
        Document document = documents.first();
        if (null == document) {
            // no existing document: insert a fresh one with the initial count
            Document docs = new Document().append("Id", arguments[0])
                    .append("count", Integer.parseInt(arguments[1]));
            collection.insertOne(docs);
        } else {
            long lastTimeHive = Long.parseLong(arguments[2]);
            // guard against a missing startStamp field: the original new Long(...)
            // would throw NumberFormatException on the string "null"
            Object startStamp = document.get("startStamp");
            long createMongo = (startStamp == null) ? 0L : Long.parseLong(String.valueOf(startStamp));
            String phone = document.get("phone") == null ? "" : String.valueOf(document.get("phone"));
            if (lastTimeHive <= createMongo) {
                collection.updateMany(Filters.eq("Id", arguments[0]),
                        new Document("$inc", new Document("count", Integer.parseInt(arguments[1]) + 1))
                                .append("$set", new Document("phone", "".equals(phone) ? arguments[3] : phone)),
                        new UpdateOptions().upsert(true));
            }
        }
    }
}
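For reference, here is a minimal sketch of driving the helper on its own, outside Hive. The URI, collection name, and argument values are placeholders, and the demo class itself is hypothetical:

package com.cn.udf;

import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class MongodbHelperDemo {
    public static void main(String[] args) {
        MongodbHelper helper = new MongodbHelper();
        // connect once; the helper caches the client and database handle
        helper.init("mongodb://127.0.0.1:27017/test_db");
        MongoCollection<Document> coll = helper.getCollection("test_coll");
        // argument order: [0]=Id, [1]=count, [2]=last sign time, [3]=phone
        helper.updateOneDocument(coll, new String[]{"1001", "1", "1484200000", "13800000000"});
        helper.close();
    }
}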
3. The UDF entry class: MongoStorageHandler
package com.cn.udf;

import com.mongodb.client.MongoCollection;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

/**
 * Created by Administrator on 2017/1/12 18:56.
 * Copyright: Copyright (c) 2016
 * Description: load hive data to mongodb
 */
@Description(name = "hive-to-mongodb",
        value = "_FUNC_(url, collection, [arguments]) - writes data to MongoDB\n"
                + "-- url : mongodb://host:port/dbName or mongodb://username:password@host:port/dbName\n"
                + "-- collection : database collection\n"
                + "-- [arguments] order : customer_id, sign_times, sign_last_time, phone, sex, name\n"
                + "-- returns 0 for success, -1 for failure, 2 for a connection problem",
        extended = "argument 0 is the database url string (mongodb://host:port/dbName or mongodb://username:password@host:port/dbName)\n"
                + "argument 1 is the database collection\n"
                + "arguments (2~n) must be primitive and are written to MongoDB\n"
                + "a return value of 2 means the driver failed to load or connect\n")
@UDFType(deterministic = false)
public class MongoStorageHandler extends GenericUDF {
    private static final Logger log = LoggerFactory.getLogger(MongoStorageHandler.class);
    private transient ObjectInspector[] argumentOI;
    private final IntWritable result = new IntWritable(0);
    private MongodbHelper dbHelper = null;
    private MongoCollection<Document> connection = null;

    /**
     * Created by : Administrator . Created time: 2017/1/12 19:26.
     * Description: validates the argument types; the first four arguments
     * (url, collection, Id, count) must be strings, the rest must be primitive.
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentOI = arguments;
        // the helper reads value arguments [0..3], so at least url, collection,
        // Id, count, last_time and phone must be supplied
        if (arguments.length < 6) {
            throw new UDFArgumentLengthException(
                    "hive-to-mongodb expects at least 6 arguments, got " + arguments.length);
        }
        for (int i = 0; i < 4; i++) {
            if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveObjectInspector poi = (PrimitiveObjectInspector) arguments[i];
                if (poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                    throw new UDFArgumentTypeException(i,
                            "The argument of the function should be \""
                                    + serdeConstants.STRING_TYPE_NAME + "\", but \""
                                    + arguments[i].getTypeName() + "\" was found");
                }
            }
        }
        for (int i = 4; i < arguments.length; i++) {
            if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i,
                        "The argument of the function should be primitive, but \""
                                + arguments[i].getTypeName() + "\" was found");
            }
        }
        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    /**
     * Created by : Administrator . Created time: 2017/1/12 19:28.
     * Description: returns 0 on success, -1 on failure, 2 on connection problems
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        result.set(0); // the IntWritable is reused across rows, so reset it on every call
        String url = arguments[0].get().toString();
        String dbCollection = arguments[1].get().toString();
        try {
            if (null == dbHelper) {
                // initialize the connection once; MongodbHelper caches it
                dbHelper = new MongodbHelper();
            }
            dbHelper.init(url);
            connection = dbHelper.getCollection(dbCollection);
        } catch (Exception ex) {
            log.error("Driver loading or connection issue", ex);
            result.set(2);
        }
        if (connection != null) {
            try {
                // copy the value arguments; size the array to fit instead of a fixed length of 10
                String[] args = new String[arguments.length - 2];
                for (int i = 2; i < arguments.length; ++i) {
                    args[i - 2] = arguments[i].get() == null ? "0" : arguments[i].get().toString();
                }
                dbHelper.updateOneDocument(connection, args);
            } catch (Exception e) {
                log.error("Underlying MongoDB exception", e);
                result.set(-1);
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("MongoStorageHandler(");
        if (children.length > 0) {
            sb.append(children[0]);
            for (int i = 1; i < children.length; i++) {
                sb.append(",");
                sb.append(children[i]);
            }
        }
        sb.append(")");
        return sb.toString();
    }
}
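Since the whole point here is debuggability, note that a GenericUDF can also be exercised in a plain main method (or a JUnit test) without a Hive installation, by wrapping values in Hive's DeferredJavaObject. A minimal sketch, assuming a local MongoDB and the placeholder names used elsewhere in this post:

package com.cn.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MongoStorageHandlerLocalTest {
    public static void main(String[] args) throws Exception {
        MongoStorageHandler udf = new MongoStorageHandler();
        // six string arguments: url, collection, Id, count, last_time, phone
        ObjectInspector[] ois = new ObjectInspector[6];
        for (int i = 0; i < ois.length; i++) {
            ois[i] = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        }
        udf.initialize(ois);
        Object ret = udf.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject("mongodb://127.0.0.1:27017/test_db"),
                new GenericUDF.DeferredJavaObject("test_coll"),
                new GenericUDF.DeferredJavaObject("1001"),
                new GenericUDF.DeferredJavaObject("1"),
                new GenericUDF.DeferredJavaObject("1484200000"),
                new GenericUDF.DeferredJavaObject("13800000000")
        });
        System.out.println("evaluate returned: " + ret);  // expect 0 on success
    }
}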
II. Testing the custom UDF
Preparation: build the project (mvn clean package) and upload the resulting UDF jar plus its dependency jars to the server (WinSCP is a convenient choice on Windows). In this walkthrough the UDF jar sits in /home/username/tmp and the dependency jars in /home/username/tmp/lib.
1. Configure a Remote run configuration in IDEA (shown in Figure 1):
2. Start Hive in debug mode:
hive --debug
3. Hive is now listening on port 8000 (Listening for transport dt_socket at address: 8000); launch the Remote configuration from IDEA in Debug mode.
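For reference, hive --debug starts the Hive client JVM with a JDWP agent roughly like the line below (the exact flags are an assumption and vary slightly across Hive versions; what matters is that the IDEA Remote configuration points at the same host and port 8000):
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000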
4. Register the function under a name of your choice (sign_func here). The jar built above does not bundle its dependencies, so add the MongoDB driver jar from the lib directory as well:
use testDb;
add jar /home/username/tmp/hive-mongodb-1.0-SNAPSHOT.jar;
add jar /home/username/tmp/lib/mongo-java-driver-3.4.1.jar;
create temporary function sign_func as 'com.cn.udf.MongoStorageHandler';
desc function sign_func;
5. Call the custom function sign_func:
select sign_func('mongodb://your_host:your_port/your_dbName','your_collection',column1[,column2...]) from your_tableName;
-- for example
select sign_func('mongodb://127.0.0.1:27017/test_db','test_coll',Id,col1,col2,col3) from tab_info limit 5;
Finally, query MongoDB to confirm that the data actually made it into the collection.
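You can check from the mongo shell or with a short driver snippet like the sketch below (the URI, database, and collection are the placeholders from the example above, and the class itself is hypothetical):

package com.cn.udf;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class VerifyMongoData {
    public static void main(String[] args) {
        MongoClient client = new MongoClient(new MongoClientURI("mongodb://127.0.0.1:27017/test_db"));
        MongoCollection<Document> coll = client.getDatabase("test_db").getCollection("test_coll");
        System.out.println("documents in test_coll: " + coll.count());
        // print a few documents for a quick spot check
        for (Document doc : coll.find().limit(5)) {
            System.out.println(doc.toJson());
        }
        client.close();
    }
}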