As usual, I’m not creating a virtualenv and then installing the libraries manually with pip. Nowadays I use Poetry for every new Python project; it was one of the things I missed the most when I started doing Python coming from the Java world. According to its documentation, Poetry is a tool for dependency management and packaging in Python: it allows you to declare the libraries your project depends on and it will manage (install/update) them for you.
HBase series: Python clients
Well, so I’ve got an HBase database ready to go. Let’s add some data through my brand new Python application. For that I need a client library to connect to HBase.
HappyBase
HappyBase is a Python HBase client. I think the most important feature of HappyBase is its simplicity, especially when compared with the Java client library, which is more of a low-level library. HappyBase lets you deal with HBase as if you were using a dictionary (or a map). We’ll see that in a moment.
To install it, the pip dependency is happybase.
Before doing anything we need to make sure the HBase server is running, and that the HBase Thrift server is up as well. This is especially relevant when you are using a standalone HBase installation, where the Thrift server is disabled by default.
mario@computer> cd hbase
mario@computer> ./bin/start-hbase.sh
...
mario@computer> ./bin/hbase-daemon.sh start thrift
running thrift, logging to /home/mario/../logs/hbase-mario-thrift-mario-computer.out
Now we can create our first table using HappyBase. The iot table is going to register temperature and humidity readings from different devices located inside a house. It will have two column families:
- device will store device attributes such as ip, logical name…
- metric will store device temperature and humidity values
Using the HappyBase API you can set column family attributes, for example which compression algorithm to use for a specific column family. Here I’m using GZ compression for the metric column family:
import happybase

def hbase_create():
    connection = happybase.Connection('localhost')
    # Two column families: 'device' with default attributes,
    # 'metric' with GZ compression enabled.
    connection.create_table(
        'iot',
        {
            'device': dict(),
            'metric': dict({
                "COMPRESSION": "GZ"
            })
        })
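This isn’t part of the original snippet, but to double-check that the table exists you can list the tables and inspect the column families with HappyBase’s tables() and families() methods; a minimal sketch:

import happybase

# Sanity check: list all tables and show the column family
# attributes of the freshly created 'iot' table.
connection = happybase.Connection('localhost')
print(connection.tables())                 # e.g. [b'iot']
print(connection.table('iot').families())  # includes the compression setting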
Once I’ve created the table, I can start adding rows to it. The following example adds 5M rows to HBase. Instead of writing rows one by one, I’m inserting them in batches to improve performance. Every row key will be built out of:
- timestamp: 20210818100000 (yyyyMMddHHmmss)
- house id: 001 (there are 100 houses)
- device id: 01 (there are 5 devices per house)
So an example row key could be 20210818100000-001-01, which holds all the information of device 01 of house 001 at the specific point in time 20210818100000.
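Since the parts are dash-separated, a row key can also be split back into its components; a trivial sketch in plain Python (using the example key above):

# Decompose an example row key into its parts.
rowkey = b"20210818100000-001-01"
timestamp, house_id, device_id = rowkey.decode('UTF-8').split('-')
print(timestamp, house_id, device_id)  # 20210818100000 001 01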
import random
import happybase
import datetime as dt

def hbase_put():
    connection = happybase.Connection('localhost')
    iot_table = connection.table("iot")
    initial_dt = dt.datetime.now()
    with iot_table.batch(batch_size=100) as batch:  # (1)
        for second in range(0, 10000):
            for house in range(0, 100):
                for device in range(0, 5):
                    moment = (initial_dt + dt.timedelta(seconds=second)).strftime('%Y%m%d%H%M%S')
                    rowkey = bytes("{0}-{1:03}-{2:02}".format(moment, house, device), 'UTF-8')  # (2)
                    dev_ip = "192.168.1.14{}".format(device)
                    dev_type = str(device)
                    dev_temp = str(random.randint(20, 30))
                    dev_humi = str(random.randint(30, 40) / 100)
                    batch.put(rowkey, {  # (3)
                        b"device:ip": bytes(dev_ip, 'UTF-8'),
                        b"device:type": bytes(dev_type, 'UTF-8'),
                        b"metric:temp": bytes(dev_temp, 'UTF-8'),
                        b"metric:humidity": bytes(dev_humi, 'UTF-8')
                    })
(1) Uses the batching mechanism to improve performance when writing to HBase.
(2) Builds a row key as timestamp-houseid-deviceid, e.g. 20210818000000-001-01.
(3) Inserts a series of cells under the same row key.
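Before querying at scale, you can verify the inserts by fetching a single row back with HappyBase’s row method; a minimal sketch (the row key is illustrative, use one produced by hbase_put):

import happybase

connection = happybase.Connection('localhost')
iot_table = connection.table("iot")

# Fetch every cell stored under a single row key.
row = iot_table.row(b"20210818100000-001-01")
print(row)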
Now that I have some data, let’s find the ip, type, and temperature of all devices of house 001 with a temperature greater than 25 degrees Celsius. In HBase it is always, always better to query using a row key range rather than using filters; or at least, apply a filter only once you’ve narrowed down the search area with a row key range.
In HappyBase you query using the scan method: you can narrow down the row key range via row_start and row_stop, and if you would like to apply a filter, use the filter argument.
import happybase
import datetime as dt

def hbase_scan():
    connection = happybase.Connection('localhost')
    iot_table = connection.table("iot")
    initial_dt = dt.datetime.now()
    initial_dt_str = initial_dt.strftime('%Y%m%d%H%M%S')
    # Devices 00 to 04 of house 001 at the given timestamp.
    row_key_from = "{}-001-00".format(initial_dt_str)
    row_key_to = "{}-001-05".format(initial_dt_str)
    results = iot_table.scan(
        limit=100,
        row_start=bytes(row_key_from, 'UTF-8'),
        row_stop=bytes(row_key_to, 'UTF-8'),
        columns=[b"device:ip", b"device:type", b"metric:temp"],
        filter=b"SingleColumnValueFilter('metric', 'temp', >, 'binary:25')"
    )
    for row in results:
        print(row)
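As a side note, when the range you want is simply everything under a common prefix, scan also accepts a row_prefix shortcut instead of explicit start and stop keys; a minimal sketch (the prefix is illustrative):

import happybase

connection = happybase.Connection('localhost')
iot_table = connection.table("iot")

# All devices of house 001 at one timestamp, selected by key prefix.
for key, data in iot_table.scan(row_prefix=b"20210818100000-001",
                                columns=[b"metric:temp"]):
    print(key, data)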
This is not meant to be an exhaustive guide to using HappyBase, especially because the HappyBase user guide is full of examples. If you’d like to go deeper, I recommend visiting the HappyBase documentation on Read the Docs.
Conclusion and Gotchas
In general HappyBase is a very friendly library to start using with HBase. However, there are pitfalls I’ve run into myself that I think are worth mentioning:
- Make sure the HBase Thrift server is up and running before executing any HappyBase client.
- Because HBase only knows about bytes, bytes is what HappyBase expects as values.
- Batch operations require you either to call batch.send() explicitly or to set a batch_size when calling table.batch(batch_size=128); otherwise HappyBase will keep buffering rows in memory until the with scope ends (see the sketch after this list).
- Filters are nice, but they should only be applied once you have set a good row key boundary; otherwise you will be scanning through the whole table.
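To illustrate the batching gotcha, here is a minimal sketch of flushing a batch explicitly instead of relying on batch_size:

import happybase

connection = happybase.Connection('localhost')
iot_table = connection.table("iot")

# No batch_size given: mutations accumulate in memory...
batch = iot_table.batch()
batch.put(b"20210818100000-001-01", {b"metric:temp": b"25"})
# ...until send() flushes them to HBase.
batch.send()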
AIOHappyBase
There are use cases where you may benefit from asynchronous query execution. This kind of scenario is what AIOHappyBase tries to solve: it was born out of the need to take HappyBase into the asynchronous arena.
Let’s imagine you have to execute three queries and one of them takes longer than the other two. In the following example I’m creating three tasks with Python’s asyncio and AIOHappyBase. Even though the heavy operation takes longer, thanks to asyncio and AIOHappyBase the other two tasks are not blocked and can run in the meantime.
import asyncio

from aiohappybase import ConnectionPool

# heavy_operation_1, light_op_1, light_op_2 and print_results are
# defined in the full source code linked at the end of this entry.

async def heavy_operation_aio():
    async with ConnectionPool(host="localhost", size=10) as pool:
        heavy_1_task = asyncio.Task(heavy_operation_1(pool))
        light_1_task = asyncio.Task(light_op_1(pool))
        light_2_task = asyncio.Task(light_op_2(pool))
        heavy_res, light_1_res, light_2_res = await asyncio.gather(
            heavy_1_task,
            light_1_task,
            light_2_task)
        print_results("HEAVY", heavy_res)
        print_results("LIGHT (1)", light_1_res)
        print_results("LIGHT (2)", light_2_res)
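The helper coroutines live in the full source code; as a rough sketch of their shape, and assuming AIOHappyBase mirrors the HappyBase API with async context-managed pool connections and an asynchronous scan generator, a light operation could look like this (the row key prefix is illustrative):

from aiohappybase import ConnectionPool

async def light_op_1(pool: ConnectionPool):
    # Borrow a connection from the pool and scan a narrow row key range.
    async with pool.connection() as connection:
        iot_table = connection.table("iot")
        return [row async for row in iot_table.scan(
            limit=1,
            row_prefix=b"20210819113522-001",  # illustrative prefix
            columns=[b"device:ip", b"device:type", b"metric:temp"])]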
The non-blocking behaviour is clearly shown in the output:
(hbasegs-MTmhc8Pq-py3.9) mario@computer:~/hbasegs$ poetry run async_search
light_op_1 executing!
light_op_2 executing!
heavy_operation_1 executing!
HEAVY RESULTS
=============
(b'20210819113524-001-00', {b'device:ip': b'192.168.1.140', b'device:type': b'0', b'metric:temp': b'29'})
LIGHT (1) RESULTS
=================
(b'20210819113522-001-00', {b'device:ip': b'192.168.1.140', b'device:type': b'0', b'metric:temp': b'28'})
LIGHT (2) RESULTS
=================
(b'20210819113522-001-00', {b'device:ip': b'192.168.1.140', b'device:type': b'0', b'metric:temp': b'28'})
secs: 2
You can find the full source code on GitHub.
Resources
- Source code at GitHub: source code used in this entry
- Python’s asyncio documentation: official Python 3 asyncio documentation
- AsyncIO tutorial: an introductory asyncio tutorial
- Cloudera HBase filter reference: a good place to learn how to build the different filters HBase supports in its queries
- Coco Material: all drawings in this entry are from this wonderful site!