The situation:
I have a large amount of data from a social media website, encompassing 5 months and split into line-formatted JSON files, one per minute.
All the data is on a server with almost 400 GB of RAM and a 32-core, 64-thread CPU. I have been taking advantage of this with Python's multiprocessing module; however, I basically have to dedicate 29 processes to parsing and the rest to extracting, writing, and reading/deleting, and performance doesn't get much better than my current configuration.
I have created a program with 5 main process classes that extend Python's multiprocessing.Process. There is one for each step (a minimal sketch of one such stage follows the list):
extracting
: extract tar files, each encompassing 1 day of 1-minute json.bz2 files.
reading
: read directly from the bz2 files and send JSON dicts to the parse queue;
send a message to the delete process when finished with a tar file.
deleting
: delete processed tar files so they don't take up space on the server.
parsing
: parse the JSON dicts for keywords to categorize them, flatten them, put
them into a dataframe, and send it to the write queue.
write
: merge dataframes from the write queue according to category; when a master
dataframe reaches a certain size, write it to the output directory and discard it.
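For illustration, here is a minimal sketch of how one such stage might be wired up as a multiprocessing.Process subclass. The names (ReadProcess, file_queue, parse_queue) are illustrative, not my actual code:

import bz2
import json
import multiprocessing as mp

class ReadProcess(mp.Process):
    def __init__(self, file_queue, parse_queue):
        super().__init__()
        self.file_queue = file_queue
        self.parse_queue = parse_queue

    def run(self):
        while True:
            path = self.file_queue.get()
            if path is None:        # sentinel: no more files to read
                break
            # bz2.open streams and decompresses on the fly
            with bz2.open(path, 'rt') as f:
                for line in f:
                    self.parse_queue.put(json.loads(line))
            # here the reader would also notify the delete process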
The problem:
It takes way too long to get the data. If I ran the program as it is now without stopping, it would take over 20 weeks to parse the data I actually want, separate it into 4 broad categories, put those into dataframes, and write them to HDF5 files.
What I have tried:
I have tried getting the JSON and putting it into a dictionary, line by line, like so:

import json

with open('file') as f:
    for line in f:
        data = json.loads(line)  # parse one JSON object per line
        # send data to queue for parse processes
I have also tried converting to another datatype up front to take some overhead off the parse process:

import json

with open('file') as f:
    for line in f:
        data = json.loads(line)
        # categorize and convert to flattened dataframes and send
        # to parse process
I have also tried other things, like sending pandas Series or DataFrames through the queue instead of Python dicts, but that doesn't seem to help either.
Current pseudocode for the main parse process:
while True:
    item = parse_queue.get()  # blocks until an item is available,
                              # so no busy-wait loop is needed
    if item is None:          # sentinel value signals shutdown
        break
    if item.get(keyword) is not None:
        # flatten, convert to dataframe, send to write queue
        # I do this for 4 different cases of items.
I've also tried flattening each dict beforehand, which doesn't change anything.
Accessing a Python dict is O(1). The read process is clearly fast enough to create and send dicts to the parse queue at a rate of 2-3k dicts a second (after filtering out unwanted garbage JSON). So why is the parse step taking so long? It takes about 10-100x as long to parse as it does to read the files.
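One way to check whether the queue itself is the bottleneck is a micro-benchmark along these lines (a rough sketch with made-up payloads, comparing one message per item against one big batched message):

import multiprocessing as mp
import time

def consume(queue, n_messages):
    # pull every message off the queue and discard it
    for _ in range(n_messages):
        queue.get()

if __name__ == '__main__':
    items = [{'id': i, 'text': 'x' * 100} for i in range(100_000)]
    for batch_size in (1, len(items)):
        queue = mp.Queue()
        n_messages = len(items) // batch_size
        consumer = mp.Process(target=consume, args=(queue, n_messages))
        consumer.start()
        start = time.perf_counter()
        for i in range(0, len(items), batch_size):
            queue.put(items[i:i + batch_size])
        consumer.join()
        print(f'batch size {batch_size}: {time.perf_counter() - start:.2f}s')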
What I think will help:
I believe my problem can only be solved by parsing directly from the file stream, but how can I stream these JSON objects when they're stored in a format like this (bz2-compressed, line-formatted JSON inside tar archives)?
My main question:
How can I make the parse process faster? Do I need to read directly from the file stream? If so, how can I do that?
SOLUTION:
When you send lots of items through a multiprocessing.Queue, each item has to be serialized (pickled) by the sender and deserialized by the consumer. This greatly slows down your processing time. Either combine lots of small objects into one big object that you can send through the queue at once (as sketched below), or combine two processes (for instance, reading and parsing JSON) into one, and it will be much faster.
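A sketch of the batching idea, with an arbitrary illustrative batch size:

import json

BATCH_SIZE = 10_000  # arbitrary; tune to your data

def read_batched(f, parse_queue):
    batch = []
    for line in f:
        batch.append(json.loads(line))
        if len(batch) >= BATCH_SIZE:
            parse_queue.put(batch)  # one pickle/unpickle per batch,
            batch = []              # not per dict
    if batch:
        parse_queue.put(batch)      # flush the remainder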
After fixing that problem by combining the read and parse classes I created, I saw that I had a huge write bottleneck as well. I fixed this by concatenating an entire file's worth of parsed JSON objects into one list and sending that list to the write process through the queue.
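In sketch form, the combined read+parse stage ends up looking something like this (flatten_and_categorize stands in for my actual flattening/categorizing logic):

import bz2
import json

def read_and_parse(path, keywords, write_queue):
    results = []
    with bz2.open(path, 'rt') as f:
        for line in f:
            item = json.loads(line)
            # keep only items matching one of the keywords
            if any(item.get(kw) is not None for kw in keywords):
                results.append(flatten_and_categorize(item))  # hypothetical helper
    write_queue.put(results)  # one queue message per file, not per dict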
Key takeaway: queues have a lot of overhead, and each .get() takes real processing time, so minimize the number of items sent through a queue, either by combining processes that are unnecessarily abstracted or by combining lots of small objects into one big one.