CarbonData 1.2.0集成Spark 2.1.0调研

编译

carbondata1.2已经支持hive+presto,carbon生态圈基本健全。

基于git checkout到branch-1.2,编译脚本:

#/bin/bash

mvn -DskipTests clean package

注意:虽然在其文档中说支持jdk1.7/1.8,但测试发现有使用到Map[String, String].getOfDefault()方法,导致现分支只支持jdk1.8编译。当然也可以手动修改下,给社区提个PR了。

测试

参考https://carbondata.apache.org/installation-guide.html配置说明,部署carbondata:

  • 由于CarbonData只支持spark2.1.0的小版本,使用spark2.1.3集成时会报无CatalystConf类,这个是由于Spark2.1.0+以后将该类重构了,所以必须依赖Spark2.1.0小版本;
  • 按照文档说明,tar zcvf carbondata.tar.gz并在spark.yarn.dist.archives/spark.executor.extraClassPath路径指明,否则会报找不到CarbonData相关类(此处要注意的是,carbondatalib中的carbondata_xxx.jar不要是软链-_-!!!);
  • 配置Spark,需要将编译好的carbondata_xxx.jar包集成至spark依赖中;
  • 配置carbondata,将carbondata源码中的参考carbondata.conf.template复制到SPAKR_HOME/conf下,并修改几处主要的路径(carbondata优化参数很多,后续慢慢调整);
  • 可以尝试,通过spark-shell测试集成是否成功,测试sql如下;

提供一份比较完整的spark-default.conf,注意有些路径写死了,需要修改:

## Driver/AM Settings ##
spark.yarn.am.waitTime                                100s
spark.yarn.am.cores                                   1
spark.yarn.am.memory                                  4g
spark.yarn.am.memoryOverhead                          2048
spark.yarn.am.extraJavaOptions                        -XX:PermSize=1024m -XX:MaxPermSize=2048m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
spark.driver.maxResultSize                            1g
spark.driver.memory                                   4g


## Executor Settings ##
spark.executor.instances                              0
spark.executor.cores                                  4
spark.executor.memory                                 4g
spark.yarn.executor.memoryOverhead                    2048
spark.executor.extraJavaOptions                       -XX:PermSize=1024m -XX:MaxPermSize=1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Dcarbon.properties.filepath=carbon.properties


## Dynamic Allocation Settings ##
spark.shuffle.service.enabled                         true
spark.dynamicAllocation.enabled                       true
spark.dynamicAllocation.initialExecutors              0
spark.dynamicAllocation.minExecutors                  0
spark.dynamicAllocation.maxExecutors                  10
spark.dynamicAllocation.executorIdleTimeout           60s
#spark.dynamicAllocation.cachedExecutorIdleTimeout    10s

## SQL Configurations ##
spark.sql.autoBroadcastJoinThreshold                  104857600
spark.sql.warehouse.dir                               /user/warehouse
#spark.sql.warehouse.dir                               /user/hadoop/warehouse
# spark.sql.hive.convertCTAS                          true
# spark.sql.sources.default                           parquet
spark.sql.shuffle.partitions                          100

spark.driver.extraJavaOptions                        -Dcarbon.properties.filepath=/home/hadoop/work/spark/conf/carbon.properties
spark.driver.extraClassPath                          /home/hadoop/work/spark/carbonlib/*

spark.executor.extraClassPath                        carbondata.tar.gz/carbonlib/*

spark.yarn.dist.files                                /home/hadoop/work/spark/conf/carbon.properties
spark.yarn.dist.archives                             /home/hadoop/work/spark/carbonlib/carbondata.tar.gz

配置carbon.properties:

#Mandatory. Carbon Store path
carbon.storelocation=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data
#Path where the bad records are stored
carbon.badRecords.location=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/badrecords

测试SQL中注意的是getOrCreateCarbonSession()方法需要提供两个已经创建好并有权限的HDFS路径,其中前者为storePath存储load/overwrite进来数据的默认路径,后者为metaStorePath(貌似没有起到效果), 如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data", "hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/carbon.metastore")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table2(id string, name string, city string, age Int) STORED BY 'carbondata'");
carbon.sql("drop table test_table");
carbon.sql("show tables").show()
carbon.sql("LOAD DATA INPATH '/tmp/carbon/sample.csv' INTO TABLE test_table2")
carbon.sql("SELECT * FROM test_table2").show()
carbon.sql("SELECT city, avg(age), sum(age) FROM test_table2 GROUP BY city").show()

CarbonData ThriftServer

上述方式只能做测试使用,真正的生成环境需要还是需要基于ThriftServer实现。搭建ThriftServer,启动脚本:

#!/bin/bash

export SPARK_HOME=/home/hadoop/work/spark

nohup ./bin/spark-submit \
        --conf spark.sql.hive.thriftServer.singleSession=true \
        --class org.apache.carbondata.spark.thriftserver.CarbonThriftServer  \
        $SPARK_HOME/carbonlib/carbondata_2.11-1.2.1-SNAPSHOT-shade-hadoop2.2.0.jar \
        hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/CarbonStore >logs/carbondata-thrift-server.log 2>&1 &

TPC-DS测试

官方并没有提供一个靠谱的测试集,正好手头上有tpc-ds之前的测试数据,所以就按照carbondata的语法修改了其创建表的语句,参考如下:

use tpcds;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

set hive.exec.max.dynamic.partitions.pernode=2000;
set hive.exec.max.dynamic.partitions=2000;

drop table if exists cb_store_sales;
create table cb_store_sales
(
  ss_sold_time_sk bigint,
  ss_item_sk bigint,
  ss_customer_sk bigint,
  ss_cdemo_sk bigint,
  ss_hdemo_sk bigint,
  ss_addr_sk bigint,
  ss_store_sk bigint,
  ss_promo_sk bigint,
  ss_ticket_number bigint,
  ss_quantity int,
  ss_wholesale_cost decimal(7,2),
  ss_list_price decimal(7,2),
  ss_sales_price decimal(7,2),
  ss_ext_discount_amt decimal(7,2),
  ss_ext_sales_price decimal(7,2),
  ss_ext_wholesale_cost decimal(7,2),
  ss_ext_list_price decimal(7,2),
  ss_ext_tax decimal(7,2),
  ss_coupon_amt decimal(7,2),
  ss_net_paid decimal(7,2),
  ss_net_paid_inc_tax decimal(7,2),
  ss_net_profit decimal(7,2)
) partitioned by (ss_sold_date_sk bigint)
stored by 'carbondata'
tblproperties('partition_type'='Hash','num_partitions'='31');

insert overwrite table cb_store_sales partition(ss_sold_date_sk) select ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_cdemo_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_promo_sk,ss_ticket_number,ss_quantity,ss_wholesale_cost,ss_list_price,ss_sales_price,ss_ext_discount_amt,ss_ext_sales_price,ss_ext_wholesale_cost,ss_ext_list_price,ss_ext_tax,ss_coupon_amt,ss_net_paid,ss_net_paid_inc_tax,ss_net_profit,ss_sold_date_sk from et_store_sales distribute by ss_sold_date_sk;

基于上述的ThriftServer提供的端口执行,执行脚本如下:

export SPARK_BEELINE_HOME=/home/hadoop/work/spark
${SPARK_BEELINE_HOME}/bin/beeline -u "jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010/tpcds;hive.server2.proxy.user=hadoop" -f "$bin/create-table-sql/create-load-carbondata-partition-fact.sql"

其中注意:

  • SparkSQL其partitioned by并不需要指定partition类型及partition数量,但carbondata sql必须指定,同时其官方文档真实漏洞百出,num_partitions貌似文档中也是错的;
  • overwrite过程很慢,这个应该跟carbondata的设计有关,需要构造全局字典索引并写到HDFS中,所以耗时较长,30min+;

执行完毕后就会在HDFS中找到其相关数据,比如meta信息如下:

hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0
Found 64 items
-rw-r-----   3 hadoop hdfs      21284 2017-10-31 20:25 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/0_batchno0-0-1509450294012.carbonindex
-rw-r-----   3 hadoop hdfs      10170 2017-10-31 20:31 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/10_batchno0-0-1509450294012.carbonindex
-rw-r-----   3 hadoop hdfs      10170 2017-10-31 20:31 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/11_batchno0-0-1509450294012.carbonindex
-rw-r-----   3 hadoop hdfs      10170 2017-10-31 20:17 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/12_batchno0-0-1509450294012.carbonindex
...


hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales
Found 2 items
drwxr-x---   - hadoop hdfs          0 2017-10-31 19:44 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact
drwxr-x---   - hadoop hdfs          0 2017-10-31 20:32 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata
hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata
Found 2 items
-rw-r-----   3 hadoop hdfs       2796 2017-10-31 19:44 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata/schema
-rw-r-----   3 hadoop hdfs        268 2017-10-31 20:32 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata/tablestatus

验证TPC-DS

当前Carbondata还有许多工作要处理,比如基于beeline创建完毕的cb_store_sales,通过beeline访问,设置其hive.server2.proxy.user=hadoop,查询该表时,有如下问题,原因是hive.server2.proxy.user=hadoop这个语句没有生效,但访问之前创建的表则没有问题:

hadoop@hzadg-mammut-platform1:~/work/spark$ ./bin/beeline -u jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010;hive.server2.proxy.user=hadoop

0: jdbc:hive2://hzadg-mammut-platform1.server> select * from cb_store_sales limit 10;
Error: org.apache.hadoop.security.AccessControlException: Permission denied: user=anonymous, access=EXECUTE, inode="/Carbon/CarbonStore/modifiedTime.mdt":hadoop:hdfs:drwxr-x---
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1722)
        at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3863)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1012)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) (state=,code=0)

现在基于CARBON_HOME/bin下的carbon-spark-sql执行,可以查询使用,测了下TPC-DS的一个数据集,基于carbondata和parquet数据格式,简单的操作(count, count(distinct),sum(), limit 10)carbondata均比parquet格式,性能要提升不少。

后续

  • TPC-DS全量SQL性能比较;
  • CarbonData优化原理调研;

参考:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,030评论 5 464
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,198评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,995评论 0 327
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,973评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,869评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,766评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,967评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,599评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,886评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,901评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,728评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,504评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,967评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,128评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,445评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,018评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,224评论 2 339

推荐阅读更多精彩内容