
TL;DR: How do I share a large (200MB) read-only dict between multiple processes in a performant way, when it is accessed VERY heavily, without each process having a full copy in memory?

EDIT: It looks like if I just pass the dictionary as the argument to the multiprocessing.Pool/Process, it won't actually create a copy unless a worker modifies the dictionary. I just assumed it would copy. This behavior seems to be Unix-only, where fork is available, and even then not always. But if so, it should solve my problem until this is converted to an ETL job.
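For context, this is roughly the shape of the fork/copy-on-write version (a minimal sketch, assuming a Unix "fork" start method; `build_lookup`, `transform`, `chunks` and `push_to_destination` are placeholders for my real code):

```python
import multiprocessing as mp

# Placeholder: build_lookup() loads the ~200MB ID -> info dict,
# transform()/chunks()/push_to_destination() stand in for the real work.
doc_to_docinfo = {}  # populated in the parent before forking

def process_chunk(chunk):
    # Under the "fork" start method the child inherits doc_to_docinfo
    # via copy-on-write, so no per-worker copy is made as long as the
    # dict is only read (refcount updates can still dirty some pages).
    return [transform(doc, doc_to_docinfo.get(doc["id"])) for doc in chunk]

if __name__ == "__main__":
    doc_to_docinfo = build_lookup()      # parent builds the dict once
    ctx = mp.get_context("fork")         # explicit fork (Unix only)
    with ctx.Pool(processes=30) as pool:
        for result in pool.imap_unordered(process_chunk, chunks()):
            push_to_destination(result)
```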

What I'm trying to do:

I have a task to improve a script that replicates data from one store to another, normalizing and transforming the data on the way. This task works on a scale of around 100 million documents coming from the source document store that get rolled up and pushed to a destination document store.

Each document has an ID, and there is another document store that is essentially a key-value store of those IDs mapped to some additional information needed for this task. This store is a lot smaller, and querying it while documents from the main store come through is not really an option without heavy caching, and that heavy cache ends up being a copy of the whole thing very quickly. So I just build the whole dictionary from that entire store at the beginning, before starting anything, and use that. That dictionary is around ~200MB in size. Note that this dictionary is only ever read from.

For this I have set up multiprocessing with around 30 concurrent processes. I've divided the work so that each process hits different indices, and the whole thing completes in around 4 hours.

I have noticed that I am extremely CPU bound with either of the following two approaches:

  • Using a thread pool/threads (what I'm currently doing) so each thread can access the dict without issue. The GIL is killing me: I have one process maxing out at 100% all the time with the other CPUs sitting idle. Switching to PyPy helped a lot, but I'm still not happy with this approach.
  • Creating a multiprocessing.Manager().dict() for the large dict and having the child processes access it through that (rough sketch after this list). The server process that this approach creates is constantly at 100% CPU. I don't know why, as I only ever read from this dictionary, so I doubt it's a locking thing. I don't know how the Manager works internally, but I'm guessing that the child processes are connecting via pipes/sockets for each fetch and the overhead of this is massive. It also suggests that using Redis/Memcached will have the same problem if true. Maybe it can be configured better?
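For reference, the Manager version looked roughly like this (a stripped-down sketch; `build_lookup` and `chunks` are placeholders for my real code):

```python
import multiprocessing as mp

def worker(shared_dict, chunk):
    # Every shared_dict.get(...) here is a round trip to the manager's
    # server process over a pipe/socket, which is presumably what pegs
    # that process at 100% CPU under heavy read load.
    return [shared_dict.get(doc_id) for doc_id in chunk]

if __name__ == "__main__":
    with mp.Manager() as manager:
        shared_dict = manager.dict(build_lookup())   # placeholder loader
        with mp.Pool(processes=30) as pool:
            results = pool.starmap(worker, [(shared_dict, c) for c in chunks()])
```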

I am memory bound when doing the following:

  • Using a SharedMemory view. You can't seem to do this for dicts the way I need to. I can serialize the dict to get it into the shared view, but for it to be usable in the child process you need to deserialize the data back into an actual dict, which creates the copy in that process (sketch below).
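Roughly what I tried with shared memory (a sketch, Python 3.8+; the `pickle.loads` in the child is exactly where the full copy reappears):

```python
import pickle
from multiprocessing import shared_memory

def publish_dict(d):
    # Parent: pickle the dict into a shared memory block.
    payload = pickle.dumps(d, protocol=pickle.HIGHEST_PROTOCOL)
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[:len(payload)] = payload
    return shm, len(payload)  # children attach via shm.name

def load_dict(name, size):
    # Child: attaching is cheap, but to actually use the data as a dict
    # it has to be deserialized, which materialises a full copy here.
    shm = shared_memory.SharedMemory(name=name)
    d = pickle.loads(bytes(shm.buf[:size]))
    shm.close()
    return d
```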

I strongly suspect that, unless I've missed something, I'm just going to have to "download more RAM" or rewrite this from Python into something without a GIL (or turn it into a proper ETL job, like it should be done...).

In the case of RAM, what is the most efficient way to store a dict like this so it stings less? It's currently a standard dict mapping IDs to a tuple of the extra information, consisting of three numbers (long/float).

```python
doc_to_docinfo = { "ID1": (5.2, 3.0, 455), }
```

Are there any more efficient hashmap implementations for this use case than what I'm doing?

Callum
  • I found this post; maybe it is useful for you: https://stackoverflow.com/a/39020355/12291742 – Ekrem Dinçel Feb 09 '20 at 17:31
  • It is possible to fork with a copy-on-write. **But** this possibility is limited to certain operating systems and (since several CPU bugs were mitigated) also CPUs. What are you using? – Klaus D. Feb 09 '20 at 17:31
  • you should store your dict in something like a Redis key/value store, I think ... it still requires serialized entries, but it allows quick access to whatever keys you need – Joran Beasley Feb 09 '20 at 17:31
  • @EkremDİNÇEL That is certainly related but I have used the approach suggested (usage of Manager.dict) and it falls apart for my use case. – Callum Feb 09 '20 at 17:39
  • Starting w/ Python 3.8, there is https://docs.python.org/3/library/multiprocessing.shared_memory.html, but atm only `ShareableList` is implemented, so you'd have to serialize data to use `shm` through this interface. – Ondrej K. Feb 09 '20 at 17:40
  • @JoranBeasley The Manager.dict() approach suggests that this will also be CPU bound, though that is probably the most correct solution to this problem; maybe it will be more optimized, but I'm not too hopeful. What I'm really hoping for is a way to share memory (I have no locking requirements) in a usable way that I don't yet know of. – Callum Feb 09 '20 at 17:41
  • @KlausD. It runs on a Debian-based Python Docker image. I can use Unix-based APIs if available. This process is dying to be converted into an ETL-based solution, and I can tolerate platform compatibility being compromised. – Callum Feb 09 '20 at 17:43
  • @KlausD. If I'm understanding this correctly, you're suggesting multiprocessing in "fork" mode instead of spawn mode? This looks very promising to me! – Callum Feb 09 '20 at 17:57
  • It will be most promising on Linux with an AMD CPU or (**dangerous**, you should know what it does before applying) https://make-linux-fast-again.com/ – Klaus D. Feb 09 '20 at 18:04
  • Why does the cache end up being a copy of the whole thing? You mean each process handles only 1/30 of all documents but still needs the extra information for *all* documents? – Kelly Bundy Feb 09 '20 at 18:14
  • @HeapOverflow That's exactly it. The data store is a little bit more complicated than I explained, but basically there's a concept of parent and child documents where the parent is indexed by date, but the child documents are spread all over (bad design), and when those child docs get rolled up they'll need the whole dictionary. – Callum Feb 09 '20 at 18:19

1 Answer


You seem to have a problem similar to mine. It is possible to use my source here to create a partitioning of those dictionary keys per thread. My suggestion: split the document IDs into partitions of length 3 or 4, keep the partition table in sync across all processes/threads, and then just move the parts of your documents to each process/thread; as an entry point, the process does a dictionary lookup and finds out which process can handle that part of the dictionary. If you are clever with balancing the partitions, you could also have an equal number of documents managed per thread.
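Roughly what I mean, as a sketch (partitioning by a short ID prefix; the routing/IPC between workers is left out and the helper names are made up):

```python
from collections import defaultdict

PREFIX_LEN = 3  # partition key: first 3-4 characters of the document ID

def build_partitions(doc_to_docinfo, num_workers):
    # Group dictionary keys by ID prefix, then assign whole prefix groups
    # to workers so each worker only holds its own slice of the dict.
    by_prefix = defaultdict(dict)
    for doc_id, info in doc_to_docinfo.items():
        by_prefix[doc_id[:PREFIX_LEN]][doc_id] = info

    partition_table = {}                          # prefix -> worker index
    worker_dicts = [dict() for _ in range(num_workers)]
    # Greedy balancing: give the next-largest group to the smallest worker.
    for prefix, group in sorted(by_prefix.items(), key=lambda kv: -len(kv[1])):
        target = min(range(num_workers), key=lambda i: len(worker_dicts[i]))
        partition_table[prefix] = target
        worker_dicts[target].update(group)
    return partition_table, worker_dicts

def owner_of(doc_id, partition_table):
    # Entry point: look up which worker holds the info for this document.
    return partition_table[doc_id[:PREFIX_LEN]]
```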

Faster