
I have been following examples here to use Flask as a signalling server. I can run this successfully if I have the server and both clients running on the same computer (due to getUserMedia requiring either a secure connection or running on localhost).

The next step is to replace the camera feed with a GStreamer pipeline that produces a video stream through webrtcbin. A second client (Python) starts the pipeline and sends this stream. I have used this example as a reference for managing webrtcbin through Python. I am currently using gstreamer-plugins-bad v1.14.5. I have been able to:

  1. Generate an offer, set it as the local description, and send it to the client (JS) via the signalling server.
  2. Accept client answer and set remote description.
  3. Generate and send ICE candidates, receive and set remote ICE candidates.
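
For reference, the three steps above exchange messages of roughly the following shape through the signalling server. This is a minimal sketch derived from the sender and receiver code further down; the helper names (`offer_message`, `answer_message`, `candidate_to_browser`, `candidate_from_browser`), the IDs, and the SDP and candidate strings are placeholders for illustration, not real values.

```python
import json


def _envelope(sender_id, target_id, msg_type):
    """Fields every signalling message in this setup carries."""
    return {"sender_id": sender_id, "target_id": target_id, "type": msg_type}


def offer_message(sender_id, target_id, sdp_text):
    """Step 1: the Python sender forwards the offer SDP as a plain string."""
    msg = _envelope(sender_id, target_id, "offer")
    msg["sdp"] = sdp_text
    return msg


def answer_message(sender_id, target_id, sdp_text):
    """Step 2: the browser replies with its serialized localDescription."""
    msg = _envelope(sender_id, target_id, "answer")
    msg["sdp"] = {"type": "answer", "sdp": sdp_text}
    return msg


def candidate_to_browser(sender_id, target_id, candidate, mlineindex):
    """Step 3 (Python to JS): candidate string plus a top-level m-line index."""
    msg = _envelope(sender_id, target_id, "new-ice-candidate")
    msg["candidate"] = candidate
    msg["mlineindex"] = mlineindex
    return msg


def candidate_from_browser(sender_id, target_id, candidate, mlineindex, mid):
    """Step 3 (JS to Python): the serialized RTCIceCandidate object."""
    msg = _envelope(sender_id, target_id, "new-ice-candidate")
    msg["candidate"] = {
        "candidate": candidate,
        "sdpMid": mid,
        "sdpMLineIndex": mlineindex,
    }
    return msg


# Placeholder values, for illustration only
offer = offer_message("py-sender", "js-client", "v=0 (placeholder offer SDP)")
answer = answer_message("js-client", "py-sender", "v=0 (placeholder answer SDP)")
to_js = candidate_to_browser("py-sender", "js-client", "candidate:(placeholder)", 0)
from_js = candidate_from_browser(
    "js-client", "py-sender", "candidate:(placeholder)", 0, "video0")

for message in (offer, answer, to_js, from_js):
    print(json.dumps(message, indent=2))
```

Note that the two candidate shapes differ: the Python sender puts the m-line index in a top-level `mlineindex` key, while the browser nests it as `sdpMLineIndex` inside the serialized candidate object.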

All of this is being done on a local network. From what I've read, I should not need a STUN server for this implementation.

When I check the connection state in the browser, it is always stuck at `connecting`.
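
One way to gather more detail while the state stays at `connecting` is to log the address and type of every candidate that passes through the signalling server. The following is a minimal sketch, assuming candidate strings in the layout of the example in the `handle_new_ICE_message` docstring further down (`candidate:<foundation> <component> <transport> <priority> <address> <port> typ <type> ...`). `parse_candidate` is a hypothetical helper, not part of GStreamer or the browser API.

```python
def parse_candidate(line):
    """Split an ICE candidate string into its leading named fields.

    Hypothetical debugging helper; any extension attributes after the
    candidate type (generation, ufrag, ...) are ignored.
    """
    # Some senders prefix the attribute with "a="
    if line.startswith("a="):
        line = line[2:]
    if not line.startswith("candidate:"):
        raise ValueError("not an ICE candidate line: %r" % line)
    parts = line[len("candidate:"):].split()
    if len(parts) < 8 or parts[6] != "typ":
        raise ValueError("unexpected candidate layout: %r" % line)
    return {
        "foundation": parts[0],
        "component": int(parts[1]),
        "transport": parts[2].lower(),
        "priority": int(parts[3]),
        "address": parts[4],
        "port": int(parts[5]),
        "type": parts[7],
    }


# Candidate string taken from the docstring example in the sender code
example = ("candidate:827342168 1 udp 2113937151 "
           "6dc6d2ab-a49e-454d-8ba1-500570cbb2c6.local 39719 typ host "
           "generation 0 ufrag Ub5h network-cost 999")
info = parse_candidate(example)
print(info["transport"], info["address"], info["port"], info["type"])
```

Calling this from both `send_ice_candidate_message` and `handle_new_ICE_message` would give a side-by-side list of the local and remote candidates.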

My question is, is gstreamer 1.14.5 too outdated to work with more recent browsers? Or is this potentially some other implementation issue? I've explored all other fixes I can think of. I don't want to update gstreamer unless absolutely necessary, as compiling a newer version would be nontrivial for the device I'm working with.

Python Offerer (Sends media stream)

import random
import ssl
import socketio
# Removing websockets implementation in favor of socketio
#import websockets
import asyncio
import os
import sys
import json
import argparse
import time
import threading
from flask import Flask, render_template, request, session
from flask_socketio import SocketIO, emit, join_room

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp

PIPELINE_DESC = '''
videotestsrc is-live=true pattern=ball ! videoconvert ! omxh264enc !
rtph264pay ! application/x-rtp,media=video,encoding-name=H264,payload=97 !
webrtcbin name=sendrecv
'''
# Use this if h264 not supported
# PIPELINE_DESC = '''
# videotestsrc is-live=true pattern=ball ! videoconvert ! vp8enc !
# rtpvp8pay ! application/x-rtp,media=video,encoding-name=VP8,payload=97 !
# webrtcbin name=sendrecv
# '''


class WebRTCSenderThread:
    def __init__(self, peer_id, port):
        # ID of this thread
        self.id = None
        # User ID to send to
        self.peer_id = peer_id
        # IP of signalling server
        # Hard-coding this bc the main server should always make these threads
        self.server = "localhost"
        self.port = str(port)
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.sio = None

        self.connect_socketio()

    def connect_socketio(self):
        """Connect to the server and send first message

        Initialize self.sio
        """
        print("Connecting...")
        self.sio = socketio.Client()
        url = "ws://" + str(self.server) + ":" + str(self.port)
        self.sio.connect(url)
        print("Connected.")
        self.bind_events()
        self.sio.emit('sender connect')

    def shutdown(self):
        sys.exit(0)

    def bind_events(self):
        """Bind events to class methods."""
        # Bind event callbacks
        self.sio.on("ack sender connect", self.handle_ack_server_connect)
        self.sio.on("data", self.handle_server_message)

    def register_sid(self, sid):
        """Update session ID
        Args:
            sid(str) - unique session ID string for this thread
        """
        print("Registering sender sid:", sid)
        self.id = sid

    def handle_ack_server_connect(self, msg):
        """Process server acknowledgement of socket connection.
        Args:
            msg(str) - message containing unique session ID for this process
        """
        print(msg)
        sid = msg["sid"]
        self.register_sid(sid)
        self.sio.emit("register sender to user", (sid, self.peer_id))

    def send_via_server(self, data):
        """Send data to other peer using the signalling server
        Args:
            data(str) - Message in JSON format
        """
        self.sio.emit("data", data)

    def on_negotiation_needed(self, element):
        # Function on_offer_created will be run when promise is ready
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def on_offer_created(self, promise, _, __):
        """Use new offer to set local descriptions to point gstreamer pipeline

            Calls function to send session description to server

        Args:
            promise: object representing eventual completion or failure of async
                operation (in this case, getting the offer reply)
            _: WebRTCBin object
            __: None

        Returns:

        """
        promise.wait()
        reply = promise.get_reply()
        print("reply obj:", reply.to_string())
        # offer = reply['offer']
        offer = reply.get_value('offer')
        promise = Gst.Promise.new()
        # Tell the pipeline to set local description based on the sdp offer
        self.webrtc.emit('set-local-description', offer, promise)
        # Wake up anything waiting for this promise
        promise.interrupt()
        # Make server aware of sdp ready and give it the connection info
        self.send_sdp_offer(offer)

    def send_sdp_offer(self, offer):
        """Send sdp to server to join peer"""
        # Get session description protocol as string
        text = offer.sdp.as_text()
        print('Sending offer:\n%s' % text)
        # Combine with session type to create valid message
        msg = {
            'sender_id': self.id,
            'target_id': self.peer_id,
            'type': 'offer',
            'sdp': text}
        # Forward the message to the peer through the signalling server
        self.send_via_server(msg)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """
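        Send a locally gathered ICE candidate to the remote peer.

        ICE (Interactive Connectivity Establishment) candidates describe the
            addresses and ports a peer can be reached on.
        Args:
            _: GstWebRTCBin object
            mlineindex: index of the SDP media line the candidate belongs to
            candidate: ICE candidate string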

        Returns:

        """
        msg = {
            "sender_id": self.id,
            "target_id": self.peer_id,
            "type": "new-ice-candidate",
            "candidate": candidate,
            "mlineindex": mlineindex}
        self.send_via_server(msg)

    def handle_new_ICE_message(self, msg):
        """Process ICE candidate to connect to remote client
        Args:
            msg(str) - JSON string containing ICE candidate
            ex. {'sender_id': '7b6859d839be4e84b9a43d7e420fc76f',
                 'target_id': '22702c883cee45af90bdf2eecc941dc5',
                 'type': 'new-ice-candidate',
                 'candidate':
                    {'candidate': 'candidate:827342168 1 udp 2113937151
                        6dc6d2ab-a49e-454d-8ba1-500570cbb2c6.local 39719 typ
                        host generation 0 ufrag Ub5h network-cost 999',
                     'sdpMid': 'video0',
                     'sdpMLineIndex': 0}}
        """
        ice = msg['candidate']
        candidate = ice['candidate']
        print("CANDIDATE:\n", candidate)
        sdpmlineindex = ice['sdpMLineIndex']
        self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def handle_answer_message(self, msg):
        """Process SDP answer to set remote description
        Args:
            msg(str): SDP in JSON format
            ex. {'sender_id': 'c9a38256a2654d8fb989faa4666f4d30',
                 'target_id': '12468a8b89f24cc1a21a48d724adf669',
                 'type': 'answer',
                 'sdp':
                    {'type': 'answer', 'sdp': 'v=0\r\no=- 37057614621804873
                        2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=msid-semantic:
                        WMS\r\nm=video 9 UDP/TLS/RTP/SAVPF 97\r\nc=IN
                        IP4 0.0.0.0\r\na=rtcp:9 IN IP4 0.0.0.0\r\na=ice-ufrag:
                        /Bk/\r\na=ice-pwd:/r6gCCBrL+VTe5eBnkYbfrIB\r\na=
                        ice-options:trickle\r\na=fingerprint:sha-256
                        BD:9D:45:8F:EA:5D:B9:C3:F8:99:37:17:EF:86:D4:C5:17:
                        FA:3D:3F:C0:ED:D8:B5:30:E5:0D:8D:C2:5C:BD:78\r\na=setup:
                        active\r\na=mid:video0\r\na=recvonly\r\na=rtcp-mux\r\na=
                        rtcp-rsize\r\na=rtpmap:97 H264/90000\r\na=rtcp-fb:97
                        nack pli\r\na=fmtp:97 level-asymmetry-allowed=1;
                        packetization-mode=1;profile-level-id=42e015\r\n'}}
        """
        print("Handling answer message:", msg)
        sdp = msg['sdp']
        assert (sdp['type'] == 'answer')
        sdp = sdp['sdp']
        print('Received answer:\n%s' % sdp)
        res, sdpmsg = GstSdp.SDPMessage.new()
        GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
        answer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
        promise = Gst.Promise.new()
        self.webrtc.emit('set-remote-description', answer, promise)
        promise.interrupt()

    def handle_server_message(self, msg):
        """Process data sent via signalling server
        Args:
            msg(str) - message in JSON format
        """
        print("Received from server:", msg)
        msg_type = msg["type"]
        if msg_type == "answer":
            self.handle_answer_message(msg)
        elif msg_type == "new-ice-candidate":
            self.handle_new_ICE_message(msg)
        elif msg_type == "ready" and msg["sender_id"] == self.peer_id:
            self.start_call()
        else:
            print("Sender thread received unknown message:", msg)

    def start_call(self):
        """Initialize RTCPeerConnection with other thread"""
        self.start_pipeline()

    def start_pipeline(self):
        Gst.init(None)
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        # Map different functions to signals received
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        # self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

JavaScript Answerer (Receives media)

var myID;
var peerID;
var _peer_list = {};

// TODO: Make this the callee side, e.g.
// 1. Server will send offer when client loads in
// 2. This side will set remote description, generate answer, then answer back

// socketio 
var protocol = window.location.protocol;
var url = protocol + '//' + document.domain + ':' + location.port;
var socket = io(url, {autoConnect: true});

document.addEventListener("DOMContentLoaded", (event)=>{
    // TODO: Don't need a camera on user side, just receive
    // TODO: Maybe load remote tracks here?
});

var camera_allowed=false; 
var mediaConstraints = {
    audio: true,
    video: {
        height: 480,
        width: 640
    }
};

function log_error(e){console.log("[ERROR] ", e);}
function sendViaServer(data){socket.emit("data", data);}

socket.on("connect", ()=>{
    console.log("socket connected....");
//    socket.emit("join-room", {"room_id": "users"});
    socket.emit("user connect");
});

socket.on("ack user connect", (data)=>{
    console.log("ack user connect ", data);
    myID = data["sid"];
    let sid_elem = document.getElementById("SID");
    sid_elem.innerText = myID;
    //let display_name = data["name"];
//    _peer_list[peer_id] = undefined; // add new user to user list
    // create a video HTML element and append to grid
    //addVideoElement(peer_id, display_name);
});

socket.on("register sender id", (data)=>{
    console.log("register sender id", data);
    let peer_id = data["sender_id"];
    peerID = peer_id;

    sendViaServer({
            "sender_id": myID,
            "target_id": peerID,
            "type": "ready"
        });
    console.log("Sent ready signal")
});

function closeConnection(peer_id)
{
    if(peer_id in _peer_list)
    {
        _peer_list[peer_id].onicecandidate = null;
        _peer_list[peer_id].ontrack = null;
        _peer_list[peer_id].onnegotiationneeded = null;

        delete _peer_list[peer_id]; // remove user from user list
    }
}

function log_user_list()
{
    for(let key in _peer_list)
    {
        // Backticks are required for template-literal interpolation
        console.log(`${key}: ${_peer_list[key]}`);
    }
}

//---------------[ webrtc ]--------------------    

var PC_CONFIG = {iceServers: []};

socket.on("data", (msg)=>{
    switch(msg["type"])
    {
        case "offer":
            handleOfferMsg(msg);
            break;
        case "answer":
            handleAnswerMsg(msg);
            break;
        case "new-ice-candidate":
            handleNewICECandidateMsg(msg);
            break;
    }
});

function start_webrtc()
{
    // send offer to all other members
    for(let peer_id in _peer_list)
    {
        invite(peer_id);
    }
}

const sleep = ms => new Promise(r => setTimeout(r, ms));

function createPeerConnection(peer_id)
{
    _peer_list[peer_id] = new RTCPeerConnection(PC_CONFIG);

    _peer_list[peer_id].onicecandidate = (event) => {handleICECandidateEvent(event, peer_id)};
    _peer_list[peer_id].ontrack = (event) => {handleTrackEvent(event, peer_id)};
    _peer_list[peer_id].onnegotiationneeded = () => {handleNegotiationNeededEvent(peer_id)};
}


function handleNegotiationNeededEvent(peer_id)
{
    // onnegotiationneeded: get local description and send to remote peer
    _peer_list[peer_id].createOffer()
    .then((offer)=>{return _peer_list[peer_id].setLocalDescription(offer);})
    .then(()=>{
        console.log(`sending offer to <${peer_id}> ...`,
            _peer_list[peer_id].localDescription);
        sendViaServer({
            "sender_id": myID,
            "target_id": peer_id,
            "type": "offer",
            "sdp": _peer_list[peer_id].localDescription
        });
    })
    .catch(log_error);
} 

//----------Call Initialization----------//
// Client will likely call only ONE of [handleOfferMsg, handleAnswerMsg]

function handleOfferMsg(msg)
{
    // SDP received; use it to create the peer connection
    // offer specifies the config for the call
    // 'offer' callback
    peer_id = msg['sender_id'];

    console.log(`offer received from <${peer_id}>`, msg);
    createPeerConnection(peer_id);
    let desc = new RTCSessionDescription({type: 'offer',
        sdp: msg['sdp']});
    _peer_list[peer_id].setRemoteDescription(desc)
    .then(()=>{return _peer_list[peer_id].createAnswer();})
    .then((answer)=>{return _peer_list[peer_id].setLocalDescription(answer);})
    .then(()=>{
        console.log(`sending answer to <${peer_id}> ...`,
            _peer_list[peer_id].localDescription);
        sendViaServer({
            "sender_id": myID,
            "target_id": peer_id,
            "type": "answer",
            "sdp": _peer_list[peer_id].localDescription
        });
    })
    .catch(log_error);
}

function handleAnswerMsg(msg)
{
    // received answer; use it to create peer connection
    // answer has less info than offer. just need to show our side of connection
    // 'answer' callback
    peer_id = msg['sender_id'];
    console.log(`answer received from <${peer_id}>`, msg);
    let desc = new RTCSessionDescription(msg['sdp']);
    _peer_list[peer_id].setRemoteDescription(desc);
}


function handleICECandidateEvent(event, peer_id)
{
    console.log("ICE Event:\n", event);

    if (event.candidate != null) {
        console.log("Handling candidate:", event.candidate);
        console.log(event.candidate["candidate"]);
        sendViaServer({
            "sender_id": myID,
            "target_id": peer_id,
            "type": "new-ice-candidate",
            "candidate": event.candidate
        });
    }
}

function handleNewICECandidateMsg(msg)
{
    // 'new-ice-candidate' callback
    // Use the sender ID from the message rather than the implicit global peer_id
    console.log(`ICE candidate received from <${msg["sender_id"]}>`, msg);
    var candidate = new RTCIceCandidate({
        candidate: msg["candidate"],
        sdpMLineIndex: msg["mlineindex"],
    });

    _peer_list[msg["sender_id"]].addIceCandidate(candidate)
    .catch(log_error);
}


function handleTrackEvent(event, peer_id)
{
    console.log(`track event received from <${peer_id}>`, event);

    if(event.streams)
    {
        document.getElementById("local_vid").srcObject = event.streams[0];
    }
}

function checkConnectionState()
{
    console.log("Checking connections:");
    for (let peer in _peer_list){
        console.log("Peer ID:", peer);
        console.log("State:", _peer_list[peer].connectionState);
        console.log("Local Description:", _peer_list[peer].localDescription);
        console.log("Remote Description:", _peer_list[peer].remoteDescription);
    }
}

var checkStateTimer = setInterval(checkConnectionState, 10000);

[Screenshot: chrome://webrtc-internals page on the recipient side]

  • I believe you *always* need a STUN server, but there are lots of public free ones out there. You won't need a *TURN* server if you are on the same network. I've not worked with 1.14 but for reference 1.16 works just fine with Chrome and Firefox (haven't tested others). If you show your code and your list of ICE candidates we may be able to help you find the issue. Also, if you could post a screenshot of your `about:webrtc-internals` (in your browser) that might help. – Christian Fritz Mar 04 '23 at 19:00
  • @ChristianFritz thanks for the input. I have added the sender and recipient code to the question. Based on what I've read [here](https://stackoverflow.com/questions/30742431/webrtc-on-isolated-lan-without-ice-stun-turn-server), I don't believe STUN is required on a local network. I tried running a more typical WebRTC implementation over my local network and was able to connect without any servers specified. My system won't have access to any devices other than the ones running the host and client. If a STUN server really is required, I'd have to spin up my own on the server. – 2wheat3rock Mar 06 '23 at 21:23
