Java HBase 多线程

github: https://github.com/ares333/HbaseMultiThread

Java多线程是一个很麻烦的东西,为了简化开发提高性能用Java的多线程封装了HBase API,包括Scan,Get,Put,Delete四种操作。经过多次修改运行非常稳定,已经用于生产环境。内部线程通信使用wait()/notify()机制,效率非常高。本文只在Java层面讨论HBase的多线程,HBase API内部线程处理机制不在本文讨论范围之内。HBase客户端版本 0.92.1,本类源码结尾处可以下载。下面通过demo演示一下用法,结尾处有代码打包下载。

特点:

  • 从Java层面用多线程最大化了读写性能。
  • 大量减少了应用程序的代码量,现在可以集中精力到数据分析上了。
  • 性能平均下来至少比单线程高出1个数量级,某些情况下高出两个数量级。

所有返回数据都是HashMap<String,String>,代表hbase中一行的一个列簇,数据中必然有一个键__rowkey保存本行的rowkey,所有列簇默认为cf,可以使用setCf()提前设置。
文档和最新版本可能有细微出入,不再修改文档。

HScan

HScan有5个public方法:

  1. public HScan(String table, String[] columns, int threadNum),table指定表名,columns指定扫描的列,如果设为null代表扫描所有的列,如果设为空数组代表只扫描rowkey,threadNum设置扫描器的并发数。
  2. public void addTask(String start, String end, int num),前两个参数是起止rowkey,第三个参数用于控制rowkey平分成num段,这个参数主用用于可整型计算的时间戳开头的rowkey,如果非整型范围内数字这个参数设为1即可。这个方法可以调用多次,每次添加num个任务到任务池。
  3. public void fuck(),继承来的方法,开始进行扫描。
  4. public HashMap<String, String> fetch(),从缓冲区里取一个结果出来,结果中的键由列名和"rowKey"组成,如果返回null表示所有行都已经读完了。
  5. public void status(),一个非同步方法,用于近似显示线程的运行状态,总是在当前行显示,内容由4部分组成2013-01-28_103053,第一列显示已经返回的结果数,第二列显示缓冲区中的行数,第三列分别表示 当前并发线程数/Thread.State.WAITING数/Thread.State.BLOCKED数,第四列显示任务池中的任务数。

Demo,扫描gm_player_detail表2013年1月1日到10日的所有数据

package joyport.hbase.gm;

import java.util.HashMap;

public class Test {
	private static int threadNum = 30;
	private static int taskNum = 1000;
	private static String htable = "gm_player_detail";

	public static void main(String[] args) throws Exception {
		int[] time = new int[2];
		if (args.length == 2) {
			time = Util.getTimeScale(args[0], args[1]);
		} else if (args.length == 1) {
			time = Util.getTimeScale(-1);
			time[0] = Util.getTimeScale(args[0], args[0])[0];
		} else {
			time = Util.getTimeScale(-1);
		}
		HScan hScan = new HScan(htable, null, threadNum);
		hScan.addTask(String.valueOf(time[0]), String.valueOf(time[1]), taskNum);
		hScan.fuck();
		Test test = new Test();
		test.analyse(hScan);
	}

	public void analyse(HScan hScan) throws InterruptedException {
		HashMap<String, String> row = null;
		for (row = hScan.fetch(); row != null; row = hScan.fetch()) {
			hScan.status();
			//System.out.println(row);
		}
	}
}

Util.getTimeScale(-1)获取昨天的起止时间戳
Util.getTimeScale(args[0], args[1])根据yyyy:mm:dd格式的日期获取起止时间戳,java用的是微秒计算的时候可能跨天,如果总是用这个函数就不会出现时间断裂或重复。
编译然后运行
java joyport.hbase/gm/Test 2013-01-01 2013-01-10

HGet

get操作还是很有用途的,比如有些需求需要这样来做,统计每一天的前n条记录,但是一条记录字段特别多,可以先只Scan必要字段计算出前n,然后根据rowkey再回去get详细数据,比直接在详细数据上计算性能高多了。因为需要get的rowkey都是放在内存的,所以如果需要大量get需要自己控制内存使用率。public 方法和HScan类似,自己看代码就明白。

Demo,只扫描rowkey然后根据rowkey用get获取所有列。

package joyport.hbase.gm;

import java.util.HashMap;

public class Test {
	private static int threadNum = 30;
	private static int taskNum = 1000;
	private static String htable = "gm_player_detail";

	public static void main(String[] args) throws Exception {
		int[] time = new int[2];
		if (args.length == 2) {
			time = Util.getTimeScale(args[0], args[1]);
		} else if (args.length == 1) {
			time = Util.getTimeScale(-1);
			time[0] = Util.getTimeScale(args[0], args[0])[0];
		} else {
			time = Util.getTimeScale(-1);
		}
		String[] cols = {};
		HScan hScan = new HScan(htable, cols, threadNum);
		hScan.addTask(String.valueOf(time[0]), String.valueOf(time[1]), taskNum);
		hScan.fuck();
		Test test = new Test();
		test.analyse(hScan);
	}

	public void analyse(HScan hScan) throws Exception {
		HashMap<String, String> row = null;
		HGet hGet = new HGet(htable, null, 30);
		for (row = hScan.fetch(); row != null; row = hScan.fetch()) {
			hScan.status();
			//System.out.println(row);
			hGet.addTask(row.get("rowKey"));
		}
		hGet.fuck();
		for (row = hGet.fetch(); row != null; row = hGet.fetch()) {
			hScan.status();
			//System.out.println(row);
		}
	}
}

HPut

HPut和HScan,HGet不太一样,因为put操作的数据源可以有多个,而且类型可以任意(从文本,数据库,HBase表,流),所以对数据源线程也进行了封装,数据源线程数由用户决定。

  1. public HPut(String table, int threadNum) threadNum是写入线程的数目。
  2. public void addTask(final Callable<HashMap<String, String>> task) 返回一行记录的接口,每调用一次产生一个新的数据源线程,当返回null的时候数据源线程结束。
  3. public void enableStatus(boolean enable),即使是一个近似的状态也需要在合适的地方显示,用户程序中显示基本上误差非常大,所以放到put操作的时候显示,这个函数用来控制是否显示put状态,默认显示。

Demo,只展示一个数据源的情况,多数据源没有测试(理论上应该没问题)。从gm_player_detail读取一天的数据插入到hbase_test表。

package joyport.hbase.gm;

import java.util.HashMap;
import java.util.concurrent.Callable;

public class Test extends Thread implements Callable<HashMap<String, String>> {
	private int threadNum = 30;
	private int taskNum = 1000;
	private String htable = "gm_player_detail";
	private HScan hScan;

	public static void main(String[] args) throws Exception {
		int[] time = new int[2];
		if (args.length == 2) {
			time = Util.getTimeScale(args[0], args[1]);
		} else if (args.length == 1) {
			time = Util.getTimeScale(-1);
			time[0] = Util.getTimeScale(args[0], args[0])[0];
		} else {
			time = Util.getTimeScale(-1);
		}
		Test t1 = new Test(time[0], time[1]);
		// 往hbase_test写数据
		HPut hPut = new HPut("hbase_test", 10);
		hPut.addTask(t1);
		hPut.fuck();
	}

	public Test(int startkey, int endkey) throws Exception {
		String[] cols = null;
		hScan = new HScan(htable, cols, threadNum);
		hScan.addTask(String.valueOf(startkey), String.valueOf(endkey), taskNum);
		hScan.fuck();
	}

	public HashMap<String, String> call() throws InterruptedException {
		return hScan.fetch();
	}
}

HDelete

delete操作只需要rowkey就可以,但是缓冲区数据结构是死的,所以也使用HashMap<String,String>格式,以rowKey为键。如果row.isEmpty()为真则这条数据跳过。

Demo,从hbase_test读取所有行,并删除所有行。

package joyport.hbase.gm;

import java.util.HashMap;
import java.util.concurrent.Callable;

public class Test implements Callable<HashMap<String, String>> {
	private static int threadNum = 30;
	private static String htable = "hbase_test";
	private static HScan hScan;

	public static void main(String[] args) throws Exception {
		String[] cols = {};
		hScan = new HScan(htable, cols, threadNum);
		hScan.addTask("0", "2", 1);
		hScan.fuck();

		HDelete h = new HDelete(htable, 10);
		h.addTask(new Test());
		h.fuck();
	}

	public HashMap<String, String> call() throws Exception {
		return hScan.fetch();
	}
}

 

需要注意的问题:

  • 默认缓冲区大小是10000,外部程序线程数(不是构造方法中定义的那个并发)不能超过这个值,否则可能导致无限wait()。
  • 目前HScan的任务池没有进行排重处理,也就是两个任务的起止rowkey交叉和相同任务重复没有进行内部处理,为避免重复扫描需要外部保证。HGet任务排重也需要外部保证。
  • HScan和HGet任务必须一次性添加完毕,因为threadStart()一旦判断完成将关闭缓冲区而造成任务丢失。
  • 本类只在Java层面进行多线程操作!HBaseClient内部同步机制不在本类管辖范围之内!
  • Config.java用到了同目录下的config.conf文件,配置zookeepers和port应该就能用了。

可能的改进:

  • HBuffer的基本类型是HashMap<String,String>,分别是列名和值。如果把HashMap<String,String>改成数组只保存值,新建一个HRow替换HashMap<String,String>作为一行数据,列名作为HRow的静态属性只保存一份,HRow维护列名和值的对应关系,这样会省下大量列名重复占据的内存并降低GC的负担,在大数据量内存运算的情况下能有效提高内存使用率。HRow维护对应关系占用的CPU完全可以忽略。
  • HBase读线程和程序读线程公用一个缓冲区,导致生产和消费使用同一个对象监视器,这样的话notify()效率不是最优,在速度很不稳定的情况下效率可能会有非常小幅度的降低,目前没有解决办法。

hbase-0.92.1

HBaseMulti打包下载

Config

14 thoughts on “Java HBase 多线程

  1. I would like to know, if this code performs one task using multiple threads or many tasks concurrently using multiple threads. (eg. suppose we want to simulate 10 different clients scan a particular hbase table, so in that case, does HScan.java corresponds to performing 10 scans by 10 clients, or one clients uses many threads to perform one single task ?). Please do clarify. Thanks. (as of my understanding, it is one task by many threads)

    • 楼主您好,我有个疑惑,望能解答:
      按照Hput 的demo中的例子,addTask方法中this.taskPool.add(thread); 然后启动线程,线程中push之后,将HPut.this.taskPool.remove(this); 那么现在taskPool中已经为空,之后条用Hput的threadStart()方法时,先判断if (!taskPool.isEmpty()) 然后再开始启动put的处理过程,那taskPool都已经为空了,根本启动不了put的过程,何谈put?

    • 已经4年多不搞java了,这个当时应该是为了解决另一个问题,看了一下代码在数据源数据不够填满缓冲区的情况下会有概率出你说的问题,读写线程应该在一个地方启动比较合适。没想到还有人看这个?,真是非常吃惊,这个是HBase 0.92为了提升效率开发的,现在不知道有没有官方功能。有兴趣可以去github上改代码。

      • 感谢楼主的回复,最近在看多线程写hbase,您这个提到生产环境下比较稳定,所以就研究了一下,有个地方不是很明白,楼主有时间可否指点我一下:
        问题一:
        HPut的demo中,threadNum = 30,taskNum = 1000,按我的理解是threadStart()中开了30个线程去process()方法,那这样只处理30个task,丢失70个task。但实际运行时,我发现执行process()方法的次数与taskNum相同,请问是我哪里理解错了?
        问题二:
        之所以在排查一些原因,是因为我用上文中Hput的demo插数据到另一个表中的时候会丢数据,而且每次试验丢的数据量不一致,还没排查出是哪里的原因,楼主有亲测过吗?
        为难您了!谢谢!

        • 1.threadNum=30是线程数,每个线程读取hbase表的一段连续数据,taskNum=1000用途是把要读取的数据平分成1000份然后往线程中塞。

          2.HPut当时用的比较少,这个就帮不了你了。

          • “threadNum=30是线程数,每个线程读取hbase表的一段连续数据,taskNum=1000用途是把要读取的数据平分成1000份然后往线程中塞。”
            这个我明白,我的意思是,threadNum=30,每个线程读取hbase表的一段连续数据,那不就只能读取30段数据了吗?但数据是有1000段。我只看到了一个线程读取一段数据就结束了,没看到代码哪里有 “这个线程读完一段数据再去读另一段,直到这30个线程读完1000段” 的逻辑。请楼主指点一下,感谢!

            • 又过了一遍代码,一开始启动threadNum=30个,每个线程读完自己的那一段会另外启动一个新的线程从taskNum=1000中取剩余的段,HBaseMulti.java 中 abstract class HBaseThread 内部抽象类干这件事情,178,179行是实现。

  2. Pingback引用通告: HBase多线程SCan,Get,Put,Delete | 乐都网技术团队博客

发表评论

电子邮件地址不会被公开。

*