I am pretty new to using Event hubs and am stuck of this problem.
We are streaming data of format xml to eventhub. Now, incase the size of xml is more than 1 MB, we will compress the xml's as eventhub cannot consume Larger than 1 MB data.
MAX_BYTES=1000000
## events is a List of XML Strings
for event in events:
event_data_batch = eh_client.create_batch(max_size_in_bytes=MAX_BYTES)
if sys.getsizeof(event) >= MAX_BYTES:
event = gzip.compress(event)
event_data_batch.add(EventData(event))
eh_client.send_batch(event_data_batch)
Till this everything works, The problem is at receivers end. Now I want to consume the data from Eventhub. i.e Decompress the data and get the xml string that i sent earlier.
I am using below snippet, but it fails as message.body is of type Generator and it cannot be decompressed Also Tried,
- message variable which is in below snippet is object of class Event Data so it cannot be used directly to decompress.
- message.body_as_str(encoding='UTF-8') also is throwing error data inside event data is of type bytes and not string.
message: EventData
for message in messages:
# message is of type EventDATA
try:
message_body = message.body_as_str().encode('utf-8')
# If we receive compressed message, this will fail and we will got to except block
except:
self.log.info(f'Failed to extract Message Body, Checking if message is compressed')
message_body = gzip.decompress(message.body)
What can I do ? How can I consume the compressed data from eventhub for further processing?