hbase的更新和删除,查询数据:
1,首先编写一个hbase的工具类:
/**
-
HBase 工具类
*/
public class HBaseUtil {private static final Logger logger = LoggerFactory.getLogger(HBaseUtil.class);
private static Configuration conf;
private static Connection conn;static {
try {
if (conf == null) {
conf = HBaseConfiguration.create();
final com.knowyou.homeBandDataService.common.Configuration config = com.knowyou.homeBandDataService.common.Configuration
.getInstance();
final String zkHost = config.getProperty("hbase.zookeeper.quorum",
"10.1.11.108,10.1.11.110,10.1.11.111");
final String zkport = config.getProperty("hbase.zooKeeper.property.clientport", "2181");
final String znode = config.getProperty("zookeeper.znode.parent", "/hbase-unsecure");
conf.set("hbase.zookeeper.quorum", zkHost);
conf.set("hbase.zooKeeper.property.clientport", zkport);
conf.set("zookeeper.znode.parent", znode);
}
} catch (Exception e) {
logger.error("HBase Configuration Initialization failure !");
throw new RuntimeException(e);
}
}/**
- 获得链接
- @return
*/
public static synchronized Connection getConnection() {
try {
if (conn == null || conn.isClosed()) {
conn = ConnectionFactory.createConnection(conf);
}
} catch (IOException e) {
logger.error("HBase 建立链接失败 ", e);
}
return conn;
}
public static void main(String[] args) {
final String property = com.knowyou.homeBandDataService.common.Configuration.getInstance()
.getProperty("hbase.zookeeper.quorum", "xxxxxxx");
System.out.println(property);
}/**
- 创建表
- @param tableName
- @throws Exception
*/
public static void createTable(String tableName, String[] columnFamilies, boolean preBuildRegion) throws Exception {
if (preBuildRegion) {
String[] s = new String[]{"1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"};
int partition = 16;
byte[][] splitKeys = new byte[partition - 1][];
for (int i = 1; i < partition; i++) {
splitKeys[i - 1] = Bytes.toBytes(s[i - 1]);
}
createTable(tableName, columnFamilies, splitKeys);
} else {
createTable(tableName, columnFamilies);
}
}
private static void createTable(String tableName, String[] cfs, byte[][] splitKeys) throws Exception {
Connection conn = getConnection();
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
try {
if (admin.tableExists(tableName)) {
logger.warn("Table: {} is exists!", tableName);
return;
}
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
for (int i = 0; i < cfs.length; i++) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfs[i]);
hColumnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
hColumnDescriptor.setMaxVersions(1);
tableDesc.addFamily(hColumnDescriptor);
hColumnDescriptor.setInMemory(true);
//设置数据的保存时间----数据存90天
// hColumnDescriptor.setTimeToLive(90 * 24 * 60 * 60);
}
admin.createTable(tableDesc, splitKeys);
logger.info("Table: {} create success!", tableName);
} finally {
admin.close();
closeConnect(conn);
}
}private static void createTable(String tableName, String[] cfs) throws Exception {
Connection conn = getConnection();
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
try {
if (admin.tableExists(tableName)) {
logger.warn("Table: {} is exists!", tableName);
return;
}
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
for (int i = 0; i < cfs.length; i++) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfs[i]);
hColumnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
hColumnDescriptor.setMaxVersions(1);
tableDesc.addFamily(hColumnDescriptor);
hColumnDescriptor.setInMemory(true);
//设置数据的保存时间----数据存5分钟
// hColumnDescriptor.setTimeToLive(5 * 60);
}
admin.createTable(tableDesc);
logger.info("Table: {} create success!", tableName);
} catch (Exception e) {
e.printStackTrace();
} finally {
admin.close();
closeConnect(conn);
}
}/**
- 删除表
- @param tablename
- @throws IOException
*/
public static void deleteTable(String tablename) throws IOException {
Connection conn = getConnection();
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
try {
if (!admin.tableExists(tablename)) {
logger.warn("Table: {} is not exists!", tablename);
return;
}
admin.disableTable(tablename);
admin.deleteTable(tablename);
logger.info("Table: {} delete success!", tablename);
} finally {
admin.close();
closeConnect(conn);
}
}
/**
- 获取 Table
- @param tableName 表名
- @return
- @throws IOException
*/
public static Table getTable(String tableName) {
try {
return getConnection().getTable(TableName.valueOf(tableName));
} catch (Exception e) {
logger.error("Obtain Table failure !", e);
}
return null;
}
/**
- 异步的往表里面添加数据
- @param tablename
- @param rowKey
- @param columnFamiliName
- @param column
- @param value
- @return
- @throws Exception
*/
//String rowKey, String tableName, String columnFamiliName, String[] column,String[] value
public static long put(String rowKey, String tablename, String columnFamiliName, String[] column, String[] value)
throws Exception {
long currentTime = System.currentTimeMillis();
Table htable = getConnection().getTable(TableName.valueOf(tablename));
Put put = new Put(Bytes.toBytes(rowKey));
HColumnDescriptor[] columnFamilies = htable.getTableDescriptor().getColumnFamilies();
for (int i = 0; i < columnFamilies.length; i++) {
String familyName = columnFamilies[i].getNameAsString(); // 获取列族名
if (familyName.equals(columnFamiliName)) {
for (int j = 0; j < column.length; j++) {
put.addColumn(Bytes.toBytes(columnFamiliName), Bytes.toBytes(column[j]), Bytes.toBytes(value[j]));
}
}
}
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
for (int i = 0; i < e.getNumExceptions(); i++) {
logger.error("Failed to sent put " + e.getRow(i) + ".");
}
}
};
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename)).listener(listener);
params.writeBufferSize(5 * 1024 * 1024);
final BufferedMutator mutator = conn.getBufferedMutator(params);
try {
final List<Put> puts = Arrays.asList(put);
mutator.mutate(puts);
mutator.flush();
} finally {
mutator.close();
closeConnect(conn);
}
return System.currentTimeMillis() - currentTime;
}
/**
- 异步往指定表添加数据
- @param tablename 表名
- @param put 需要添加的数据
- @return long 返回执行时间
- @throws IOException
*/
public static long put(String tablename, Put put) throws Exception {
return put(tablename, (Put) Arrays.asList(put));
}
/**
- 往指定表添加数据
- @param tablename 表名
- @param puts 需要添加的数据
- @return long 返回执行时间
- @throws IOException
*/
public static long putByHTable(String tablename, List<?> puts) throws Exception {
long currentTime = System.currentTimeMillis();
Connection conn = getConnection();
HTable htable = (HTable) conn.getTable(TableName.valueOf(tablename));
htable.setAutoFlushTo(false);
htable.setWriteBufferSize(5 * 1024 * 1024);
try {
htable.put((List<Put>) puts);
htable.flushCommits();
} finally {
htable.close();
closeConnect(conn);
}
return System.currentTimeMillis() - currentTime;
}
/**
- 删除单条数据
- @param tablename
- @param row
- @throws IOException
*/
public static void delete(String tablename, String row) throws IOException {
Table table = getTable(tablename);
if (table != null) {
try {
Delete d = new Delete(row.getBytes());
table.delete(d);
} finally {
table.close();
}
}
}
/**
- 删除多行数据
- @param tablename
- @param rows
- @throws IOException
*/
public static void delete(String tablename, String[] rows) throws IOException {
Table table = getTable(tablename);
if (table != null) {
try {
List<Delete> list = new ArrayList<Delete>();
for (String row : rows) {
Delete d = new Delete(row.getBytes());
list.add(d);
}
if (list.size() > 0) {
table.delete(list);
}
} finally {
table.close();
}
}
}
/**
- 关闭连接
- @throws IOException
*/
public static void closeConnect(Connection conn) {
if (null != conn) {
try {
// conn.close();
} catch (Exception e) {
logger.error("closeConnect failure !", e);
}
}
}
/**
- 获取单条数据
- @param tablename
- @param row
- @return
- @throws IOException
*/
public static Result getRow(String tablename, byte[] row) {
Table table = getTable(tablename);
Result rs = null;
if (table != null) {
try {
Get g = new Get(row);
rs = table.get(g);
} catch (IOException e) {
logger.error("getRow failure !", e);
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("getRow failure !", e);
}
}
}
return rs;
}
/**
- 获取多行数据
- @param tablename
- @param rows
- @return
- @throws Exception
*/
public static <T> Result[] getRows(String tablename, List<T> rows) {
Table table = getTable(tablename);
List<Get> gets = null;
Result[] results = null;
try {
if (table != null) {
gets = new ArrayList<Get>();
for (T row : rows) {
if (row != null) {
gets.add(new Get(Bytes.toBytes(String.valueOf(row))));
} else {
throw new RuntimeException("hbase have no data");
}
}
}
if (gets.size() > 0) {
results = table.get(gets);
}
} catch (IOException e) {
logger.error("getRows failure !", e);
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table.close() failure !", e);
}
}
return results;
}
/**
- 扫描整张表
- @param tablename
- @return
- @throws IOException
*/
public static ResultScanner get(String tablename) {
Table table = getTable(tablename);
ResultScanner results = null;
if (table != null) {
try {
Scan scan = new Scan();
scan.setCaching(1000);
results = table.getScanner(scan);
} catch (IOException e) {
logger.error("getResultScanner failure !", e);
} finally {
try {
table.close();
} catch (IOException e) {
logger.error("table.close() failure !", e);
}
}
}
return results;
}
public static void addData(String rowKey, String tableName, String columnFamiliName, String[] column,
String[] value) throws IOException {
Table htable = null;
try {
htable = getConnection().getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
// 获取表
HColumnDescriptor[] columnFamilies = htable.getTableDescriptor() // 获取所有的列族
.getColumnFamilies();for (int i = 0; i < columnFamilies.length; i++) { String familyName = columnFamilies[i].getNameAsString(); // 获取列族名 if (familyName.equals(columnFamiliName)) { for (int j = 0; j < column.length; j++) { put.add(Bytes.toBytes(columnFamiliName), Bytes.toBytes(column[j]), Bytes.toBytes(value[j])); } } } htable.put(put); htable.close();
// System.out.println("add data Success! the rk is :"+rowKey);
logger.info("add data Success! the rk is :" + rowKey);
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
if (htable != null) {
try {
htable.close();
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
}
public static void addDataAndCloseConn(String rowKey, String tableName, String columnFamiliName, String[] column,
String[] value) throws IOException {
Table htable = null;
try {
htable = getConnection().getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
// 获取表
HColumnDescriptor[] columnFamilies = htable.getTableDescriptor() // 获取所有的列族
.getColumnFamilies();
for (int i = 0; i < columnFamilies.length; i++) {
String familyName = columnFamilies[i].getNameAsString(); // 获取列族名
if (familyName.equals(columnFamiliName)) {
for (int j = 0; j < column.length; j++) {
put.add(Bytes.toBytes(columnFamiliName), Bytes.toBytes(column[j]), Bytes.toBytes(value[j]));
}
}
}
htable.put(put);
htable.close();
System.out.println("add data Success!");
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
if (htable != null) {
try {
htable.close();
if (conn != null) {
conn.close();
}
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
}
/*
* 查询表中的某一列
*
* @tableName 表名
*
* @rowKey rowKey
*/
public static String getResultByColumn(String tableName, String rowKey, String familyName, String columnName) {
Table htable = null;
try {
htable = getConnection().getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族和列修饰符对应的列
Result result = htable.get(get);
String value = "";
if (result != null && !result.isEmpty()) {
for (KeyValue kv : result.list()) {
value = Bytes.toString(kv.getValue());
}
}
return value;
} catch (Exception e) {
logger.error(e.getMessage());
return "";
} finally {
if (htable != null) {
try {
htable.close();
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
}
public static String getResultByColumnAndCloseConn(String tableName, String rowKey, String familyName,
String columnName) {
Table htable = null;
try {
htable = getConnection().getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族和列修饰符对应的列
Result result = htable.get(get);
String value = "";
if (result != null && !result.isEmpty()) {
for (KeyValue kv : result.list()) {
value = Bytes.toString(kv.getValue());
}
}
return value;
} catch (Exception e) {
logger.error(e.getMessage());
return "";
} finally {
if (htable != null) {
try {
htable.close();
if (conn != null) {
conn.close();
}
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
}
/**
* 测试输出结果
*/
public static void formatRow(KeyValue[] rs) {
for (KeyValue kv : rs) {
System.out.println(" column family : " + Bytes.toString(kv.getFamily()));
System.out.println(" column : " + Bytes.toString(kv.getQualifier()));
System.out.println(" value : " + Bytes.toString(kv.getValue()));
System.out.println(" timestamp : " + String.valueOf(kv.getTimestamp()));
System.out.println("--------------------");
}
}
}
2,编写一个连接池
/**
-
线程池工具类
*/
public class ThreadPoolUtil {private static final Logger logger = LoggerFactory.getLogger(ThreadPoolUtil.class);
private static ThreadPoolUtil threadPool;
private ThreadPoolExecutor executor=null;
//线程池的基础参数 实际使用可写入到配置文件中
private int corePoolSize = 40; // 核心池的大小 运行线程的最大值 当线程池中的线程数目达到corePoolSize后,就会把多余的任务放到缓存队列当中;
private int maximumPoolSize = 60; // 创建线程最大值
private long keepAliveTime = 1; // 线程没有执行任务时 被保留的最长时间 超过这个时间就会被销毁 直到线程数等于 corePoolSize
private long timeout = 10; // 等待线程池任务执行结束超时时间/** 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; 天
TimeUnit.HOURS; 小时
TimeUnit.MINUTES; 分钟
TimeUnit.SECONDS; 秒
TimeUnit.MILLISECONDS; 毫秒
TimeUnit.MICROSECONDS; 微妙
TimeUnit.NANOSECONDS; 纳秒***/
private TimeUnit unit= TimeUnit.SECONDS;/**
用来储存等待中的任务的容器
几种选择:
- ArrayBlockingQueue;
- LinkedBlockingQueue;
- SynchronousQueue;
- 区别太罗嗦请百度 http://blog.csdn.net/mn11201117/article/details/8671497
*/
private LinkedBlockingQueue workQueue=new LinkedBlockingQueue<Runnable>();
/**
- 单例
- @return
*/
public static ThreadPoolUtil init(){
if(threadPool==null)
threadPool=new ThreadPoolUtil();
return threadPool;
}
/**
- 私有构造方法
*/
private ThreadPoolUtil(){
//实现线程池
executor=new ThreadPoolExecutor(corePoolSize,maximumPoolSize, keepAliveTime, unit,
workQueue);
System.out.println("线程池初始化成功");
}
/**
- 线程池获取方法
- @return
*/
public ThreadPoolExecutor getExecutor() {return executor;}
/**
- 准备执行 抛入线程池
- @param t
*/
public void execute(Thread t){
executor.execute(t);
}
public void execute(Runnable t){ executor.execute(t);}
public int getQueueSize(){
return executor.getQueue().size();
}/**
- 异步提交返回 Future
- Future.get()可获得返回结果
- @return
*/
public Future<?> submit(Runnable t){return executor.submit(t);}
/**
- 异步提交返回 Future
- Future.get()可获得返回结果
- @return
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public Future<?> submit(Callable t){return getExecutor().submit(t);}
/**
- 销毁线程池
- */
public void shutdown(){
getExecutor().shutdown();
}
/**
- 阻塞,直到线程池里所有任务结束
*/
public void awaitTermination() throws InterruptedException {
logger.info("Thread pool ,awaitTermination started, please wait till all the jobs complete.");
executor.awaitTermination(timeout, unit);
}
}
3,编写一个hbase的接口类:
/**
-
HBase 服务接口类
*/
public interface HBaseService {/**
- 创建表
*/
public void createTable(String tableName, String[] columnFamilies, boolean preBuildRegion) throws Exception;
/**
- 写入数据
*/
public void put(String tableName, Put put, boolean waiting);
/**
- 批量写入数据
*/
public void batchPut(String tableName, final List<Put> puts, boolean waiting);
/**
- 批量写入
- @param rowKey
- @param tableName
- @param columnFamiliName
- @param column
- @param value
- @param waiting
*/
public void bacthPut(String rowKey, String tableName, String columnFamiliName, String[] column, String[] value, boolean waiting);
/**
- @param tableName 表名称
- @param rowKey rk
- @param familyName 列簇
- @param columnName 具体column
- @return
*/
public String getResultByColumn(String tableName, String rowKey, String familyName, String columnName);
<T> Result[] getRows(String tablename, List<T> rows);
Result getRow(String tablename, byte[] row);
}
4,编写一个hbase的抽象类
/** - 创建表
HBase 服务抽象类
*/
public abstract class AbstractHBaseService implements HBaseService {
private static final Logger logger = LoggerFactory.getLogger(AbstractHBaseService.class);
@Override
public void createTable(String tableName, String[] columnFamilies, boolean preBuildRegion) throws Exception {
}
@Override
public void put(String tableName, Put put, boolean waiting) {
}
@Override
public void batchPut(final String tableName, final List<Put> puts, boolean waiting) {
}
@Override
public void bacthPut(String rowKey, String tableName, String columnFamiliName, String[] column, String[] value, boolean waiting) {
}
@Override
public String getResultByColumn(String tableName, String rowKey, String familyName, String columnName) {
return HBaseUtil.getResultByColumn(tableName, rowKey, familyName, columnName);
}
@Override
public <T> Result[] getRows(String tablename, List<T> rows) {
return null;
}
@Override
public Result getRow(String tablename, byte[] row) {
return null;
}
}
5,编写一个hbase的实现类
/**
- HBaseService Mutator 实现类
*/
class HBaseServiceImpl extends AbstractHBaseService {
private static final Logger logger = LoggerFactory.getLogger(HBaseServiceImpl.class);
private ThreadPoolUtil threadPool = ThreadPoolUtil.init(); // 初始化线程池
@Override
public void put(String tableName, Put put, boolean waiting) {
batchPut(tableName, Arrays.asList(put), waiting);
}
/**
* 批量提交
*/
@Override
public void bacthPut(final String rowKey, final String tableName, final String columnFamiliName, final String[] column, final String[] value,
boolean waiting) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
HBaseUtil.put(rowKey, tableName, columnFamiliName, column, value);
logger.info("sucuss to hbase.....the current rowkey is :" + rowKey + " the tablename is: " + tableName);
} catch (Exception e) {
logger.error("batchPut failed . .the rowkey is:" + rowKey + ",the tablename is :" + tableName + ",the root cause is :" + e.getMessage());
}
}
});
if (waiting) {
try {
threadPool.awaitTermination();
} catch (InterruptedException e) {
logger.error("HBase put job thread pool await termination time out.", e);
}
}
// try {
// HBaseUtil.put(tableName, rowKey, columnFamiliName, column, value);
// } catch (Exception e) {
// logger.error("batchPut failed . ", e);
// }
}
/**
* 获取具体的数据
*/
@Override
public String getResultByColumn(String tableName, String rowKey, String familyName, String columnName) {
return super.getResultByColumn(tableName, rowKey, familyName, columnName);
}
@Override
public <T> Result[] getRows(String tablename, List<T> rows) {
return HBaseUtil.getRows(tablename, rows);
}
@Override
public Result getRow(String tablename, byte[] row) {
return HBaseUtil.getRow(tablename, row);
}
/**
* 多线程异步提交
*/
public void batchAsyncPut(final String tableName, final List<Put> puts, boolean waiting) {
Future f = threadPool.submit(new Runnable() {
@Override
public void run() {
try {
HBaseUtil.putByHTable(tableName, puts);
} catch (Exception e) {
logger.error("batchPut failed . ", e);
}
}
});
if (waiting) {
try {
f.get();
} catch (InterruptedException e) {
logger.error("多线程异步提交返回数据执行失败.", e);
} catch (ExecutionException e) {
logger.error("多线程异步提交返回数据执行失败.", e);
}
}
}
/**
* 创建表
*/
public void createTable(String tableName, String[] columnFamilies, boolean preBuildRegion) throws Exception {
HBaseUtil.createTable(tableName, columnFamilies, preBuildRegion);
}
}
6,编写一个hbase方法的调用类
/**
HBase 各个组件管理调用类
-
可以根据配置文件来选择 HBase 官方 API 还是第三方API
*/
public class HBase {private HBase() {
}private static HBaseService hBaseService;
static {
hBaseService = new HBaseServiceImpl();
}
/**
- 创建hbase表
- @param tableName 表名称
- @param columnFamilies 列族
- @param preBuildRegion 是否预分区
*/
public static void createTable(String tableName, String[] columnFamilies, boolean preBuildRegion) {
try {
hBaseService.createTable(tableName, columnFamilies, preBuildRegion);
} catch (Exception e) {
System.out.println("create hbase table fail, the root cause is " + e.getMessage());
}
}
/**
- 写入单条数据
- @param tableName 表名称
- @param put 列值
- @param waiting 是否等待线程执行完成 true 可以及时看到结果, false 让线程继续执行,并跳出此方法返回调用方主程序
- @return
*/
public static void put(String tableName, Put put, boolean waiting) {
hBaseService.batchPut(tableName, Arrays.asList(put), waiting);
}
// /**
// * 多线程同步提交
// *
// * @param tableName 表名称
// * @param puts 待提交参数
// * @param waiting 是否等待线程执行完成 true 可以及时看到结果, false 让线程继续执行,并跳出此方法返回调用方主程序
// */
// public static void put(String tableName, List<Put> puts, boolean waiting) {
// hBaseService.batchPut(tableName, puts, waiting);
// }
public static void put(String rowKey, String tableName, String columnFamiliName, String[] column, String[] value, boolean waiting) {
hBaseService.bacthPut(rowKey, tableName, columnFamiliName, column, value, waiting);
}
/**
* 获取多行数据
*
* @param tablename
* @param rows
* @return
* @throws Exception
*/
public static <T> Result[] getRows(String tablename, List<T> rows) throws Exception {
return hBaseService.getRows(tablename, rows);
}
/**
* 获取单条数据
*
* @param tablename
* @param row
* @return
*/
public static Result getRow(String tablename, byte[] row) {
return hBaseService.getRow(tablename, row);
}
/**
* 获取具体value
*
* @param tableName
* @param rowKey
* @param familyName
* @param columnName
* @return
*/
public static String getResultByColumn(String tableName, String rowKey, String familyName, String columnName) {
return hBaseService.getResultByColumn(tableName, rowKey, familyName, columnName);
}
}