I am trying to build an online (real-time) video processing application in Python. The input video sequence arrives in real time from a camera stream, and the frames need to be processed one batch/window at a time, also in real time. The processing for a batch of frames takes significantly longer than the time interval between consecutive frames, so to achieve real-time processing with a constant delay between input and output, parallelism is needed. To this end, I thought of using multiprocessing to create two processes: a consumer that completes its work on a contiguous window of frames in the time that a producer reads in the next window of frames coming in from the camera (at a throttled frame rate if necessary).
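To make the intended structure concrete, here is a sketch of the pipeline I have in mind (the names, window size, and the simulated camera read are placeholders, not the real application):

```python
from multiprocessing import Process, Queue

import numpy as np

WINDOW = 8            # frames per batch (placeholder value)
HEIGHT, WIDTH = 256, 256

def read_window():
    """Simulate grabbing a contiguous window of frames from the camera."""
    return np.random.randint(0, 256, size=(WINDOW, HEIGHT, WIDTH), dtype=np.uint8)

def process_window(frames):
    """Placeholder for the (slow) per-window processing step."""
    return frames.mean(axis=0)          # e.g. a temporal average over the window

def producer(q, n_windows):
    for _ in range(n_windows):
        q.put(read_window())            # read window k+1 while the consumer works on k
    q.put(None)                         # sentinel: no more windows

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, 3))
    p.start()
    while (frames := q.get()) is not None:   # consumer loop in the parent
        out = process_window(frames)
        print(out.shape)
    p.join()
```

The key point is that the producer and consumer run concurrently, so the per-window processing time only has to stay below the per-window acquisition time.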
To test out this idea I wrote the example below in Python, which attempts to communicate one frame of information (a numpy array, with resolution characterized by `Size`) from the producer to the consumer through an IPC `Pipe()`.
```python
from multiprocessing import Process, Pipe
import numpy as np

Size = 256

def producer(conn):
    print("Child entered")
    window1 = np.arange(Size * Size).reshape((Size, Size))
    print("Array created")
    conn.send(window1)
    print("Data added to Pipe")
    return

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    window_reader = Process(target=producer, args=(conn2,))
    window_reader.start()
    window_reader.join()
    print("Child process exited, entered parent")
    # Acting like a consumer
    a = conn1.recv()
    print(a)
```
With `Size = 256`, the observed output is:

```
Child entered
Array created
```
After this the program freezes and has to be forcibly terminated, which suggests that the producer process got blocked while attempting to write to the `Pipe()`. The Python multiprocessing library provides a `Queue()` class in addition to `Pipe()`. The documentation for `Queue()` states that it works as follows:

> When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

However, if I replace `Pipe()` with `Queue()` in the above example, to get the following Python code:
```python
from multiprocessing import Process, Queue
import numpy as np

Size = 256

def producer(mp_queue):
    print("Child entered")
    window1 = np.arange(Size * Size).reshape((Size, Size))
    print("Array created")
    mp_queue.put(window1)
    print("Data added to Pipe")
    return

if __name__ == '__main__':
    q = Queue()
    window_reader = Process(target=producer, args=(q,))
    window_reader.start()
    window_reader.join()
    print("Child process exited, entered parent")
    # Acting like a consumer
    a = q.get()
    print(a)
```
then the output is (surprisingly) different from the `Pipe()` case, since `Queue()`, unlike `Pipe()`, is able to get past the `put()`/`send()` stage, giving the output:

```
Child entered
Array created
Data added to Pipe
```
When `Size` is small enough, `Size = 64` for example, the program completes execution as expected, with a successful transfer of the frame between the producer and consumer processes. The blocking for `Size = 256` and larger seems to be the result of a system-wide limit on the size of IPC pipes on Linux. This answer suggested that there is no hard upper limit on the size of pipes and that the default limit can be read by running `cat /proc/sys/fs/pipe-max-size`. Running this command gave a value of 1 MB (the printed value was 1048576 bytes). This seems to make sense, since with `np.int64` entries in the frame, along with some overhead from the internal bookkeeping of the multiprocessing library, any value of `Size` much greater than 64 would be pushing it.
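For reference, the raw array payloads (before any pickling overhead) can be checked directly with `nbytes`; assuming the default 64-bit integer dtype that `np.arange` gives on 64-bit Linux, `Size = 64` is 32 KB while `Size = 256` is 512 KB:

```python
import numpy as np

for Size in (64, 256):
    # np.arange defaults to a 64-bit integer dtype on 64-bit Linux
    window = np.arange(Size * Size).reshape((Size, Size))
    print(Size, window.nbytes)  # 64 -> 32768 bytes, 256 -> 524288 bytes
```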
Hence, my questions are:

- Can I safely (if at all) increase the limit on IPC pipe size on my system by editing `/proc/sys/fs/pipe-max-size`?
- Is increasing the size of pipes on the system even a good way to achieve my goal? The 1 MB default limit is probably there for a good reason.
- Would some other IPC mechanism be better suited to my case? I want to send batches of frames from the producer to the consumer, with the estimated size of each window or batch between 4 and 8 MB plus overhead.
- I went for multiprocessing (mp) over multithreading (mt) since the former guarantees kernel-level separation between the processes, whereas with the Python multithreading library we can end up with user-level threads that cannot take advantage of multiple cores. When the processing on the video is done sequentially it is not real time, hence I was looking for a way to exploit multiple cores.
- In this application there isn't really a need for shared memory, since one process writes and the other merely reads the information. This was another reason for preferring mp over mt, since, as per my understanding, we prefer the latter when there is a need for shared memory (the heap is shared between the kernel threads of the same process). Given the limits of pipes, suggestions on this front will also be appreciated.
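For completeness, here is a variant of the `Pipe()` example that receives in the parent before joining the child; if the freeze really is the producer blocking on a full pipe while the parent waits in `join()`, I would expect this ordering to avoid it (a sketch, not the real pipeline):

```python
from multiprocessing import Process, Pipe

import numpy as np

Size = 256

def producer(conn):
    window1 = np.arange(Size * Size).reshape((Size, Size))
    conn.send(window1)       # can block until the reader drains the pipe
    conn.close()

def exchange():
    conn1, conn2 = Pipe()
    p = Process(target=producer, args=(conn2,))
    p.start()
    a = conn1.recv()         # receive BEFORE join, so send() can complete
    p.join()
    return a

if __name__ == '__main__':
    print(exchange().shape)
```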