Java HBase 多线程

github: https://github.com/ares333/HbaseMultiThread

Java多线程是一个很麻烦的东西,为了简化开发提高性能用Java的多线程封装了HBase API,包括Scan,Get,Put,Delete四种操作。经过多次修改运行非常稳定,已经用于生产环境。内部线程通信使用wait()/notify()机制,效率非常高。本文只在Java层面讨论HBase的多线程,HBase API内部线程处理机制不在本文讨论范围之内。HBase客户端版本 0.92.1,本类源码结尾处可以下载。下面通过demo演示一下用法,结尾处有代码打包下载。

特点:

  • 从Java层面用多线程最大化了读写性能。
  • 大量减少了应用程序的代码量,现在可以集中精力到数据分析上了。
  • 性能平均下来至少比单线程高出1个数量级,某些情况下高出两个数量级。

所有返回数据都是HashMap<String,String>,代表hbase中一行的一个列簇,数据中必然有一个键__rowkey保存本行的rowkey,所有列簇默认为cf,可以使用setCf()提前设置。
文档和最新版本可能有细微出入,不再修改文档。

HScan

HScan有5个public方法:

  1. public HScan(String table, String[] columns, int threadNum),table指定表名,columns指定扫描的列,如果设为null代表扫描所有的列,如果设为空数组代表只扫描rowkey,threadNum设置扫描器的并发数。
  2. public void addTask(String start, String end, int num),前两个参数是起止rowkey,第三个参数用于控制rowkey平分成num段,这个参数主用用于可整型计算的时间戳开头的rowkey,如果非整型范围内数字这个参数设为1即可。这个方法可以调用多次,每次添加num个任务到任务池。
  3. public void fuck(),继承来的方法,开始进行扫描。
  4. public HashMap<String, String> fetch(),从缓冲区里取一个结果出来,结果中的键由列名和"rowKey"组成,如果返回null表示所有行都已经读完了。
  5. public void status(),一个非同步方法,用于近似显示线程的运行状态,总是在当前行显示,内容由4部分组成2013-01-28_103053,第一列显示已经返回的结果数,第二列显示缓冲区中的行数,第三列分别表示 当前并发线程数/Thread.State.WAITING数/Thread.State.BLOCKED数,第四列显示任务池中的任务数。

Demo,扫描gm_player_detail表2013年1月1日到10日的所有数据

package joyport.hbase.gm;

import java.util.HashMap;

public class Test {
	private static int threadNum = 30;
	private static int taskNum = 1000;
	private static String htable = "gm_player_detail";

	public static void main(String[] args) throws Exception {
		int[] time = new int[2];
		if (args.length == 2) {
			time = Util.getTimeScale(args[0], args[1]);
		} else if (args.length == 1) {
			time = Util.getTimeScale(-1);
			time[0] = Util.getTimeScale(args[0], args[0])[0];
		} else {
			time = Util.getTimeScale(-1);
		}
		HScan hScan = new HScan(htable, null, threadNum);
		hScan.addTask(String.valueOf(time[0]), String.valueOf(time[1]), taskNum);
		hScan.fuck();
		Test test = new Test();
		test.analyse(hScan);
	}

	public void analyse(HScan hScan) throws InterruptedException {
		HashMap<String, String> row = null;
		for (row = hScan.fetch(); row != null; row = hScan.fetch()) {
			hScan.status();
			//System.out.println(row);
		}
	}
}

Util.getTimeScale(-1)获取昨天的起止时间戳
Util.getTimeScale(args[0], args[1])根据yyyy:mm:dd格式的日期获取起止时间戳,java用的是微秒计算的时候可能跨天,如果总是用这个函数就不会出现时间断裂或重复。
编译然后运行
java joyport.hbase/gm/Test 2013-01-01 2013-01-10

HGet

get操作还是很有用途的,比如有些需求需要这样来做,统计每一天的前n条记录,但是一条记录字段特别多,可以先只Scan必要字段计算出前n,然后根据rowkey再回去get详细数据,比直接在详细数据上计算性能高多了。因为需要get的rowkey都是放在内存的,所以如果需要大量get需要自己控制内存使用率。public 方法和HScan类似,自己看代码就明白。

Demo,只扫描rowkey然后根据rowkey用get获取所有列。

package joyport.hbase.gm;

import java.util.HashMap;

public class Test {
	private static int threadNum = 30;
	private static int taskNum = 1000;
	private static String htable = "gm_player_detail";

	public static void main(String[] args) throws Exception {
		int[] time = new int[2];
		if (args.length == 2) {
			time = Util.getTimeScale(args[0], args[1]);
		} else if (args.length == 1) {
			time = Util.getTimeScale(-1);
			time[0] = Util.getTimeScale(args[0], args[0])[0];
		} else {
			time = Util.getTimeScale(-1);
		}
		String[] cols = {};
		HScan hScan = new HScan(htable, cols, threadNum);
		hScan.addTask(String.valueOf(time[0]), String.valueOf(time[1]), taskNum);
		hScan.fuck();
		Test test = new Test();
		test.analyse(hScan);
	}

	public void analyse(HScan hScan) throws Exception {
		HashMap<String, String> row = null;
		HGet hGet = new HGet(htable, null, 30);
		for (row = hScan.fetch(); row != null; row = hScan.fetch()) {
			hScan.status();
			//System.out.println(row);
			hGet.addTask(row.get("rowKey"));
		}
		hGet.fuck();
		for (row = hGet.fetch(); row != null; row = hGet.fetch()) {
			hScan.status();
			//System.out.println(row);
		}
	}
}

HPut

HPut和HScan,HGet不太一样,因为put操作的数据源可以有多个,而且类型可以任意(从文本,数据库,HBase表,流),所以对数据源线程也进行了封装,数据源线程数由用户决定。

  1. public HPut(String table, int threadNum) threadNum是写入线程的数目。
  2. public void addTask(final Callable<HashMap<String, String>> task) 返回一行记录的接口,每调用一次产生一个新的数据源线程,当返回null的时候数据源线程结束。
  3. public void enableStatus(boolean enable),即使是一个近似的状态也需要在合适的地方显示,用户程序中显示基本上误差非常大,所以放到put操作的时候显示,这个函数用来控制是否显示put状态,默认显示。

Demo,只展示一个数据源的情况,多数据源没有测试(理论上应该没问题)。从gm_player_detail读取一天的数据插入到hbase_test表。

package joyport.hbase.gm;

import java.util.HashMap;
import java.util.concurrent.Callable;

public class Test extends Thread implements Callable<HashMap<String, String>> {
	private int threadNum = 30;
	private int taskNum = 1000;
	private String htable = "gm_player_detail";
	private HScan hScan;

	public static void main(String[] args) throws Exception {
		int[] time = new int[2];
		if (args.length == 2) {
			time = Util.getTimeScale(args[0], args[1]);
		} else if (args.length == 1) {
			time = Util.getTimeScale(-1);
			time[0] = Util.getTimeScale(args[0], args[0])[0];
		} else {
			time = Util.getTimeScale(-1);
		}
		Test t1 = new Test(time[0], time[1]);
		// 往hbase_test写数据
		HPut hPut = new HPut("hbase_test", 10);
		hPut.addTask(t1);
		hPut.fuck();
	}

	public Test(int startkey, int endkey) throws Exception {
		String[] cols = null;
		hScan = new HScan(htable, cols, threadNum);
		hScan.addTask(String.valueOf(startkey), String.valueOf(endkey), taskNum);
		hScan.fuck();
	}

	public HashMap<String, String> call() throws InterruptedException {
		return hScan.fetch();
	}
}

HDelete

delete操作只需要rowkey就可以,但是缓冲区数据结构是死的,所以也使用HashMap<String,String>格式,以rowKey为键。如果row.isEmpty()为真则这条数据跳过。

Demo,从hbase_test读取所有行,并删除所有行。

package joyport.hbase.gm;

import java.util.HashMap;
import java.util.concurrent.Callable;

public class Test implements Callable<HashMap<String, String>> {
	private static int threadNum = 30;
	private static String htable = "hbase_test";
	private static HScan hScan;

	public static void main(String[] args) throws Exception {
		String[] cols = {};
		hScan = new HScan(htable, cols, threadNum);
		hScan.addTask("0", "2", 1);
		hScan.fuck();

		HDelete h = new HDelete(htable, 10);
		h.addTask(new Test());
		h.fuck();
	}

	public HashMap<String, String> call() throws Exception {
		return hScan.fetch();
	}
}

 

需要注意的问题:

  • 默认缓冲区大小是10000,外部程序线程数(不是构造方法中定义的那个并发)不能超过这个值,否则可能导致无限wait()。
  • 目前HScan的任务池没有进行排重处理,也就是两个任务的起止rowkey交叉和相同任务重复没有进行内部处理,为避免重复扫描需要外部保证。HGet任务排重也需要外部保证。
  • HScan和HGet任务必须一次性添加完毕,因为threadStart()一旦判断完成将关闭缓冲区而造成任务丢失。
  • 本类只在Java层面进行多线程操作!HBaseClient内部同步机制不在本类管辖范围之内!
  • Config.java用到了同目录下的config.conf文件,配置zookeepers和port应该就能用了。

可能的改进:

  • HBuffer的基本类型是HashMap<String,String>,分别是列名和值。如果把HashMap<String,String>改成数组只保存值,新建一个HRow替换HashMap<String,String>作为一行数据,列名作为HRow的静态属性只保存一份,HRow维护列名和值的对应关系,这样会省下大量列名重复占据的内存并降低GC的负担,在大数据量内存运算的情况下能有效提高内存使用率。HRow维护对应关系占用的CPU完全可以忽略。
  • HBase读线程和程序读线程公用一个缓冲区,导致生产和消费使用同一个对象监视器,这样的话notify()效率不是最优,在速度很不稳定的情况下效率可能会有非常小幅度的降低,目前没有解决办法。

hbase-0.92.1

HBaseMulti打包下载

Config

Java HBase 多线程》上有5条评论

  1. Pingback引用通告: HBase多线程SCan,Get,Put,Delete | 乐都网技术团队博客

发表评论

电子邮件地址不会被公开。

*