We have 10 machines HBase cluster and billions of rows inside. Every row consists of one column family and ~20 columns. We need perform frequent scan requests which contains start row prefix and end row prefix. Usually every scan returns abount 100 - 10000 rows.
Because requests can come very often (up to several requests per minute), so performance is preoritized. Due to system's architecture we want to realize our solution in Python instead of current Java code. The problem is with Python we obtain 5x-10x worse performance than in Java.
We have Java code which perform scan requests to HBase. It uses ususal HBase Java API:
public List<String> getNumber(Number key) {
List<String> res = new ArrayList<>();
String start_key = key.getNumber();
String next_key = key.getNumber() + "1";
byte[] prefix_begin = Bytes.toBytes(start_key);
byte[] prefix_end = Bytes.toBytes(next_key);
Scan scan = new Scan(prefix_begin, prefix_end);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
byte[] row = result.getRow();
res.add(Bytes.toString(row));
}
return res;
}
These queries parallelized with the help of Callable
interface and ScheduledThreadPoolExecutor
. The call()
method of every callable just run getNumber(Number key)
.
public List<String> getNumbers(List<Number> keys) {
List<String> res = new ArrayList<String>();
List<Callables.CallingCallable> callables = new ArrayList();
for (Number source : keys) {
callables.add(new Callables.CallingCallable(this, source));
}
Object futures = new ArrayList();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(24);
try {
futures = executor.invokeAll(callables);
} catch (InterruptedException ex) {
}
executor.shutdown();
}
This works pretty good and allows achieve following performance:
We try to implement similar solution in Python with the help of Happybase library:
@staticmethod
def execute_query(key, table_name, con_pool):
items = []
with con_pool.connection() as connection:
table = happybase.Table(table_name, connection)
[row_start, row_end] = get_start_and_end_row(key)
selected_rows = table.scan(row_start=row_start, row_stop=row_end)
for key, data in selected_rows:
items.append(Item(data))
return items
@staticmethod
def execute_in_parallel(table_name, hbase_host, hbase_port, keys):
pool = ThreadPool(24)
con_pool = happybase.ConnectionPool(size=24, host=hbase_host, port=hbase_port)
execute_query_partial = partial(execute_query, table_name=table_name, con_pool=con_pool)
result_info = pool.map_async(execute_query_partial, keys, chunksize=1)
result = result_info.get()
Achieved performance:
As we can see performance of single scan is very similar. But parallelized tasks in Python are much slower.
Any ideas why does it happen? Maybe some issues with our Python/Happybase code? Or performance of HBase Thrift server (which HappyBase uses to connect to HBase)?
There is a way by using Jython which allows you to access the java JVM and java libraries. With this you can write python and java in the same source file. Then the code is compiled into java bytecode for the JVM. This should give the same performance as Jython is written in java code and so you won't have to write in pure java.
Java benchmark vs Python is much higher. Here is a website that shows performance between java and python.
http://benchmarksgame.alioth.debian.org/u64q/python.html
and here is the website for jython: http://www.jython.org/