Flink SQL Client集成Hive

环境

CDH-6.3.2
Flink-1.12.2
最后会补充说明Flink-1.13.1的使用方法

准备

flink-1.12.2/lib目录需要放如下jar包

flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar
hadoop-common-3.0.0-cdh6.3.2.jar
hadoop-mapreduce-client-common-3.0.0-cdh6.3.2.jar
hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar
hadoop-mapreduce-client-hs-3.0.0-cdh6.3.2.jar
hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.2.jar

启动Flink Session on YARN

export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached

配置sql-client-defaults.yaml

此配置文件在flink-1.12.2/conf目录下,修改后如下:

#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...


#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...


#==============================================================================
# Catalogs
#==============================================================================

# Define catalogs here.

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf
    hive-version: 2.1.1
    default-database: test

#==============================================================================
# Modules
#==============================================================================

# Define modules here.

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: batch
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: processing-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog', 'table' or 'tableau' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 100
  # parallelism of the program
  # parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  current-database: default_database
  # controls how table programs are restarted in case of a failures
  # restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    # type: fallback

#==============================================================================
# Configuration options
#==============================================================================

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.

# A configuration can look like:
# configuration:
#   table.exec.spill-compression.enabled: true
#   table.exec.spill-compression.block-size: 128kb
#   table.optimizer.join-reorder-enabled: true
configuration:
  table.sql-dialect: hive

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0

启动SQL Client

./bin/sql-client.sh embedded

执行查询

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> use catalog myhive;

Flink SQL> use test;

Flink SQL> select * from u_data;

上面查询的就是hive中test这个库中的u_data表。

特别说明

以上操作对于普通的文本格式的hive表是可以正常查询的,但是对于orc格式的hive表,基于CDH-6.3.2这个环境是会报错的。所以在执行上面的操作之前需要如下准备:

  1. 使用flink源码重新编译生成flink-sql-connector-hive-2.2.0_2.11,编译之前需要修改flink-sql-connector-hive-2.2.0中的pom文件,将hive-exec的版本改成2.1.1-cdh6.3.2
  2. 将新编译好的jar包放到flink-1.12.2/lib目录中,替换原有jar包
  3. 将CDH环境中的libfb303-0.9.3.jar放到flink-1.12.2/lib目录中

根因分析

错误如下

java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.0.0-cdh6.3.2
 at org.apache.hadoop.hive.shims.ShimLoader.getMajorVersion(ShimLoader.java:177) ~[flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar:1.12.0]
 at org.apache.hadoop.hive.shims.ShimLoader.loadShims(ShimLoader.java:144) ~[flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar:1.12.0]
 at org.apache.hadoop.hive.shims.ShimLoader.getHadoopShims(ShimLoader.java:99) ~[flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar:1.12.0]
 at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.<clinit>(OrcInputFormat.java:161) ~[flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar:1.12.0]
 at java.lang.Class.forName0(Native Method) ~[?:1.8.0_181]
 at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_181]

通过排查ShimLoader.java源码,开源社区hive 2.x的版本这种情况下是不支持hadoop 3.x版本。但是CDH中hive 2.1.1-cdh6.3.2版本和社区版本是不一样的,可以支持hadoop 3.x版本。
社区版

public static String getMajorVersion() {
  String vers = VersionInfo.getVersion();

  String[] parts = vers.split("\\.");
  if (parts.length < 2) {
    throw new RuntimeException("Illegal Hadoop Version: " + vers +
        " (expected A.B.* format)");
  }

  switch (Integer.parseInt(parts[0])) {
  case 2:
    return HADOOP23VERSIONNAME;
  default:
    throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);
  }
}

CDH版(反编译)

public static String getMajorVersion() {
    String vers = VersionInfo.getVersion();
    String[] parts = vers.split("\\.");
    if (parts.length < 2)
      throw new RuntimeException("Illegal Hadoop Version: " + vers + " (expected A.B.* format)"); 
    switch (Integer.parseInt(parts[0])) {
      case 2:
      case 3:
        return "0.23";
    } 
    throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);
  }

Flink-1.13.1使用方法

flink 1.13.1版本以后可以使用初始化sql脚本的方式启动sql client而不再使用yaml文件,例如创建sql-init.sql放到conf下面,内容如下:

CREATE CATALOG MyCatalog
  WITH (
    'type' = 'hive',
    'hive-conf-dir'='/etc/hive/conf',
    'hive-version'='2.1.1'
  );
USE CATALOG MyCatalog;
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'table';
SET 'table.sql-dialect'='hive';

启动client的命令./bin/sql-client.sh -i conf/sql-init.sql这样就可以进入client执行sql命令了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354