
Creating an instance of the Java HBase client on each Apache Spark worker node


Working with Spark Structured Streaming.

I am working on a code where I need to do a lot of lookups on data. Lookups are very complex and just don't translate too well to joins.

For example: look up field A in table B and fetch a value; if it is found, look that value up in another table. If it is not found, look up some other value C in table D, and so on.
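To make the control flow of such a chain concrete, here is a minimal plain-Java sketch in which in-memory Maps stand in for the HBase tables. The table contents and the names TABLE_B, SECOND_TABLE, TABLE_D and resolve are hypothetical; only the branching logic mirrors the description above, and no HBase access is involved.

```java
import java.util.Map;
import java.util.Optional;

public class LookupChain {
    // Hypothetical in-memory stand-ins for the HBase tables in the question.
    static final Map<String, String> TABLE_B = Map.of("a1", "v1", "a2", "v2");
    static final Map<String, String> SECOND_TABLE = Map.of("v1", "final-from-v1");
    static final Map<String, String> TABLE_D = Map.of("c1", "final-from-c1");

    // One lookup step; a null key (common in DataFrame columns) means "not found".
    static Optional<String> lookup(Map<String, String> table, String key) {
        if (key == null) return Optional.empty();
        return Optional.ofNullable(table.get(key));
    }

    // Look up field A in table B; if found, look that value up in a second table.
    // If A is not in table B, fall back to looking up C in table D.
    static Optional<String> resolve(String fieldA, String fieldC) {
        Optional<String> fromB = lookup(TABLE_B, fieldA);
        if (fromB.isPresent()) {
            return lookup(SECOND_TABLE, fromB.get());
        }
        return lookup(TABLE_D, fieldC);
    }

    public static void main(String[] args) {
        System.out.println(resolve("a1", "c1")); // Optional[final-from-v1]
        System.out.println(resolve("zz", "c1")); // Optional[final-from-c1]
        System.out.println(resolve("a2", "c1")); // Optional.empty
    }
}
```

With HBase behind each step, every lookup in the chain is a separate Get, so a row that walks n steps costs up to n round trips.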

I managed to write these lookups using HBase, and functionally they work fine.
I wrote UDFs for each of these lookups; a very simple one might be:

val someColFunc = udf { (code: String) =>
  val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
  if (value != null) value
  else null // placeholder: the not-found branch was cut off in the original
}

Since the Java HBase client is not serializable, I am passing the HBase object like this:

object HbaseObject {
  val table = new HbaseUtilities(zkUrl)
}

HbaseUtilities is a class I wrote to simplify lookups. It just creates a Java HBase client and provides a wrapper for the kind of Get requests I need.

This makes my code quite slow, which is acceptable. What puzzles me is that increasing or decreasing the number of executors or cores has no effect on the speed of my code: whether it's 1 executor or 30, it runs at exactly the same rate. This makes me believe there is a lack of parallelism, so all my workers must be sharing the same HBase object. Is there a way I can instantiate one such object on each worker before they start executing?
I have already tried using a lazy val; it has no effect.

I have even tried creating a sharedSingleton as shown here https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/, which solved some problems for me but not the loss of parallelism.
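For reference, the per-JVM lazy singleton these approaches aim for looks like this in Java (the initialization-on-demand holder idiom). ExpensiveClient is a hypothetical stand-in for HbaseUtilities, and the thread pool only simulates several task threads inside one executor. Like a Scala object with a lazy val, this yields one instance per executor JVM rather than one shared across the cluster; all task threads in that executor share it, so whatever it wraps must be thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class PerJvmClient {
    // Counts constructions so we can check there is exactly one per JVM.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for an expensive, non-serializable client (e.g. HbaseUtilities).
    static final class ExpensiveClient {
        ExpensiveClient() { CREATED.incrementAndGet(); }
        String get(String key) { return "value-for-" + key; }
    }

    // Holder is initialized on the first call to client(), exactly once per class
    // loader; the JVM's class-initialization locking makes this thread-safe.
    private static final class Holder {
        static final ExpensiveClient INSTANCE = new ExpensiveClient();
    }

    static ExpensiveClient client() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws Exception {
        // Simulate several task threads inside one executor JVM.
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            final String key = "k" + i;
            futures.add(pool.submit(() -> client().get(key)));
        }
        for (Future<String> f : futures) {
            f.get();
        }
        pool.shutdown();
        System.out.println("clients created: " + CREATED.get()); // clients created: 1
    }
}
```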

I know there might be better ways to solve the problem and all suggestions are very welcome but right now I'm caught in a few constraints and a tight timeline.



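You need to create all non-serializable objects on the executors.
You can use foreachPartition or mapPartitions to create the connection there: the function you pass runs once per partition on an executor, so you open one connection per partition instead of one per record.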

Something similar to this (I'm using HBase client 2.0.0):

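import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Result}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import spark.implicits._ // Encoder for the String results; assumes a SparkSession named spark

val lookedUp = df.mapPartitions(partition => {
  // for each partition (one task on an executor), create the connection and the table
  val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk url")
  val connection: Connection = ConnectionFactory.createConnection(config)
  val table = connection.getTable(TableName.valueOf("tableName"))
  val results = partition.map(record => {
    val byteKey = Bytes.toBytes(record.getString(0))
    val get = new Get(byteKey)
    val result: Result = table.get(get)
    // example: read cell cf:value1 (null if the row or cell is missing)
    Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("value1")))
  }).toList // materialize before closing: partition.map is lazy
  table.close()
  connection.close()
  results.iterator
})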

df is the DataFrame containing the records you want to look up (the row key is in its first column).

You can create as many tables as you need from the same connection in each executor.

Because you create all the objects on the executors, you don't need to deal with serialization problems. You can keep this logic in a class like your HbaseUtilities, but you must create the new instance only inside the foreachPartition/mapPartitions function.
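Stripped of Spark, the per-partition pattern is: open the resource once, reuse it for every record of the partition, materialize the results, then close it. The Java sketch below shows exactly that with a hypothetical FakeConnection in place of the HBase Connection/Table; lookupPartition plays the role of the function passed to mapPartitions. Materializing before close() matters because partition iterators are lazy: a mapped iterator returned unconsumed would be read only after the connection had been closed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PerPartitionLookup {
    static final AtomicInteger OPENED = new AtomicInteger();
    static final AtomicInteger CLOSED = new AtomicInteger();

    // Hypothetical stand-in for an HBase connection: expensive to open, not serializable.
    static final class FakeConnection implements AutoCloseable {
        private boolean open = true;
        FakeConnection() { OPENED.incrementAndGet(); }
        String get(String key) {
            if (!open) throw new IllegalStateException("connection already closed");
            return "value-for-" + key;
        }
        @Override public void close() { open = false; CLOSED.incrementAndGet(); }
    }

    // Mirrors a mapPartitions function: one connection per partition, reused for
    // every record, with results materialized before the connection is closed.
    static Iterator<String> lookupPartition(Iterator<String> partition) {
        try (FakeConnection conn = new FakeConnection()) {
            List<String> out = new ArrayList<>();
            while (partition.hasNext()) {
                out.add(conn.get(partition.next()));
            }
            return out.iterator();
        }
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(List.of("a", "b", "c"), List.of("d", "e"));
        for (List<String> p : partitions) {
            lookupPartition(p.iterator()).forEachRemaining(System.out::println);
        }
        System.out.println("opened=" + OPENED.get() + " closed=" + CLOSED.get()); // opened=2 closed=2
    }
}
```

Buffering a whole partition costs memory; the alternative is to close the connection when the returned iterator is exhausted, at the price of more careful code.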



You can accomplish what you are trying to do by using the HBase-Spark Connector from the main branch of the HBase project. For some reason the connector doesn't seem to be included in any official HBase builds, but you can build it yourself and it works fine. Just build the jar and include it in your pom.xml.

Once built, the connector will allow you to pass the HBase Connection object inside the Worker class, so you don't have to worry about serializing the connection or building singletons/etc.

For example:

import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

JavaSparkContext jSPContext = ...; // Create Java Spark Context
Configuration hbConf = HBaseConfiguration.create(); // create() returns a Configuration
hbConf.set("hbase.zookeeper.quorum", zkQuorum);
hbConf.set("hbase.zookeeper.property.clientPort", PORT_NUM);
// This is your key link to HBase from Spark; use it every time you need to access HBase inside the Spark parallelism:
JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);

// Create an RDD and parallelize it with HBase access:
JavaRDD<String> myRDD = ...; // create your RDD
hBaseContext.foreachPartition(myRDD, new SparkHBaseWorker());
// You can also do other usual Spark tasks, like mapPartitions, forEach, etc.

// The Spark worker class for the foreachPartition use case on an RDD of type String would look something like this:
class SparkHBaseWorker implements VoidFunction<Tuple2<Iterator<String>, Connection>> {
    private static final long serialVersionUID = 1L;

    public SparkHBaseWorker() {}

    // Put all your HBase logic into this function:
    @Override
    public void call(Tuple2<Iterator<String>, Connection> t) throws Exception {
        // This is your HBase connection object:
        Connection conn = t._2();
        // Now you can do direct access to HBase from this Spark worker node:
        try (Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE))) {
            Iterator<String> records = t._1();
            // now do something with the table/etc.
        }
    }
}
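The division of labour in this connector-based approach can be pictured with a simplified, Spark-free model: a context opens a connection per partition, hands the worker the (iterator, connection) pair, and closes the connection afterwards, so the worker class contains only lookup logic and never has to carry a connection through serialization. Resource, FakeConnection and this foreachPartition are hypothetical names; the sketch models the idea only and is not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class ContextManagedConnection {
    // Hypothetical minimal resource type whose close() throws no checked exception.
    interface Resource extends AutoCloseable {
        @Override void close();
    }

    // Hypothetical stand-in for the HBase Connection handed to the worker.
    static final class FakeConnection implements Resource {
        static int opened = 0;
        FakeConnection() { opened++; }
        String get(String key) { return "value-for-" + key; }
        @Override public void close() { }
    }

    // Model of a context-style foreachPartition: for each partition the context
    // opens the connection, passes (iterator, connection) to the worker, then closes it.
    static <T, C extends Resource> void foreachPartition(
            List<List<T>> partitions, Supplier<C> connect,
            BiConsumer<Iterator<T>, C> worker) {
        for (List<T> partition : partitions) {
            try (C conn = connect.get()) {
                worker.accept(partition.iterator(), conn);
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(List.of("r1", "r2"), List.of("r3"));
        List<String> seen = new ArrayList<>();
        // The worker only holds lookup logic; it uses whatever connection it is given.
        foreachPartition(partitions, FakeConnection::new,
                (rows, conn) -> rows.forEachRemaining(r -> seen.add(conn.get(r))));
        System.out.println(seen + " using " + FakeConnection.opened + " connections");
        // [value-for-r1, value-for-r2, value-for-r3] using 2 connections
    }
}
```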

