8

So let's say you have a Python process which is collecting data in real time at around 500 rows per second (this can be further parallelized to reduce it to around 50 rows per second) from a queueing system and appending it to a DataFrame:

rq = MyRedisQueue(..)
df = pd.DataFrame()
while True:
    recv = rq.get(block=True)
    # some converting
    df = df.append(recv, ignore_index=True)

Now the question is: How to utilize the CPUs based on this data? I am fully aware of the limitations of the GIL, and looked into the multiprocessing Manager namespace, here too, but it looks like there are some drawbacks with regard to latency on the centrally held DataFrame. Before digging into it, I also tried pool.map, which I then recognized applies pickling between the processes, which is way too slow and has too much overhead.

So after all of this, I finally wonder how (or if) an insert of 500 rows per second (or even 50 rows per second) can be transferred to different processes, with some CPU time left for applying statistics and heuristics to the data in the child processes.

Maybe it would be better to implement a custom TCP socket or queueing system between the two processes? Or are there implementations in pandas or other libraries that really allow fast access to the one big DataFrame in the parent process? I love pandas!

gies0r
  • 4,723
  • 4
  • 39
  • 50
  • Do you want to perform the statistics only on the chunks of 50 to 500 rows that are new every second and continuously append them to one big DF? Should the big DF just be stored, or does more real-time processing need to be performed on it? – Ronald Luc Mar 31 '20 at 10:17
  • @RonaldLuc If that is a necessary restriction, I would limit it to statistics on the newly arrived 50 to 500 rows, yes. I could hold means and highs/lows in extra variables to keep track of the existing data in the big DataFrame. – gies0r Apr 02 '20 at 21:01

3 Answers

4

Before we start, I should say that you didn't tell us much about your code, but keep this point in mind: transfer only those 50/500 new rows each second to the child process, and try to build that big DataFrame in the child process.

I'm working on a project very much like yours. As you know, Python has many IPC implementations, such as Pipe and Queue. A Shared Memory solution may be problematic in many cases; AFAIK, the official Python documentation warns about using shared memory.

In my experience, the best way to transfer data between only two processes is Pipe, so you can pickle the DataFrame and send it to the other connection end point. I strongly suggest you avoid TCP sockets (AF_INET) in your case.

A pandas DataFrame cannot be transferred to another process without being pickled and unpickled, so I also recommend you transfer the raw data as built-in types like dict instead of a DataFrame. This should make pickling and unpickling faster, and it also has a smaller memory footprint.
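A minimal sketch of that combination, assuming the rows have already been converted to dicts; the function names and the `None` end-of-stream sentinel are my own, not part of your code:

import pandas as pd
from multiprocessing import Pipe, Process

def consumer(conn):
    """Child process: receive plain dicts and grow the big DataFrame here."""
    rows = []
    while True:
        batch = conn.recv()          # blocks until the parent sends something
        if batch is None:            # sentinel: the parent is done
            break
        rows.extend(batch)
        df = pd.DataFrame(rows)      # rebuilt per batch for simplicity
        # ... apply your statistics and heuristics on df here ...

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    proc = Process(target=consumer, args=(child_conn,))
    proc.start()

    # Parent: forward built-in types, not DataFrames, so each send
    # only pickles a small list of dicts.
    for _ in range(10):              # stand-in for the `while True` receive loop
        batch = [{"a": 1, "b": 2}]   # stand-in for rq.get(block=True) + converting
        parent_conn.send(batch)

    parent_conn.send(None)           # tell the child to stop
    proc.join()

This way only the small, freshly received batches cross the process boundary, and the ever-growing frame itself is never pickled.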

AmirHmZ
  • 516
  • 3
  • 22
  • I appreciate your answer @AmirHmZ! Especially the link to the benchmark is nice (and the tools around it, which I did not know yet). A solution in the `Shared Memory` area, which can hopefully handle a lot of reads from the child processes while the main process is appending to it, could do it from what I see, if I strictly restrict write access to the parent process. – gies0r Apr 04 '20 at 15:17
  • .. But I do not know if the `shared memory` is in some sort of `blocked state` while being written to? That would mean that the child processes are not allowed to read the DataFrame while the parent process appends to it (which will be almost always). – gies0r Apr 04 '20 at 15:20
  • @gies0r Sorry for my inactivity. If you are going to use the `Shared Memory` solution, you should synchronize the child processes with the supplier process. It can be done with a `multiprocessing.Lock`: https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes – AmirHmZ Apr 07 '20 at 12:20
1

Parallelisation in pandas is probably better handled by another engine altogether.

Have a look at the Koalas project by Databricks or Dask's DataFrame.
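A minimal sketch of the Dask route; the columns and the aggregation are made up for illustration, and `npartitions` controls how many chunks can be processed in parallel:

import pandas as pd
import dask.dataframe as dd

# Stand-in for the big frame from the question
pdf = pd.DataFrame({"key": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})

# Split the frame into partitions that Dask can work on in parallel
ddf = dd.from_pandas(pdf, npartitions=2)

# Operations are lazy; .compute() triggers the parallel execution
result = ddf.groupby("key")["value"].mean().compute()
print(result)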

gosuto
  • 5,422
  • 6
  • 36
  • 57
  • Well... That's a huge amount of code to be reviewed and fixed... Dask looks like it has good adoption, but it is still a ton of work. Are you aware of an example where a data load / update interval like the one mentioned in the question has been implemented/documented? – gies0r Apr 02 '20 at 21:04
  • I've used Dask to process 200+ GB datasets in parallel with a small memory footprint, but it was not online. Dask is _basically_ many pandas DataFrames stacked on top of each other. – Ronald Luc Apr 03 '20 at 06:36
  • @RonaldLuc What sort of operations did you do on your local machine? – Umar.H Apr 03 '20 at 11:51
  • Load from parquet, row-wise numerical operations, calculation of geolocations, some operations on "local pandas DataFrames" (`df.map_partitions`) and then a `groupby` on the index (important for performance in Dask), save as CSV. – Ronald Luc Apr 04 '20 at 10:15
  • Changed quite some code to `dask` in the meantime. The calculation speeds are awesome, and it also implements a nice way of queueing the logic into calculation graphs. Pretty good to use. But the original problem is still open (rapidly updating one big central DataFrame). I posted another question with a focus on dask: https://stackoverflow.com/questions/63156650/dask-update-published-dataset-periodically-and-pull-data-from-other-clients – gies0r Jul 29 '20 at 15:25
0

A simple solution would be to separate the process into two different stages. Use asyncio to receive the data in a non-blocking manner and perform your transformations within that stage. The second stage would consume an asyncio Queue to build the DataFrame. This assumes you do not need the DataFrame available to a different process while you are receiving data from the Redis queue.

Here is an example of building a producer/consumer model with asyncio:
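A minimal sketch of that model; the received items, the loop bound, and the `None` end-of-stream sentinel are placeholders for illustration:

import asyncio
import pandas as pd

async def producer(queue):
    """Stage 1: receive the data in a non-blocking manner and convert it."""
    for i in range(5):           # stand-in for reading from the Redis queue
        recv = {"value": i}      # ... some converting ...
        await queue.put(recv)
    await queue.put(None)        # sentinel: no more data

async def consumer(queue):
    """Stage 2: drain the queue and build the DataFrame."""
    rows = []
    while (item := await queue.get()) is not None:
        rows.append(item)
    return pd.DataFrame(rows)

async def main():
    queue = asyncio.Queue()
    _, df = await asyncio.gather(producer(queue), consumer(queue))
    print(df)

asyncio.run(main())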

arshit arora
  • 33
  • 1
  • 5