
I am writing an audio player that receives samples from a UDP socket, and everything was working fine. But when I implemented a packet loss concealment algorithm, the player failed to keep producing silence at the expected rate (every 10 ms it sends a list of 160-byte chunks).

When playing audio with pyaudio using the blocking write call, I noticed that it blocks on average for the duration of the sample. So I created a dedicated process to play the samples.

The main process prepares the output audio stream and sends the result to that process through a multiprocessing.Pipe. I chose multiprocessing.Pipe because it was supposed to be faster than the alternatives.
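For context, the playback side looks roughly like this (a simplified sketch rather than my real code; the 8 kHz, 16-bit mono format is an assumption that matches 160 bytes per 10 ms):

# Sketch of the dedicated playback process: the main process pushes
# 160-byte chunks through the connection, the worker feeds them to pyaudio.
import pyaudio
from multiprocessing import Process, Pipe


def playback_worker(conn):
    pa = pyaudio.PyAudio()
    stream = pa.open(format=pyaudio.paInt16, channels=1, rate=8000, output=True)
    while True:
        chunk = conn.recv()      # blocks until the main process sends samples
        if chunk is None:        # sentinel: stop playing
            break
        stream.write(chunk)      # blocking write, roughly 10 ms per chunk
    stream.stop_stream()
    stream.close()
    pa.terminate()


if __name__ == "__main__":
    pipe_out, pipe_in = Pipe()
    p = Process(target=playback_worker, args=(pipe_in,))
    p.start()
    # main loop: receive from the UDP socket, run loss concealment,
    # then pipe_out.send(chunk) for every 160-byte chunk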

Unfortunately, when I ran the program on a virtual machine, the bitrate was half of what I was getting on my fast PC, which had no trouble meeting the target bitrate.

After some tests, I concluded that the delay was caused by the Pipe's send method.

I wrote a simple benchmark script (see below) to compare the different ways of sending data to another process. The script keeps sending [b'\x00'*160] for 5 seconds and counts the total number of bytes sent. I tested the following methods: "not sending" (the baseline), multiprocessing.Pipe, multiprocessing.Queue, a multiprocessing.Manager list, and multiprocessing.connection.Listener/Client over both a TCP socket and a named pipe:

Results on my "fast" PC running Windows 7 x64:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

Results on a VirtualBox VM guest running Windows 7 x64 (host also Windows 7 x64):

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

Script used:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

Question

  • If Queue uses a Pipe internally, how can it be faster than a Pipe in this context? This seems to contradict the question Python multiprocessing - Pipe vs Queue.
  • How can I guarantee a constant-bitrate stream from one process to another while keeping the send delay low?

Update 1

Inside my program, after switching from Pipes to Queues, I got an enormous boost.

On my computer, using Pipes I got roughly 16000 B/s; using Queues I got roughly 7.5 million B/s. On the virtual machine I went from roughly 13000 B/s to 6.5 million B/s. That is about 500 times more bytes using a Queue instead of a Pipe.

Of course I won't be playing millions of bytes per second; I will only be playing audio at its normal rate (in my case 16000 B/s, which coincidentally matches the Pipe figure above).
But the point is that I can limit the rate to whatever I want while still having time to finish other computations (receiving from sockets, applying sound algorithms, etc.), as in the sketch below.
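A hypothetical pacing sketch (get_next_chunk stands in for the UDP receive and loss-concealment steps and is not a real function in my program):

import time


def paced_sender(q, get_next_chunk, interval=0.010):
    # q is a multiprocessing.Queue shared with the playback process;
    # get_next_chunk returns the next 160-byte chunk (or None to stop).
    next_deadline = time.monotonic()
    while True:
        chunk = get_next_chunk()
        if chunk is None:
            q.put(None)               # sentinel: tell the player to stop
            break
        q.put(chunk)                  # cheap: put() returns almost immediately
        next_deadline += interval
        delay = next_deadline - time.monotonic()
        if delay > 0:
            time.sleep(delay)         # spare time left for sockets, DSP, etc.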

Rui Botelho
  • Your `Queue` test is invalid because the 5 seconds are spent filling a local queue (`q._buffer`) while a worker thread pickles and sends the objects. Then you `join` while waiting for the queue to empty, which takes a lot longer than 5 seconds. You should instead time how long it takes to send a given number of bytes. – Eryk Sun Nov 13 '14 at 18:30
  • Also, test_pipe and test_named_pipe are essentially the same test. You're just using an explicit name in the latter, as opposed to the `connection.arbitrary_address('AF_PIPE')` that `Pipe` uses. – Eryk Sun Nov 13 '14 at 18:43
  • @eryksun thank you for noticing the details. I will read about the Queue implementation and make a different test – Rui Botelho Nov 13 '14 at 19:06
  • Perhaps Python 3 simply improved the `multiprocessing` module, and the comparison referred to is no longer relevant? There can also be OS-specific reasons: if `Pipe` actually uses a pipe, then the Windows implementation's performance will be radically different from Linux or OS X. Meanwhile `Queue` must use shared memory (I think), so performance will be similar across operating systems. – Dima Tisnek Nov 17 '14 at 10:14
  • I was benchmarking [circuits](https://github.com/circuits/circuits) as this question piqued my interest in what Async I/O *could* achieve; See: https://gist.github.com/prologic/fea809047ba61847ebb5 -- I get ~577KB/s using ``circuits.node`` at a 10ms interval. – James Mills May 27 '15 at 15:41
  • Btw; I suspect a UNIX Socket / Pipe is going to be considerably much faster in terms of IPC; I'm going to try and experiment with this a bit more. The example above uses JSON encoded event streams over TCP between the processes. I think we can improve on ~500KB/s on my system. – James Mills May 27 '15 at 22:28
  • What kind of performance numbers do you *need* here? Perhaps if you stick a small bounty on this question I'll implement a solution that meets your requirements and try to explain why you're seeing *slow* IPC via other means. – James Mills May 27 '15 at 22:29
  • "So I created a new dedicated process to play the samples." As it is said here: http://stackoverflow.com/questions/2275909/whats-the-advantage-of-queues-over-pipes-when-communicating-between-processes, pipe is not thread nor process safe. So indeed, queue is suppose to be slower because it adds all the thread safe mechanisms to pipe. But in a multiprocess context, it might just me faster because of a better data management. – muchwow Jun 17 '15 at 14:33

1 Answer


I can't say for sure, but I think the issue you're dealing with is synchronous versus asynchronous I/O. My guess is that the Pipe somehow ends up synchronous while the Queue ends up asynchronous. Why exactly one defaults one way and the other the other way may be better answered by this question and answer:

Synchronous/Asynchronous behaviour of python Pipes
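Part of that asynchrony can be seen directly: Queue.put hands the pickling and sending off to a background feeder thread and returns almost immediately, while Connection.send pickles and writes in the calling thread. A tiny sketch (CPython behaviour; the exact thread name may vary between versions):

import threading
from multiprocessing import Queue

if __name__ == "__main__":
    q = Queue()
    q.put(b'\x00'*160)            # the first put starts the feeder thread
    print([t.name for t in threading.enumerate()])
    # typically prints something like ['MainThread', 'QueueFeederThread'],
    # i.e. the actual pipe write happens off the calling thread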

Mike Sandford