package com.nie.k2h.hbase;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import com.nie.k2h.init.LogLoader;
import com.nie.k2h.order.SKUInfo;
public class HbaseUtils {
// Log4j logger shared by the static helpers in this class.
private static Logger logger = Logger.getLogger(HbaseUtils.class);
// HBase client configuration handed to the HTable / HBaseAdmin constructors in this class.
// NOTE(review): nothing in the visible code ever assigns this field, so it is still null
// when those constructors receive it — confirm where it is meant to be initialised.
private static Configuration conf = null;
// Running number of failed addData calls; package-private and not synchronised.
static int count=0;
/**
 * Fetches a single row by row key and returns its cells as a qualifier-to-value map.
 *
 * @param table   table to read from (an existing table is, for example,
 *                sql_gdm_m03_item_sku_da_jss_201405)
 * @param cf      column family that every requested column belongs to
 * @param rowKey  row key of the row to fetch
 * @param columns qualifiers to request, e.g. item_first_cate_cd, item_second_cate_cd,
 *                item_third_cate_cd, work_post_cd, shop_id, dept_id_1, dept_name_1,
 *                dept_id_2, dept_name_2, dept_id_3, dept_name_3; when null or empty no
 *                column restriction is added to the Get
 * @return map from qualifier to value, both decoded with Bytes.toString; empty (never
 *         null) when the result carries no cells
 * @throws IOException if the read fails
 */
public static Map<String,String> getResult(HTableInterface table,String cf, String rowKey, String...columns)
        throws IOException {
    Get get = new Get(Bytes.toBytes(rowKey));
    if (columns != null) {
        byte[] family = Bytes.toBytes(cf);
        for (String column : columns) {
            get.addColumn(family, Bytes.toBytes(column));
        }
    }

    Map<String,String> map = new HashMap<String,String>();
    // list() is null for a result without cells, so fetch it once and guard it.
    List<KeyValue> cells = table.get(get).list();
    if (cells != null) {
        for (KeyValue kv : cells) {
            map.put(Bytes.toString(kv.getQualifier()), Bytes.toString(kv.getValue()));
        }
    }
    return map;
}
/**
 * Scans a row-key range of a table and writes every cell to the class logger.
 *
 * <p>Each cell is emitted as five ERROR-level log lines (row, family, qualifier, value,
 * timestamp), followed by a separator line after each row.
 *
 * @param tableName    name of the table to scan
 * @param start_rowkey row key passed to Scan.setStartRow
 * @param stop_rowkey  row key passed to Scan.setStopRow
 * @throws IOException if opening the table or scanning it fails
 */
public static void getResultScann(String tableName, String start_rowkey,
        String stop_rowkey) throws IOException {
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(start_rowkey));
    scan.setStopRow(Bytes.toBytes(stop_rowkey));

    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    try {
        // Nested try/finally: the scanner is only closed when it was actually opened,
        // and the table is closed even if opening or closing the scanner fails.
        ResultScanner rs = table.getScanner(scan);
        try {
            for (Result r : rs) {
                List<KeyValue> cells = r.list();
                if (cells != null) {
                    for (KeyValue kv : cells) {
                        logger.error("row:" + Bytes.toString(kv.getRow()));
                        logger.error("family:"
                                + Bytes.toString(kv.getFamily()));
                        logger.error("qualifier:"
                                + Bytes.toString(kv.getQualifier()));
                        logger.error("|" + Bytes.toString(kv.getValue()));
                        logger.error("timestamp:" + kv.getTimestamp());
                    }
                }
                logger.error("\n-------------------------------------------");
            }
        } finally {
            rs.close();
        }
    } finally {
        table.close();
    }
}
/**
 * Placeholder for an item lookup by SKU id; it currently performs no work.
 *
 * <p>The lookup call that once lived here is disabled, so invoking this method has no
 * effect and never throws.
 *
 * @param itemSkuId SKU id of the item (currently unused)
 */
public static void getItemByItemSkuId(String itemSkuId) {
    // Intentionally empty: the former getResult(...) lookup is disabled.
}
/**
 * Disables and then deletes a table, printing a confirmation to standard output.
 *
 * @param tableName name of the table to drop
 * @throws IOException if disabling, deleting, or closing the admin client fails
 */
public static void deleteTable(String tableName) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
    } finally {
        // Release the admin client even when disable/delete throws.
        admin.close();
    }
    System.out.println(tableName + " is deleted!");
}
/**
 * Finds the table named {@code sql_gdm_m03_item_sku_da_jss_<digits>} with the largest
 * numeric suffix.
 *
 * <p>Any failure (including a failure to create the admin client) is logged and reported
 * as {@code null}; this method does not throw.
 *
 * @return the full name of the matching table with the highest suffix, or {@code null}
 *         when no table matches or an error occurs
 */
public static String loadTable() {
    final String prefix = "sql_gdm_m03_item_sku_da_jss_";
    HBaseAdmin admin = null;
    logger.debug("++++++++++++++++++++++++++++++start");
    try {
        admin = new HBaseAdmin(conf);
        // Only purely numeric suffixes are accepted, so the parse below cannot be handed
        // a non-digit character.
        Pattern p = Pattern.compile("^" + Pattern.quote(prefix) + "\\d+$");
        String[] tables = admin.getTableNames(p);
        if (tables == null || tables.length == 0) {
            return null;
        }

        // Track the winning table name itself instead of rebuilding it from the number:
        // that keeps leading zeros intact, and long avoids overflow on long suffixes.
        String latestTable = null;
        long latestSuffix = -1L;
        for (String table : tables) {
            logger.debug("----------table Name-------------" + table);
            long suffix = Long.parseLong(table.substring(table.lastIndexOf('_') + 1));
            if (suffix > latestSuffix) {
                latestSuffix = suffix;
                latestTable = table;
            }
        }
        logger.debug("++++++++++++++++++++++++++++++end");
        return latestTable;
    } catch (Exception ex) {
        logger.error("获取表异常:", ex);
        return null;
    } finally {
        // admin stays null when the constructor itself failed.
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException io) {
                logger.error("关闭表异常:", io);
            }
        }
    }
}
/**
 * Creates a table with the given column families.
 *
 * <p>When the table already exists this logs an error and terminates the JVM with
 * status 0.
 * NOTE(review): System.exit inside a utility method is surprising for callers; it is kept
 * because existing callers may rely on it — confirm and consider returning instead.
 *
 * @param tableName name of the table to create
 * @param family    names of the column families to add to the table
 * @throws Exception if the admin client cannot be created or the table operations fail
 */
public static void creatTable(String tableName, String[] family)
        throws Exception {
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
        HTableDescriptor desc = new HTableDescriptor(tableName);
        for (String columnFamily : family) {
            desc.addFamily(new HColumnDescriptor(columnFamily));
        }
        if (admin.tableExists(tableName)) {
            logger.error("---table " + tableName + " Exists!---");
            // Does not return; the finally block below is skipped on this path.
            System.exit(0);
        } else {
            admin.createTable(desc);
            logger.debug("---create table " + tableName + " Success!---");
        }
    } finally {
        // Release the admin client even when tableExists/createTable throws.
        admin.close();
    }
}
/**
 * Applies one Increment to a row, adding {@code valueArr[i]} to column
 * {@code columnArr[i]} of the given family.
 *
 * <p>Auto-flush is switched off on the table before the increment is sent. Any exception
 * is logged through LogLoader and not rethrown.
 *
 * @param htable    table to update
 * @param rowKey    row key of the row to increment
 * @param family    column family of all incremented columns
 * @param columnArr qualifiers to increment
 * @param valueArr  amounts to add, index-aligned with {@code columnArr}
 */
public static void IncrValue(HTableInterface htable,String rowKey,
        String family,String columnArr[],long valueArr[]){
    try {
        htable.setAutoFlush(false);
        Increment increment = new Increment(Bytes.toBytes(rowKey));
        int idx = 0;
        while (idx < columnArr.length) {
            byte[] familyBytes = Bytes.toBytes(family);
            byte[] qualifierBytes = Bytes.toBytes(columnArr[idx]);
            long amount = valueArr[idx];
            increment.addColumn(familyBytes, qualifierBytes, amount);
            idx++;
        }
        htable.increment(increment);
    } catch (Exception failure) {
        LogLoader.getLog().error("IncrValue error msg", failure);
    }
}
/**
 * Writes one Put for a row, storing {@code valueArr[i]} under column {@code columnArr[i]}
 * of the given family with an explicit timestamp.
 *
 * <p>Auto-flush is switched off on the table before the Put is handed over. Failures are
 * counted in the static {@code count} field, logged with their stack trace, and reported
 * through the return value rather than thrown.
 *
 * @param htable       table to write to
 * @param familyColumn column family of all written columns
 * @param rowKey       row key of the row to write
 * @param columnArr    qualifiers to write
 * @param ts           timestamp applied to every cell of the Put
 * @param valueArr     values to store, index-aligned with {@code columnArr}
 * @return {@code true} when the Put was handed to the table without an exception,
 *         {@code false} otherwise
 * @throws IOException declared for caller compatibility; failures are caught and
 *                     reported as {@code false}
 */
public static boolean addData(HTableInterface htable,String familyColumn,
        String rowKey,String columnArr[],long ts,String valueArr[]) throws IOException{
    try {
        htable.setAutoFlush(false);
        Put put = new Put(Bytes.toBytes(rowKey));
        byte[] family = Bytes.toBytes(familyColumn);
        for (int j = 0; j < columnArr.length; j++) {
            put.add(family, Bytes.toBytes(columnArr[j]), ts, Bytes.toBytes(valueArr[j]));
        }
        htable.put(put);
        return true;
    } catch (Exception e) {
        // Keep the failure counter, but record the cause instead of discarding it.
        count++;
        LogLoader.getLog().error("addData error msg : " + e.getMessage()
                + ", error count=" + count, e);
        return false;
    }
}
/**
 * Builds a row key from an order id by reversing its decimal text and right-padding the
 * result with '0' up to 13 characters.
 *
 * <p>Texts of 13 or more characters are returned reversed without padding. The sign of a
 * negative id is part of the reversed text, so it ends up after the digits.
 *
 * @param orderId order id to convert
 * @return the reversed, zero-padded row key (never null)
 */
public static String getRowKey(long orderId){
    final int minLength = 13;
    StringBuilder key = new StringBuilder(String.valueOf(orderId)).reverse();
    while (key.length() < minLength) {
        key.append('0');
    }
    return key.toString();
}
/**
 * Manual smoke check: prints the row key derived from a sample order id.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    String sampleRowKey = getRowKey(120876300L);
    System.out.println(sampleRowKey);
}
/**
 * Left-pads a string with '0' characters until it is {@code len} characters long.
 *
 * @param str text to pad; must not be null
 * @param len desired minimum length of the result
 * @return {@code str} prefixed with as many zeros as needed, or {@code str} itself when
 *         it is already at least {@code len} characters long
 */
public static String getLeftAddZero(String str, int len) {
    int missing = len - str.length();
    if (missing <= 0) {
        return str;
    }
    // Single builder instead of repeated "0" + str concatenation.
    StringBuilder padded = new StringBuilder(len);
    for (int i = 0; i < missing; i++) {
        padded.append('0');
    }
    return padded.append(str).toString();
}
/**
 * Scans the whole table for column {@code f:centerNum} and returns the values keyed by
 * row key.
 *
 * <p>Errors are logged through LogLoader and not rethrown; whatever was collected before
 * the failure is still returned.
 *
 * @param htable table to scan
 * @return map from row key to the {@code f:centerNum} value, both decoded with
 *         Bytes.toString; never null
 */
public static Map<String,String> getCityParComRELA(HTableInterface htable) {
    Map<String,String> map = new HashMap<String, String>();
    ResultScanner rs = null;
    try {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("centerNum"));
        rs = htable.getScanner(scan);
        if (rs != null) {
            for (Result r = rs.next(); r != null; r = rs.next()) {
                for (KeyValue kv : r.raw()) {
                    map.put(Bytes.toString(kv.getRow()), Bytes.toString(kv.getValue()));
                }
            }
        }
    } catch (Exception ex) {
        LogLoader.getLog().error("hbase query getCityParComRELA error", ex);
    } finally {
        // The scanner was previously never closed; release it on every path.
        if (rs != null) {
            try {
                rs.close();
            } catch (Exception closeEx) {
                LogLoader.getLog().error("hbase query getCityParComRELA close scanner error", closeEx);
            }
        }
    }
    return map;
}
}
HbaseUtil
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...