You already have a good sense of the options available to you.
The best solution depends on your actual needs,
so I will try to cover the possibilities with working examples.
What you want
If I understand your requirements correctly, you want to
- continuously update a DataFrame (from a websocket)
- while doing some computations on the same DataFrame
- while keeping the DataFrame up to date for the computation workers,
- where one computation is CPU-intensive
- and another is not.
What you need
As you said, you will need a way to run different threads or processes so that the computations keep running while the data keeps arriving.
How about Threads
The easiest way to achieve what you want is to use the threading library.
Since threads can share memory, and only one worker actually updates the DataFrame, it is quite easy to keep the data up to date for all of them:
import time
from dataclasses import dataclass, field
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.

    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need to replace the whole object.
    """
    # default_factory avoids sharing one mutable default DataFrame between instances
    dataframe: pandas.DataFrame = field(
        default_factory=lambda: pandas.DataFrame(columns=['A', 'B'])
    )

    def update(self, data):
        # DataFrame.append was removed in pandas 2.0, so we concatenate instead
        self.dataframe = pandas.concat(
            [self.dataframe, pandas.DataFrame(data)], ignore_index=True
        )


class StreamLoader:
    """This class is our worker communicating with the websocket"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder

    def update_df(self):
        # read from the websocket and update your DF.
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit the loop for the showcase
        for _ in range(5):
            self.update_df()
            print("[1] Updated DF %s" % str(self.df_holder.dataframe))
            time.sleep(3)


class LightComputation:
    """This class is a random computation worker"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder

    def compute(self):
        print("[2] Current DF %s" % str(self.df_holder.dataframe))

    def run(self):
        # limit the loop for the showcase
        for _ in range(5):
            self.compute()
            time.sleep(5)


def main():
    # We create a DataFrameHolder to keep our DataFrame available.
    df_holder = DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Thread(target=compute.run)
    compute_process.start()

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()
Note that we use a class to hold a reference to the current DataFrame, because operations such as concatenation are not done in place:
they produce a new object, so if we handed the DataFrame reference directly to the workers, they would keep pointing at the old data and the updates would be lost to them.
The DataFrameHolder object, on the other hand, keeps the same location in memory, so every worker can always reach the updated DataFrame through it.
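To make the problem concrete, here is a tiny sketch (not part of the solution above, just an illustration) of what happens when a worker receives the DataFrame itself instead of a holder:

import pandas

df = pandas.DataFrame(columns=['A', 'B'])

def computation_worker(frame: pandas.DataFrame):
    # Rebinding the local name creates a new object; the caller's `df`
    # still points to the original, empty DataFrame.
    frame = pandas.concat(
        [frame, pandas.DataFrame({'A': [1], 'B': [2]})], ignore_index=True
    )

computation_worker(df)
print(len(df))  # prints 0: the update is invisible to the caller

Wrapping the DataFrame in a holder object and always reading it through that object avoids this pitfall.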
Processes may be more powerful
Now, if you need more computation power, processes may be more useful, since they bypass the GIL and let you isolate a worker on a different CPU core.
To start a Process instead of a Thread in Python, you can use the multiprocessing library.
The API of both objects is the same, so you only have to change the constructor as follows:
from threading import Thread
# I create a thread
compute_process = Thread(target=compute.run)
from multiprocessing import Process
# I create a process that I can use the same way
compute_process = Process(target=compute.run)
Now, if you make that change in the script above, you will see that the DataFrame is no longer updated correctly on the computation side.
This requires more work, because the two processes do not share memory, and there are multiple ways for them to communicate (https://en.wikipedia.org/wiki/Inter-process_communication).
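For example, a multiprocessing.Queue is one of the simplest of those mechanisms: the updating process pushes each chunk of data into the queue and the computing process rebuilds its own copy of the data from it. This is only a sketch of that idea, not the approach used below:

from multiprocessing import Process, Queue


def producer(queue: Queue) -> None:
    # Push raw updates through the queue instead of sharing the DataFrame itself.
    queue.put({'A': [1, 2, 3], 'B': [4, 5, 6]})
    queue.put(None)  # sentinel: no more updates


def consumer(queue: Queue) -> None:
    while True:
        data = queue.get()
        if data is None:
            break
        print("received update:", data)


if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

The drawback is that every consumer has to maintain its own copy of the DataFrame from the updates it receives, which is why a shared manager object is more convenient for your case.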
The reference from @SimonCrane is quite interesting on the matter and showcases the use of shared memory between two processes using a multiprocessing manager.
Here is a version where the computation worker runs in a separate process and the DataFrame is shared through such a manager:
import logging
import multiprocessing
import time
from dataclasses import dataclass, field
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.

    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need to replace the whole object.
    """
    # default_factory avoids sharing one mutable default DataFrame between instances
    dataframe: pandas.DataFrame = field(
        default_factory=lambda: pandas.DataFrame(columns=['A', 'B'])
    )

    def update(self, data):
        # DataFrame.append was removed in pandas 2.0, so we concatenate instead
        self.dataframe = pandas.concat(
            [self.dataframe, pandas.DataFrame(data)], ignore_index=True
        )

    def retrieve(self):
        return self.dataframe


class DataFrameManager(BaseManager):
    """This manager handles the shared DataFrameHolder.

    See https://docs.python.org/3/library/multiprocessing.html#examples
    """
    # You can also use a socket file '/tmp/shared_df'
    MANAGER_ADDRESS = ('localhost', 6000)
    MANAGER_AUTH = b"auth"

    def __init__(self) -> None:
        super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)

    @classmethod
    def register_dataframe(cls):
        cls.register("DataFrameHolder", DataFrameHolder)


class DFWorker:
    """Abstract class initializing a worker depending on a DataFrameHolder."""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder


class StreamLoader(DFWorker):
    """This class is our worker communicating with the websocket"""

    def update_df(self):
        # read from the websocket and update your DF.
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit the loop for the showcase
        for _ in range(4):
            self.update_df()
            print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
            time.sleep(3)


class LightComputation(DFWorker):
    """This class is a random computation worker"""

    def compute(self):
        print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))

    def run(self):
        # limit the loop for the showcase
        for _ in range(4):
            self.compute()
            time.sleep(5)


def main():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(logging.INFO)

    # Register our DataFrameHolder type in the DataFrameManager.
    DataFrameManager.register_dataframe()
    manager = DataFrameManager()
    manager.start()

    # We create a managed DataFrameHolder to keep our DataFrame available.
    df_holder = manager.DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Process(target=compute.run)
    compute_process.start()

    # The managed dataframe is updated in every Thread/Process
    time.sleep(5)
    print("[0] Main process DF\n%s" % df_holder.retrieve())

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()

    # Properly stop the manager's server process once we are done
    manager.shutdown()


if __name__ == "__main__":
    main()
As you can see, the differences between the threading and multiprocessing versions are quite small.
With a few tweaks, you can start from there and connect to the same manager from a different file if you want to handle your CPU-intensive processing there.
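If you go down that road, the second script does not start its own manager: it connects to the one that is already running and asks it for the existing holder. The sketch below only shows the client side; it assumes the server script additionally registers an accessor returning its holder (the get_holder name here is hypothetical), following the "Using a remote manager" example in the multiprocessing documentation:

# cpu_worker.py: hypothetical separate script for the CPU-intensive part
from multiprocessing.managers import BaseManager


class RemoteDataFrameManager(BaseManager):
    """Client-side manager: nothing is registered with a callable here."""


if __name__ == "__main__":
    # Register only the name: the callable itself lives on the server side.
    RemoteDataFrameManager.register("get_holder")
    manager = RemoteDataFrameManager(address=('localhost', 6000), authkey=b"auth")
    manager.connect()  # connect() instead of start(): the server is already running
    df_holder = manager.get_holder()  # proxy to the holder living in the manager
    print(df_holder.retrieve())

The address and auth key obviously have to match the MANAGER_ADDRESS and MANAGER_AUTH used above.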