背景
目前flink读写hive表一直是一个比较麻烦的事情。虽然flink1.10版本更新了hive table api,生产环境中可以使用。但测试过程中还是会遇到很多问题。这里介绍一些实战过程中hive source和 hive sink案例供大家来参考
方案
hive source:
flink 1.9和1.10中官网提供连接hive source的方式。1.10版本已经测试成功,1.9版本官网则不建议在生产环境中使用。顾下面代码实现是基于flink1.10环境。
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
String hiveConf = "/test";
String hiveName = "test_source";
String HIVE_VERSION = "1.1.0";
// 注册catalog,使用hiveName的catalog
HiveCatalog catalog = new HiveCatalog(hiveName, null, hiveConf, HIVE_VERSION);
tableEnv.registerCatalog(hiveName,catalog);
tableEnv.useCatalog(hiveName);
Table table = tableEnv.sqlQuery(sql);
// table to DataStream
DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);
hive sink:
思路1:flink写数据到hdfs,jdbc访问hive metastore调用alter table语句将数据load到表中。
ALTER TABLE test.test_table ADD IF NOT EXISTS PARTITION (dt_=20200305) LOCATION '/test/20200305'";
思路2:flink写数据到hdfs,HiveMetastoreClient 调用add_partition,将数据添加到hive。HiveMetastoreClientWrapper 对象为flink-connect-hive.jar中的对象,大家也可以直接使用hive-exec.jar的HiveMetastoreClient。
String HIVE_VERSION = "1.1.0";
HiveConf hiveConf = createHiveConf("/test");
hiveConf.set("hive-version", hiveConf);
HiveMetastoreClientWrapper hMs = HiveMetastoreClientFactory.create(hiveConf, HIVE_VERSION);
org.apache.hadoop.hive.metastore.api.Table table = hMs.getTable(database, tableName);
// alter table add partition
Partition pa = new Partition();
pa.setDbName(table.getDbName());
pa.setTableName(table.getTableName());
pa.setValues(values); // 分区数值,eg dt = 20200305
pa.setParameters(new HashMap<String, String>());
pa.setSd(table.getSd());
pa.getSd().setSerdeInfo(table.getSd().getSerdeInfo());
pa.getSd().setLocation(location); // 分区的location路径
Partition retp = hiveMetastore.add_partition(partition);
思路3:采用flink table api,将数据写入hive。此种思路只在flink 批环境中测试成功,在流环境中未测试成功。在这里吐槽一下,总感觉flink与blink合并之后存在很多不通用的代码实现。非常难以理解。
HiveCatalog catalog = new HiveCatalog(hiveName, null, hiveConf, HIVE_VERSION);
tableEnv.registerCatalog(hiveName,catalog);
tableEnv.useCatalog(hiveName);
// database.tableName 为hive中的表
// 创建输入表(use catalog 放在前面)
Table inputTable = tableEnv.fromDataSet(...);
String table2= inputTable.toString();
String sql = "insert overwrite database.tableName partition ( dt = 20200305 ) select * from " + table2;
tableEnv.sqlUpdate(sql);