
The situation:

I have a large amount of data from a social media website covering 5 months, stored as line-formatted JSON files, one file per minute. See here for an explanation of the format.

All data is on a server with almost 400 GB of RAM and a 32-core, 64-thread CPU. I have been taking advantage of this with Python's multiprocessing module; however, I basically have to dedicate 29 processes to parsing and the rest to extracting, writing, and reading/deleting. Performance doesn't seem to get much better than with my current configuration.

I have created a program with 5 main process classes that extend multiprocessing.Process, one for each step (a rough sketch of this layout follows the list):

extracting: extracts tar files, each encompassing 1 day of 1-minute json.bz2 files.

reading: reads directly from the bz2 files, sends JSON dicts to the parse queue, and sends a message to the delete process when finished with a tar file.

deleting: deletes old tar files so they don't use as much space on the server.

parsing: parses JSON dicts for keywords to categorize them, flattens them, puts them into a dataframe, and sends it to the write queue.

writing: merges dataframes from the write queue according to category. When a master dataframe reaches a certain size, it is written to the output directory and discarded.
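
For illustration, a minimal sketch of how a pipeline like mine could be wired together with multiprocessing.Process subclasses and queues. The class name, queue names, and worker count here are placeholders for the sketch, not my actual code:

import multiprocessing as mp

class ParseProcess(mp.Process):
    """Illustrative worker: pulls JSON dicts off one queue, pushes results to another."""
    def __init__(self, parse_queue, write_queue):
        super().__init__()
        self.parse_queue = parse_queue
        self.write_queue = write_queue

    def run(self):
        while True:
            item = self.parse_queue.get()   # blocks until an item arrives
            if item is None:                # sentinel: shut down
                break
            # ... categorize / flatten the item here ...
            self.write_queue.put(item)

if __name__ == '__main__':
    parse_queue = mp.Queue()
    write_queue = mp.Queue()
    workers = [ParseProcess(parse_queue, write_queue) for _ in range(29)]
    for w in workers:
        w.start()
    # ... the extract/read processes would feed parse_queue here ...
    for _ in workers:
        parse_queue.put(None)               # one sentinel per worker
    for w in workers:
        w.join()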

The problem:

It takes way too long to get the data. If I ran the program as it is now without stopping, it would take over 20 weeks to parse the data I actually want and separate it into 4 broad categories, which I put into dataframes and write to HDF5 files.

What I have tried:

I have tried reading the JSON and loading it into a dictionary line by line, like so:

import json

with open('file') as f:
    for line in f:
        data = json.loads(line)
        # send data to queue for parse processes

I have also tried converting to another datatype to take some overhead off the parse process:

import json

with open('file') as f:
    for line in f:
        data = json.loads(line)
        # categorize and convert to flattened dataframes and send
        # to parse process

I have also tried other things, like using pandas Series or lists instead of Python dicts, but that doesn't seem to help either.

Current pseudocode for the main parse process:

import time

while True:
    while parse_queue.empty():
        time.sleep(1)              # poll until something arrives
    item = parse_queue.get()
    if item is None:               # sentinel: no more work
        break
    if item.get(keyword) is not None:
        # flatten, convert to dataframe, send to write queue
        # (I do this for 4 different cases of items)

I have also tried sending pandas Series or DataFrames instead of Python dicts. That doesn't seem to help. I've also tried flattening the dict beforehand, which doesn't change anything.

Accessing a Python dict is O(1). The read process is clearly fast enough to create and send Python dicts to the parse queue at a rate of 2-3k dicts a second (after filtering out unwanted garbage JSON). So why is the parsing taking so long? It takes about 10-100x as long to parse as it does to read the files.
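
One way to see where the time goes is a rough micro-benchmark comparing the cost of json.loads itself against the cost of pushing each parsed dict through a multiprocessing.Queue. The sample line below is made up and the absolute numbers depend on the machine; only the relative timings matter:

import json
import multiprocessing as mp
import time

# A made-up example line; real records will be larger and more nested.
line = '{"id": 123, "user": "someone", "text": "hello world", "lang": "en"}'
N = 10_000

# Cost of parsing the JSON itself.
start = time.perf_counter()
for _ in range(N):
    record = json.loads(line)
print("json.loads:", time.perf_counter() - start)

# Cost of a round trip through a multiprocessing.Queue
# (each item is pickled by a feeder thread and unpickled on get()).
q = mp.Queue()
record = json.loads(line)
start = time.perf_counter()
for _ in range(N):
    q.put(record)
    q.get()
print("queue round trip:", time.perf_counter() - start)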

What I think will help:

I believe my problem can only be solved by parsing directly from the file stream, but how can I stream these JSON objects when they're in a file format like this?

My main question:

How can I make this parse process faster? Do I need to read directly from the filestream? If so, how can I do that?

SOLUTION:

When you send lots of items through a multiprocessing.Queue, each item has to be serialized (pickled) when it is sent and then deserialized again by the consumer. This greatly slows down your processing time. Either combine lots of small objects into one big object that you can send through the queue at once, or combine two processes (for instance, reading and parsing JSON) into one, and it will be much faster.

After fixing that problem by combining the read and parse classes I created, I saw that I had a huge write bottleneck as well. I fixed this by concatenating an entire file's worth of parsed JSON objects into one list and sending that list to the write process through a queue.

Key takeaway: queues have a lot of overhead, and every .get costs processing time, so find a way to minimize the number of items being sent through the queue, either by combining processes that are unnecessarily split apart or by combining lots of small objects into one big one.
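
A minimal sketch of that takeaway, assuming a combined read-and-parse worker (the function name, keyword filtering, and queue are illustrative): parse a whole file locally, then put a single list on the write queue instead of thousands of individual dicts.

import bz2
import json

def read_and_parse(path, keywords, write_queue):
    """Read one .json.bz2 file, parse and filter it in-process,
    and send the whole file's worth of rows as ONE queue item."""
    rows = []
    with bz2.open(path, 'rt') as f:
        for line in f:
            record = json.loads(line)
            for kw in keywords:
                if record.get(kw) is not None:
                    rows.append(record)   # or a flattened version of it
                    break
    write_queue.put(rows)                 # one put per file, not per record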

Aiden
  • Check my [answer](https://stackoverflow.com/a/61177496/10824407), probably writing such a custom "parser" will be a solution for you. – Olvin Roght Jun 19 '20 at 18:01
  • These queues, are they intraprocess queues with threads or interprocess queues? Extracting - don't let it get too far ahead. Ideally extracted files are still in file cache when read. Appending to a dataframe row by row can get expensive because of copying existing data. You may be better off with a list you make into a dataframe at the end. – tdelaney Jun 19 '20 at 18:14
  • It seems like you `json.loads(line)` and then send to another process, which means it is serialized and parsed again in the next process. The json.loads filtering and processing likely should be in a single process. You have a queue where you have to poll and sleep? Use a queue that has a wait. Do you consume all 400 GB of memory? Throttle back on processes to avoid thrashing. Do you have multiple hard drives or an SSD? Try to spread the I/O among them - maybe extract tar to a different drive for instance. – tdelaney Jun 19 '20 at 18:26
  • Yes, I only extract a day's worth of tar files at once to NVME drives on the server. Each tar file is one hour I believe. So I extract 24 tar files, with 60 folders inside, with 60 more files of .json.bz2. The read process can run through it very quickly. I will try parsing directly after reading. I don't consume all 400 gb of memory, I consume maybe 20-30 at most. It seems more CPU intensive and I get very diminishing returns when I let it use that much memory. The actual merging of the dataframes takes very little time (that was my first thought when the program was taking long) – Aiden Jun 19 '20 at 18:39
  • Hey @tdelaney thank you so much. I've been banging my head against the wall for literally the entire week. I changed it so that parsing was done on the spot and it fixed it just fine. It's very fast now. – Aiden Jun 19 '20 at 18:50
  • now i have a huge write bottleneck, I will try putting each dict in a list and then concatenating at the end. I would appreciate any other comments you have with the write process though @tdelaney. – Aiden Jun 19 '20 at 19:39
  • If you are saving dataframes, some formats are faster than others. Feather and parquet will typically beat csv. If you are writing lots of dataframes, then the whole data store starts to be interesting. HDFS for instance. I'm guessing that a list of lists would transform into a dataframe faster than a list of dicts... just a guess. So you could normalize the dict a list before adding to the accumulating list. But that takes time too, so it may be a wash. – tdelaney Jun 19 '20 at 20:04
  • right now, i'm taking normalized json from the read/parse process, sending it through a queue to the write process, and then concatenating those objects to a list. once the list is of a certain size, it will go through the entire list and merge it into one dataframe and write to a csv. the queue is just filling up really quickly and the write process can't empty it fast enough. i assume this is because of the serialization/parse debacle i just solved in the parse problem. – Aiden Jun 19 '20 at 20:09
  • Do you use the dataframe to process data or is it just an intermediate container? If there is no processing, then you could remove much of this pipeline. The place where you are adding data to the list could become a `csv.writer` that is writing the rows to disk right then and there. When it hits a size limit or whatever your criteria is, close its file, make a new csv.writer and continue on. – tdelaney Jun 19 '20 at 20:18
  • No, I'm not processing any data with the dataframes (later i will after i actually get the data i need), but the json dicts do not match except for a few columns so that's why I feel the need to merge them, if that makes sense. – Aiden Jun 19 '20 at 20:19
  • Now that you mention that, I think I will just write line-by-line json, again. I will be unfolding the data and categorizing it so it will still be useful. I really need to get this data so I think I need to do that for now. However, I'm open to any other optimizations you might think of – Aiden Jun 19 '20 at 20:23
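
For reference, a rough sketch of the csv.writer idea tdelaney suggests in the thread above. The class name, file naming scheme, and row limit are made up; csv.DictWriter with extrasaction='ignore' also tolerates dicts whose keys don't all match.

import csv

class RollingCsvWriter:
    """Write rows straight to CSV files, starting a new file
    every max_rows rows (names and limits are illustrative)."""
    def __init__(self, prefix, fieldnames, max_rows=1_000_000):
        self.prefix = prefix
        self.fieldnames = fieldnames
        self.max_rows = max_rows
        self.rows_written = 0
        self.file_index = 0
        self._open_new_file()

    def _open_new_file(self):
        self.f = open(f"{self.prefix}_{self.file_index:04d}.csv", "w", newline="")
        self.writer = csv.DictWriter(self.f, fieldnames=self.fieldnames,
                                     extrasaction="ignore")
        self.writer.writeheader()

    def write_row(self, row):
        self.writer.writerow(row)
        self.rows_written += 1
        if self.rows_written >= self.max_rows:
            self.f.close()
            self.file_index += 1
            self.rows_written = 0
            self._open_new_file()

    def close(self):
        self.f.close()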

1 Answer


If you just want faster JSON parsing, you might look at ujson (https://pypi.org/project/ujson/).

instead of

import json
parsed = json.loads(line)

you'd write

import ujson
parsed = ujson.loads(line)

However, if you know the JSON files very well, you could try to extract the information you want with a custom regular expression or a custom parser. Such solutions can break in edge cases unless they are very carefully written, because they tend to rely on the JSON lines being more uniform than the JSON standard requires.
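
For example, a sketch of such a regex extraction. The field name "lang" is just an example, and the pattern assumes the value is a plain quoted string: it will miss escaped quotes and other perfectly legal JSON.

import re

# Assumes the value is a simple quoted string with no escaped quotes,
# which is stricter than the JSON standard allows.
lang_pattern = re.compile(r'"lang"\s*:\s*"([^"]*)"')

def extract_lang(line):
    match = lang_pattern.search(line)
    return match.group(1) if match else None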

Apart from that, I suggest you profile your code to find out what exactly slows you down.

It might be worth trying something like this:

Have one process read a file n lines at a time (e.g. 50) and delegate parsing (ujson.loads()) of those 50 lines to a worker pool.

Multiprocessing has quite some overhead. If you delegate very tiny tasks/elements (like just one line), the multiprocessing overhead might be bigger than what you gain by distributing your workload.
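
A sketch of that batching idea, assuming one reader feeding a multiprocessing.Pool in chunks of 50 lines. The batch size, file name, and helper names are placeholders, and json could be swapped for ujson if it is installed.

import json                       # or ujson, if installed
from itertools import islice
from multiprocessing import Pool

def parse_lines(lines):
    """Parse a whole batch of lines in one worker call."""
    return [json.loads(line) for line in lines]

def batches(f, size=50):
    """Yield lists of `size` lines from an open file."""
    while True:
        chunk = list(islice(f, size))
        if not chunk:
            return
        yield chunk

if __name__ == '__main__':
    with Pool(processes=8) as pool, open('file.json') as f:
        for parsed_batch in pool.imap_unordered(parse_lines, batches(f)):
            pass  # categorize / write parsed_batch here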

gelonida