I am doing an audio player that received samples from an udp socket, and everything was working fine. But when I implemented an Lost Concealment algorithm, the player failed to keep producing silence at the excepted rate (each 10ms send a list of multiple 160 bytes).
When playing audio with pyaudio, using the blocking call write to play some samples, I noticed it blocked on average for duration of the sample. So I created a new dedicated process to play the samples.
The main process processes the output stream of audio and sends the result to that process using a multiprocessing.Pipe . I decided to use the multiprocessing.Pipe because it was supposed to be faster than the other ways.
Unfortunately, when I runned the program on a virtual machine, the bitrate was half of what I was getting on my fast PC, which didnt fail to meet the target bitrate.
After some tests, I concluded that what was causing the delay was the Pipe's function send
.
I did a simple benchmark script (see below) to see the differences between the various methods of transmiting to a process. The script, keeps sending a [b'\x00'*160]
constantly for 5 seconds, and counts how many bytes of the bytes object were sent in total. I tested the following methods of sending: "not sending", multiprocessing.Pipe, multiprocessing.Queue, multiprocessing.Manager, multiprocessing.Listener/Client and finally, socket.socket:
Results for my "fast" PC running window 7 x64:
test_empty : 1516076640
test_pipe : 58155840
test_queue : 233946880
test_manager : 2853440
test_socket : 55696160
test_named_pipe: 58363040
Results for the VirtualBox's VM guest running Windows 7 x64, host running Windows 7 x64:
test_empty : 1462706080
test_pipe : 32444160
test_queue : 204845600
test_manager : 882560
test_socket : 20549280
test_named_pipe: 35387840
Script used:
from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time
FS = "{:<15}:{:>15}"
def test_empty():
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
sent += len(data)
if time.time()-s >= 5:
break
print(FS.format("test_empty", sent))
def pipe_void(pipe_in):
while True:
msg = pipe_in.recv()
if msg == []:
break
def test_pipe():
pipe_out, pipe_in = Pipe()
p = Process(target=pipe_void, args=(pipe_in,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
pipe_out.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
pipe_out.send([])
p.join()
print(FS.format("test_pipe", sent))
def queue_void(q):
while True:
msg = q.get()
if msg == []:
break
def test_queue():
q = Queue()
p = Process(target=queue_void, args=(q,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
q.put(lst)
sent += len(data)
if time.time()-s >= 5:
break
q.put([])
p.join()
print(FS.format("test_queue", sent))
def manager_void(l, lock):
msg = None
while True:
with lock:
if len(l) > 0:
msg = l.pop(0)
if msg == []:
break
def test_manager():
with Manager() as manager:
l = manager.list()
lock = manager.Lock()
p = Process(target=manager_void, args=(l, lock))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
with lock:
l.append(lst)
sent += len(data)
if time.time()-s >= 5:
break
with lock:
l.append([])
p.join()
print(FS.format("test_manager", sent))
def socket_void():
addr = ('127.0.0.1', 20000)
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_socket():
addr = ('127.0.0.1', 20000)
listener = Listener(addr, "AF_INET")
p = Process(target=socket_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_socket", sent))
def named_pipe_void():
addr = '\\\\.\\pipe\\Test'
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_named_pipe():
addr = '\\\\.\\pipe\\Test'
listener = Listener(addr, "AF_PIPE")
p = Process(target=named_pipe_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_named_pipe", sent))
if __name__ == "__main__":
test_empty()
test_pipe()
test_queue()
test_manager()
test_socket()
test_named_pipe()
Question
- If Queue uses Pipe how is it faster than Pipe in this context? This contradicts the question Python multiprocessing - Pipe vs Queue
- How could I garante a constant bitrate stream from on process to another, while having a low send delay?
Update 1
Inside my program, after trying out with Queues instead of Pipes. I got an enormous boost.
On my computer, using Pipes I got +- 16000 B/s , using Queues I got +-7.5 Million B/s. On the virtual machine I got from +-13000 B/s to 6.5 Million B/s. Thats about 500 times more bytes using Queue instread of Pipe.
Of course I wont be playing millions of bytes per seconds, I will only be playing the normal rate for sound. (in my case 16000 B/s, coincidence with the value above).
But the point is, I can limit the rate to what I want, while still having time to finish other computations (like receiving from sockets, applying sound algorithms, etc)