Statement
We have a 10-machine HBase cluster holding billions of rows. Every row consists of one column family and ~20 columns. We need to perform frequent scan requests bounded by a start row prefix and an end row prefix; each scan usually returns about 100-10,000 rows.
Requests can come very often (up to several per minute), so performance is a priority. Due to the system's architecture, we want to implement the solution in Python instead of the current Java code. The problem is that with Python we get 5x-10x worse performance than with Java.
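To make the access pattern concrete, a single request looks roughly like the following sketch (host, table name, and key prefix are made-up placeholders; the real boundary logic is in the Java code below):

import happybase

connection = happybase.Connection('hbase-thrift-host')  # placeholder host
table = connection.table('numbers')                     # placeholder table name
prefix = b'79001234567'                                 # hypothetical key prefix
# Scan every row whose key lies in [prefix, prefix + "1"),
# mirroring how the Java code derives its scan boundaries
for row_key, data in table.scan(row_start=prefix, row_stop=prefix + b'1'):
    pass  # each such scan typically yields 100-10,000 rows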
What currently works
We have Java code that performs scan requests to HBase using the standard HBase Java API:
public List<String> getNumber(Number key) {
    List<String> res = new ArrayList<>();
    // Scan boundaries: all rows from the key's number
    // up to (but excluding) the same number with "1" appended
    String start_key = key.getNumber();
    String next_key = key.getNumber() + "1";
    byte[] prefix_begin = Bytes.toBytes(start_key);
    byte[] prefix_end = Bytes.toBytes(next_key);

    Scan scan = new Scan(prefix_begin, prefix_end);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        byte[] row = result.getRow();
        res.add(Bytes.toString(row));
    }
    return res;
}
These queries are parallelized using the Callable interface and a ScheduledThreadPoolExecutor. The call() method of each callable simply runs getNumber(Number key).
public List<String> getNumbers(List<Number> keys) {
    List<String> res = new ArrayList<>();
    List<Callables.CallingCallable> callables = new ArrayList<>();
    for (Number source : keys) {
        callables.add(new Callables.CallingCallable(this, source));
    }
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(24);
    try {
        // Gather the rows from every scan into a single result list
        List<Future<List<String>>> futures = executor.invokeAll(callables);
        for (Future<List<String>> future : futures) {
            res.addAll(future.get());
        }
    } catch (InterruptedException | ExecutionException ex) {
        // error handling omitted for brevity
    }
    executor.shutdown();
    return res;
}
This works pretty well and achieves the following performance:
- 1.5 - 2.0 sec per single scan and
- 5.0 - 8.0 sec per 100 parallelized scans
What we tried
We tried to implement a similar solution in Python with the help of the HappyBase library:
@staticmethod
def execute_query(key, table_name, con_pool):
    items = []
    with con_pool.connection() as connection:
        table = happybase.Table(table_name, connection)
        # Scan boundaries derived from the key, as in the Java version
        row_start, row_end = get_start_and_end_row(key)
        selected_rows = table.scan(row_start=row_start, row_stop=row_end)
        for row_key, data in selected_rows:  # renamed to avoid shadowing the `key` argument
            items.append(Item(data))
    return items
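get_start_and_end_row is our own helper; for reference, a minimal sketch of what it does (assuming the key object exposes its number as a string via a hypothetical get_number() method), following the same start/next-key logic as the Java getNumber above:

def get_start_and_end_row(key):
    # Same boundary logic as the Java code: scan from the key's number
    # up to (but excluding) the same number with "1" appended
    start_key = key.get_number()
    return start_key, start_key + '1'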
@staticmethod
def execute_in_parallel(table_name, hbase_host, hbase_port, keys):
    # ThreadPool comes from multiprocessing.pool, partial from functools
    pool = ThreadPool(24)
    con_pool = happybase.ConnectionPool(size=24, host=hbase_host, port=hbase_port)
    execute_query_partial = partial(execute_query, table_name=table_name, con_pool=con_pool)
    result_info = pool.map_async(execute_query_partial, keys, chunksize=1)
    result = result_info.get()
    return result
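For completeness, this is roughly how we call it (QueryRunner stands in for our actual class; host, port, table name, and load_keys() are placeholders):

keys = load_keys()  # hypothetical helper returning the ~100 keys of one batch
items = QueryRunner.execute_in_parallel('our_table', 'hbase-thrift-host', 9090, keys)
print('%d scans completed' % len(items))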
Achieved performance:
- 2.0 - 3.0 sec per single scan and
- 30 - 55 sec per 100 parallelized scans
As we can see, the performance of a single scan is very similar, but the parallelized tasks in Python are much slower.
Any ideas why this happens? Is there an issue with our Python/HappyBase code, or with the performance of the HBase Thrift server (which HappyBase uses to connect to HBase)?
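If it helps to narrow things down, here is a small timing harness we could run (host, table, and sample_key are placeholders) to check whether per-batch Thrift round trips dominate; HappyBase's scan() accepts a batch_size parameter that controls how many rows are fetched per Thrift call:

import time
import happybase

connection = happybase.Connection('hbase-thrift-host', port=9090)  # placeholder host
table = connection.table('our_table')                              # placeholder table
row_start, row_end = get_start_and_end_row(sample_key)             # hypothetical key

for batch_size in (10, 100, 1000, 10000):
    t0 = time.time()
    # Consume the whole scan and count rows, timing one full request
    n = sum(1 for _ in table.scan(row_start=row_start, row_stop=row_end,
                                  batch_size=batch_size))
    print('batch_size=%5d: %6d rows in %.2f sec' % (batch_size, n, time.time() - t0))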