Reading HFiles Directly with Spark

If Spark cannot reach the required speed when reading HBase through the region servers, you can read the HFiles directly instead. Here is the code:

package com.yoyi.data.user_profile

import com.yoyi.data.common.Application
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job

object Test extends Application {

  def main(args: Array[String]): Unit = {


    // project-specific helper that builds the HBase Configuration
    val hconf = CommonLabelFunc.createHbaseConf()
    hconf.set("hbase.rootdir", "hdfs://host:8020/hbase")
//    hconf.set(TableInputFormat.INPUT_TABLE, "demo_label_test")
    val sc = createSparkContext("", Seq())

//    val rdd = sc.textFile("hdfs://ns20.data.yoyi:8020/hbase/data/default/demo_label_test/fd47c44b1b341703f0ab40a1c47959f6/c")
//    rdd.foreach(println)

//    val hbaseContext = new HBaseContext(sc, hconf)
    val scan = new Scan()
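    // the Scan above is unrestricted (full table); add column families or filters here to limit what gets read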
//    val rdd = hbaseContext.hbaseRDD(TableName.valueOf("demo_label_test"), scan)
    // serialize the Scan and store it in the conf so TableSnapshotInputFormat knows what to read
    val proto = ProtobufUtil.toScan(scan)
    hconf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()))
    val job = Job.getInstance(hconf)
    // name of the existing snapshot and a temporary HDFS directory it will be restored into for reading
    val snapName = "demo_label_test_snapshot"
    val path = new Path("hdfs://host:8020/tmp/snapshot")
    TableSnapshotInputFormat.setInput(job, snapName, path)
    // read the snapshot's HFiles directly from HDFS through the MapReduce input format, bypassing the region servers
    val rdd = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    rdd.foreach(t => {
      val result = t._2
      val rowkey = new String(result.getRow)
      println(rowkey)
      val cells = result.rawCells()
      for (cell <- cells) {
        val key = new String(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val value = new String(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        println("key: " + key)
        println("value: " + value)
      }
    })
//        println(rdd.count())

    sc.stop()
  }

}
  • To make sure the HFiles do not change while they are being read, first take a snapshot of the table (see the sketch after this list).
  • Spark then reads the snapshotted table through the newAPIHadoopRDD API, scanning and parsing the HFiles the MapReduce way.
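
The snapshot is assumed to exist before the Spark job runs. It can be created in the HBase shell with snapshot 'demo_label_test', 'demo_label_test_snapshot', or programmatically. Below is a minimal sketch using the HBase Admin API, reusing the table and snapshot names from the example above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateSnapshot {
  def main(args: Array[String]): Unit = {
    // take a snapshot so the table's HFiles stay fixed while Spark reads them;
    // the table and snapshot names mirror the example above
    val conf = HBaseConfiguration.create()
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    admin.snapshot("demo_label_test_snapshot", TableName.valueOf("demo_label_test"))
    admin.close()
    conn.close()
  }
}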

Readers interested in this kind of tech are welcome to get in touch and learn together ^^
