Today I decided to turn my little script, based on gst-launch, into a real Python/GStreamer application, in order to add some features.

I developed a little program that sends the audio from my microphone to both Icecast (shout2send) and local storage (filesink), using tee.

Sometimes shout2send halts because of network problems. I would like to restart this element every N seconds until the connection is back, without stopping the pipeline, because the local audio file should not be affected by network conditions.

Here's what I tried:

  1. Stopping and restarting the pipeline one second after the network error (result: streaming works, but the local file is truncated)
  2. Unlinking shout2send from tee, setting its state to NULL and removing it from the pipeline (result: GStreamer critical errors like Trying to dispose element ... but it is in PLAYING instead of the NULL state)
  3. Trying to understand how to use pads in this case (result: same as above, but with more code involved)

What should I do?

Here's what my code looks like:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib
from gi.repository import Gst
# [...]

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pass # TODO: restart the element
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

pipeline = Gst.Pipeline()
message_bus = pipeline.get_bus()
message_bus.add_signal_watch()
message_bus.connect('message', message_handler)

# [...]
tee.link(queue0)
queue0.link(filesink)
tee.link(queue1)
queue1.link(shout2send)

Update (9/12/15): non-working code added + log

I tried to follow "Dynamically changing the pipeline" from the GStreamer documentation, but my code doesn't work.

def event_probe(pad, info, *args):
    Gst.Pad.remove_probe(pad, info)
    queue1.unlink(shout2send)
    tee.unlink(queue1)
    pipeline.remove(shout2send)
    pipeline.remove(queue1)
    return Gst.PadProbeReturn.OK

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pad = queue1.get_static_pad('src')
            pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

Here's what I see if I run my script with GST_DEBUG=3 and I restart Icecast while streaming:

[...]
0:00:02.142033258  5462 0x55e414d900a0 WARN                  shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
0:00:02.658137998  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
0:00:02.658169752  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
(GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
0:00:02.658628129  5462 0x7f6ba8002a30 WARN                audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
phuclv
Francesco Frassinelli
  • what do you mean by "doesn't work" (I haven't analyzed it though)? some logs? – nayana Dec 09 '15 at 08:36
  • @otopolsky log added; it looks like the pipeline stops working because of an `Internal data flow error` – Francesco Frassinelli Dec 09 '15 at 09:13
  • this looks like something wasn't connected .. check with `GST_DEBUG=3,default:4` – nayana Dec 09 '15 at 09:28
  • also are you working with EOS as stated in the dynamic pipe change example? – nayana Dec 09 '15 at 09:31
  • also you should set the element to NULL before removing it.. – nayana Dec 09 '15 at 09:36
  • @otopolsky It looks like EOS is needed if you don't want to lose data and there are other elements in the pipeline: this is not my case, because shout2send is dead and I don't mind if there is data left in queue1. If I set their states to NULL before removing them and I run the application with `GST_DEBUG=3,default:4`, nothing changes (same error). – Francesco Frassinelli Dec 09 '15 at 09:54
  • ok so no additional info when you add default:4? .. I noticed `tee.unlink(shout2send)` .. shouldn't it be tee.unlink(queue1)? – nayana Dec 09 '15 at 10:17
  • @otopolsky no additional info with default:4. Yes, you're right: I fixed it and the last two log lines have disappeared (log and code updated), but it still doesn't work – Francesco Frassinelli Dec 09 '15 at 11:20
  • now that I think of it, shouldn't you block the pads before tee? because having one branch of tee running and not the other seems weird.. but I don't know what will happen to the file if you block probes.. – nayana Dec 09 '15 at 12:24

1 Answer

Thanks to otopolsky's comments I did it :)

What I did wrong:

  1. Elements must be set to the NULL state: this is very important
  2. oggmux must come after tee, in both sub-pipelines: otherwise Icecast lists the stream without being able to serve it. The same goes for opusenc
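
As an illustration of point 2, here is a minimal sketch of that branch layout written as a gst-launch-style description string. The helper name build_description, the audioconvert and audioresample elements, the layout of the file branch, and every parameter value (host, port, mount point, password, file path) are assumptions made for the example and are not taken from the original program. Only the rule that each tee branch carries its own opusenc and oggmux comes from the point above.

```python
def build_description(path, ip="127.0.0.1", port=8000,
                      mount="/stream.ogg", password="hackme"):
    """Return a pipeline description with one encoder and muxer per tee branch."""
    # Shared part: capture audio and split it with a tee named "t".
    source = "pulsesrc ! audioconvert ! audioresample ! tee name=t"
    # Local recording branch: its own opusenc and oggmux, placed after the tee.
    file_branch = f't. ! queue ! opusenc ! oggmux ! filesink location="{path}"'
    # Network branch: again its own opusenc and oggmux after the tee
    # (hypothetical connection settings, adjust to your Icecast server).
    net_branch = (
        "t. ! opusenc ! oggmux ! queue ! "
        f'shout2send ip={ip} port={port} mount={mount} password="{password}"'
    )
    return " ".join([source, file_branch, net_branch])


if __name__ == "__main__":
    print(build_description("recording.ogg"))
```

A string of this shape can be handed to Gst.parse_launch() for a quick experiment before wiring the elements by hand.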

Advice:

  1. It is not necessary to unlink every element you no longer need: just break the link where needed
  2. It is not necessary to remove from the pipeline every element you no longer need: keep them if you plan to reuse them

Final code (reconnection works correctly and independently from local encoding/recording):

def event_probe2(pad, info, *args):
    # The tee pad is blocked here: relink the network branch and restart it.
    Gst.Pad.remove_probe(pad, info.id)
    tee.link(opusenc1)
    opusenc1.set_state(Gst.State.PLAYING)
    oggmux1.set_state(Gst.State.PLAYING)
    queue1.set_state(Gst.State.PLAYING)
    shout2send.set_state(Gst.State.PLAYING)
    return Gst.PadProbeReturn.OK

def reconnect():
    # Called by GLib after `interval` seconds: block the tee pad, then relink.
    pad = tee.get_static_pad('src_1')
    pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
    return False  # run this timeout only once

def event_probe(pad, info, *args):
    # The tee pad is blocked here: detach the network branch and reset it.
    Gst.Pad.remove_probe(pad, info.id)
    tee.unlink(opusenc1)
    opusenc1.set_state(Gst.State.NULL)
    oggmux1.set_state(Gst.State.NULL)
    queue1.set_state(Gst.State.NULL)
    shout2send.set_state(Gst.State.NULL)
    # Schedule a reconnection attempt; the file branch keeps running meanwhile.
    GLib.timeout_add_seconds(interval, reconnect)
    return Gst.PadProbeReturn.OK

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pad = tee.get_static_pad('src_1')
            pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

Minor problems:

  1. I use tee.get_static_pad('src_1'), but I think I could look up the pad name somewhere instead of hard-coding it
  2. The whole thing could probably be written better (but this is my first Python + GStreamer program and it works, so I'm fine with it)
  3. To avoid data loss I call pipeline.set_state(Gst.State.NULL) one second after pipeline.send_event(Gst.Event.new_eos()), but I still get messages like WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
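
For the first point, one possible approach (a sketch, not what the code above does) is to ask the first element of the network branch which pad its sink is linked to, and to keep that reference instead of looking up 'src_1' by name. The helper name find_upstream_pad is hypothetical. It relies only on get_static_pad() and get_peer(), and it assumes it is called once while tee and opusenc1 are still linked, since get_peer() returns None for an unlinked pad.

```python
def find_upstream_pad(element, sink_pad_name="sink"):
    """Return the pad linked to the element's sink pad, or None if unlinked.

    `element` is expected to behave like a Gst.Element; the default pad
    name "sink" is an assumption that fits single-input elements.
    """
    sink_pad = element.get_static_pad(sink_pad_name)
    if sink_pad is None:
        raise ValueError(f"element has no static pad named {sink_pad_name!r}")
    # For a branch fed by a tee, the peer is the tee's source pad.
    return sink_pad.get_peer()
```

With the names used above, this would be called as tee_src_pad = find_upstream_pad(opusenc1) right after the initial tee.link(opusenc1), and tee_src_pad would then replace both tee.get_static_pad('src_1') lookups.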

Code: https://github.com/ViGLug/libre-streaming

Francesco Frassinelli