I have been following examples here to use Flask as a signalling server. I can run this successfully if I have the server and both clients running on the same computer (due to getUserMedia requiring either a secure connection or running on localhost).
The next step is to connect a GStreamer pipeline with `webrtcbin` that produces a video stream rather than using the camera feed. A second client (Python) starts and sends this stream. I have used this example as a reference for managing webrtcbin through Python. I am currently using gstreamer-plugins-bad v1.14.5. I have been able to:
- Generate an offer, set its local description, and send to the client (js) via a signalling server.
- Accept client answer and set remote description.
- Generate and send ICE candidates, receive and set remote ICE candidates.
All of this is being done on a local network. From what I've read, I should not need a STUN server for this implementation.
When checking the connection status, the connection is always stuck in the `connecting` state.
My question is, is gstreamer 1.14.5 too outdated to work with more recent browsers? Or is this potentially some other implementation issue? I've explored all other fixes I can think of. I don't want to update gstreamer unless absolutely necessary, as compiling a newer version would be nontrivial for the device I'm working with.
Python Offerer (Sends media stream)
import random
import ssl
import socketio
# Removing websockets implementation in favor of socketio
#import websockets
import asyncio
import os
import sys
import json
import argparse
import time
import threading
from flask import Flask, render_template, request, session
from flask_socketio import SocketIO, emit, join_room
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
# gst-launch style description of the sender pipeline: a live "ball" test
# pattern is converted, H.264-encoded with omxh264enc, packed into RTP with
# payload type 97 and linked to the webrtcbin element named "sendrecv".
PIPELINE_DESC = '''
videotestsrc is-live=true pattern=ball ! videoconvert ! omxh264enc !
rtph264pay ! application/x-rtp,media=video,encoding-name=H264,payload=97 !
webrtcbin name=sendrecv
'''
# Use this if h264 not supported
# PIPELINE_DESC = '''
# videotestsrc is-live=true pattern=ball ! videoconvert ! vp8enc !
# rtpvp8pay ! application/x-rtp,media=video,encoding-name=VP8,payload=97 !
# webrtcbin name=sendrecv
# '''
class WebRTCSenderThread:
    """Offer a GStreamer ``webrtcbin`` stream to one remote peer.

    SDP and ICE messages are exchanged with the peer through a Socket.IO
    signalling server; this object creates the offer and consumes the
    peer's answer and ICE candidates.
    """

    def __init__(self, peer_id, port):
        """Store the connection settings and connect to the server.

        Args:
            peer_id: ID of the user the stream is sent to.
            port: Port of the signalling server on localhost.
        """
        # ID of this sender; filled in by register_sid()
        self.id = None
        # User ID to send to
        self.peer_id = peer_id
        # IP of signalling server
        # Hard-coding this bc the main server should always make these threads
        self.server = "localhost"
        self.port = str(port)
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.sio = None
        self.connect_socketio()

    def connect_socketio(self):
        """Connect to the server and send the first message.

        Initializes ``self.sio``.
        """
        print("Connecting...")
        self.sio = socketio.Client()
        # Register the handlers before connecting so that no event arriving
        # right after the handshake can be missed.
        self.bind_events()
        url = "ws://" + str(self.server) + ":" + str(self.port)
        self.sio.connect(url)
        print("Connected.")
        self.sio.emit('sender connect')

    def shutdown(self):
        """Stop the pipeline, leave the signalling server and exit.

        Raises:
            SystemExit: always, with exit code 0.
        """
        if self.pipe is not None:
            # Release the elements held by the pipeline before exiting.
            self.pipe.set_state(Gst.State.NULL)
            self.pipe = None
            self.webrtc = None
        if self.sio is not None:
            self.sio.disconnect()
        sys.exit(0)

    def bind_events(self):
        """Bind Socket.IO events to class methods."""
        self.sio.on("ack sender connect", self.handle_ack_server_connect)
        self.sio.on("data", self.handle_server_message)

    def register_sid(self, sid):
        """Update the session ID.

        Args:
            sid(str): unique session ID string for this sender
        """
        print("Registering sender sid:", sid)
        self.id = sid

    def handle_ack_server_connect(self, msg):
        """Process the server acknowledgement of the socket connection.

        Args:
            msg(dict): message whose "sid" entry is the session ID assigned
                to this sender
        """
        print(msg)
        sid = msg["sid"]
        self.register_sid(sid)
        self.sio.emit("register sender to user", (sid, self.peer_id))

    def send_via_server(self, data):
        """Send data to the other peer using the signalling server.

        Args:
            data: message payload; the callers in this class pass dicts
        """
        self.sio.emit("data", data)

    def on_negotiation_needed(self, element):
        """Ask ``element`` to create an SDP offer.

        Args:
            element: object that emitted 'on-negotiation-needed'
        """
        # on_offer_created will be run when the promise is ready
        promise = Gst.Promise.new_with_change_func(
            self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def on_offer_created(self, promise, _, __):
        """Set the new offer as local description and send it to the peer.

        Args:
            promise: promise whose reply carries the created offer
            _: user data given in on_negotiation_needed (the element)
            __: None
        """
        promise.wait()
        reply = promise.get_reply()
        print("reply obj:", reply.to_string())
        offer = reply.get_value('offer')
        # Tell the pipeline to set local description based on the sdp offer
        local_promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', offer, local_promise)
        # Wake up anything waiting for this promise
        local_promise.interrupt()
        # Make server aware of sdp ready and give it the connection info
        self.send_sdp_offer(offer)

    def send_sdp_offer(self, offer):
        """Send the SDP offer to the peer through the server.

        Args:
            offer: session description whose ``sdp`` is sent as text
        """
        # Get session description protocol as string
        text = offer.sdp.as_text()
        print ('Sending offer:\n%s' % text)
        # Combine with session type to create valid message
        msg = {
            'sender_id': self.id,
            'target_id': self.peer_id,
            'type': 'offer',
            'sdp': text,
        }
        self.send_via_server(msg)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Forward a locally gathered ICE candidate to the peer.

        Def ICE: Interactive Connectivity Establishment - config to establish
        RTCPeerConnection (usually this is a collection of IP's)

        Args:
            _: object that emitted 'on-ice-candidate'
            mlineindex: value sent under the "mlineindex" key
            candidate: value sent under the "candidate" key
        """
        msg = {
            "sender_id": self.id,
            "target_id": self.peer_id,
            "type": "new-ice-candidate",
            "candidate": candidate,
            "mlineindex": mlineindex,
        }
        self.send_via_server(msg)

    def handle_new_ICE_message(self, msg):
        """Process an ICE candidate sent by the remote client.

        Messages without a candidate string are ignored.

        Args:
            msg(dict): message containing the ICE candidate
                ex. {'sender_id': '7b6859d839be4e84b9a43d7e420fc76f',
                     'target_id': '22702c883cee45af90bdf2eecc941dc5',
                     'type': 'new-ice-candidate',
                     'candidate':
                        {'candidate': 'candidate:827342168 1 udp 2113937151
                         6dc6d2ab-a49e-454d-8ba1-500570cbb2c6.local 39719 typ
                         host generation 0 ufrag Ub5h network-cost 999',
                         'sdpMid': 'video0',
                         'sdpMLineIndex': 0}}
        """
        ice = msg['candidate']
        candidate = ice['candidate'] if ice else None
        if not candidate:
            # Nothing to hand to webrtcbin (e.g. an empty candidate string).
            print("Ignoring empty ICE candidate:", msg)
            return
        print("CANDIDATE:\n", candidate)
        # NOTE(review): the example above carries a "*.local" host name
        # instead of an IP address; whether the installed GStreamer/libnice
        # can resolve such names is not visible from this file - confirm.
        sdpmlineindex = ice['sdpMLineIndex']
        self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def handle_answer_message(self, msg):
        r"""Process an SDP answer and set it as remote description.

        Args:
            msg(dict): message containing the SDP answer
                ex. {'sender_id': 'c9a38256a2654d8fb989faa4666f4d30',
                     'target_id': '12468a8b89f24cc1a21a48d724adf669',
                     'type': 'answer',
                     'sdp':
                        {'type': 'answer', 'sdp': 'v=0\r\no=- 37057614621804873
                         2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=msid-semantic:
                         WMS\r\nm=video 9 UDP/TLS/RTP/SAVPF 97\r\nc=IN
                         IP4 0.0.0.0\r\na=rtcp:9 IN IP4 0.0.0.0\r\na=ice-ufrag:
                         /Bk/\r\na=ice-pwd:/r6gCCBrL+VTe5eBnkYbfrIB\r\na=
                         ice-options:trickle\r\na=fingerprint:sha-256
                         BD:9D:45:8F:EA:5D:B9:C3:F8:99:37:17:EF:86:D4:C5:17:
                         FA:3D:3F:C0:ED:D8:B5:30:E5:0D:8D:C2:5C:BD:78\r\na=setup:
                         active\r\na=mid:video0\r\na=recvonly\r\na=rtcp-mux\r\na=
                         rtcp-rsize\r\na=rtpmap:97 H264/90000\r\na=rtcp-fb:97
                         nack pli\r\na=fmtp:97 level-asymmetry-allowed=1;
                         packetization-mode=1;profile-level-id=42e015\r\n'}}

        Raises:
            ValueError: if the description is not of type 'answer' or its
                SDP text cannot be parsed.
        """
        print("Handling answer message:", msg)
        description = msg['sdp']
        # Explicit check instead of assert: the data comes from the network
        # and asserts are stripped when Python runs with -O.
        if description['type'] != 'answer':
            raise ValueError(
                "Expected an SDP answer, got %r" % (description['type'],))
        sdp_text = description['sdp']
        print('Received answer:\n%s' % sdp_text)
        _, sdpmsg = GstSdp.SDPMessage.new()
        result = GstSdp.sdp_message_parse_buffer(sdp_text.encode(), sdpmsg)
        if result != GstSdp.SDPResult.OK:
            raise ValueError("Could not parse SDP answer")
        answer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
        promise = Gst.Promise.new()
        self.webrtc.emit('set-remote-description', answer, promise)
        promise.interrupt()

    def handle_server_message(self, msg):
        """Dispatch a message relayed by the signalling server.

        Args:
            msg(dict): message with a "type" entry; "ready" messages are only
                acted on when their "sender_id" equals ``self.peer_id``
        """
        print("Received from server:", msg)
        # .get() so that a message without "type" is reported as unknown
        # instead of raising KeyError inside the Socket.IO callback.
        msg_type = msg.get("type")
        if msg_type == "answer":
            self.handle_answer_message(msg)
        elif msg_type == "new-ice-candidate":
            self.handle_new_ICE_message(msg)
        elif msg_type == "ready" and msg.get("sender_id") == self.peer_id:
            self.start_call()
        else:
            print("Sender thread received unknown message:", msg)

    def start_call(self):
        """Start the call with the peer by starting the pipeline."""
        self.start_pipeline()

    def start_pipeline(self):
        """Build the pipeline from PIPELINE_DESC and set it to PLAYING.

        A pipeline left over from an earlier call is stopped first so that
        repeated "ready" messages do not leave old pipelines running.
        """
        Gst.init(None)
        if self.pipe is not None:
            self.pipe.set_state(Gst.State.NULL)
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        # Map different functions to signals received
        self.webrtc.connect(
            'on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect(
            'on-ice-candidate', self.send_ice_candidate_message)
        self.pipe.set_state(Gst.State.PLAYING)
JavaScript Answerer (Receives media)
// ID the signalling server assigned to this client ("ack user connect").
var myID;
// ID of the sender registered for this client ("register sender id").
var peerID;
// RTCPeerConnection objects keyed by remote peer ID.
var _peer_list = {};

// TODO: Make this the callee side, e.g.
//   1. Server will send offer when client loads in
//   2. This side will set remote description, generate answer, then answer back

// socketio: connect back to the origin that served this page
var protocol = window.location.protocol;
var url = `${protocol}//${document.domain}:${location.port}`;
var socket = io(url, {autoConnect: true});

document.addEventListener("DOMContentLoaded", function (event) {
    // TODO: Don't need a camera on user side, just receive
    // TODO: Maybe load remote tracks here?
});

var camera_allowed = false;
// Capture constraints: audio plus 640x480 video.
var mediaConstraints = {
    audio: true,
    video: {height: 480, width: 640}
};
/** Write `e` to the console behind an "[ERROR] " prefix. */
function log_error(e)
{
    console.log("[ERROR] ", e);
}
/** Emit `data` to the signalling server as a "data" event. */
function sendViaServer(data)
{
    socket.emit("data", data);
}
// Announce this client to the server once the socket is connected.
socket.on("connect", function () {
    console.log("socket connected....");
    socket.emit("user connect");
});
// The server acknowledged the connection: remember the assigned session ID
// and show it in the element with id "SID".
socket.on("ack user connect", function (data) {
    console.log("ack user connect ", data);
    myID = data["sid"];
    document.getElementById("SID").innerText = myID;
});
// A sender was registered for this client: remember its ID and tell it
// that this side is ready.
socket.on("register sender id", function (data) {
    console.log("register sender id", data);
    peerID = data["sender_id"];
    const readyMsg = {
        "sender_id": myID,
        "target_id": peerID,
        "type": "ready"
    };
    sendViaServer(readyMsg);
    console.log("Sent ready signal")
});
/**
 * Close the connection stored for `peer_id` and remove it from the list.
 * Does nothing when no entry exists for `peer_id`.
 */
function closeConnection(peer_id)
{
    if (!(peer_id in _peer_list)) {
        return;
    }
    const pc = _peer_list[peer_id];
    if (pc) {
        // Detach the handlers first so nothing fires during teardown.
        pc.onicecandidate = null;
        pc.ontrack = null;
        pc.onnegotiationneeded = null;
        // Dropping the reference alone does not end the session.
        pc.close();
    }
    delete _peer_list[peer_id]; // remove user from user list
}
/** Log every entry of the peer list as "<peer id>: <connection>". */
function log_user_list()
{
    for (const key in _peer_list)
    {
        // Backticks are required: with single quotes the ${...}
        // placeholders are printed literally instead of interpolated.
        console.log(`${key}: ${_peer_list[key]}`);
    }
}
//---------------[ webrtc ]--------------------
// Configuration passed to every RTCPeerConnection created in this file;
// the ICE server list is empty, i.e. no STUN/TURN servers are configured.
var PC_CONFIG = {iceServers: []};
// Route a signalling message to its handler; other types are ignored.
socket.on("data", function (msg) {
    const kind = msg["type"];
    if (kind === "offer") {
        handleOfferMsg(msg);
    } else if (kind === "answer") {
        handleAnswerMsg(msg);
    } else if (kind === "new-ice-candidate") {
        handleNewICECandidateMsg(msg);
    }
});
/**
 * Call invite() for every peer currently in the peer list.
 * NOTE(review): invite() is not defined in the visible part of this file.
 */
function start_webrtc()
{
    // send offer to all other members
    for (const peer_id in _peer_list) {
        invite(peer_id);
    }
}
/** Return a promise that resolves after `ms` milliseconds. */
const sleep = function (ms) {
    return new Promise(function (resolve) {
        setTimeout(resolve, ms);
    });
};
/**
 * Create an RTCPeerConnection for `peer_id`, wire its ICE, track and
 * negotiation events to the handlers in this file, and store it in the
 * peer list (replacing any previous entry for that ID).
 */
function createPeerConnection(peer_id)
{
    const pc = new RTCPeerConnection(PC_CONFIG);
    pc.onicecandidate = function (event) {
        handleICECandidateEvent(event, peer_id);
    };
    pc.ontrack = function (event) {
        handleTrackEvent(event, peer_id);
    };
    pc.onnegotiationneeded = function () {
        handleNegotiationNeededEvent(peer_id);
    };
    _peer_list[peer_id] = pc;
}
/**
 * onnegotiationneeded: create an offer, set it as the local description
 * and send it to `peer_id` through the server. Failures go to log_error.
 */
function handleNegotiationNeededEvent(peer_id)
{
    const applyOffer = function (offer) {
        return _peer_list[peer_id].setLocalDescription(offer);
    };
    const sendOffer = function () {
        console.log(`sending offer to <${peer_id}> ...`,
            _peer_list[peer_id].localDescription);
        const offerMsg = {
            "sender_id": myID,
            "target_id": peer_id,
            "type": "offer",
            "sdp": _peer_list[peer_id].localDescription
        };
        sendViaServer(offerMsg);
    };
    _peer_list[peer_id].createOffer()
        .then(applyOffer)
        .then(sendOffer)
        .catch(log_error);
}
//----------Call Initialization----------//
// Client will likely call only ONE of [handleOfferMsg, handleAnswerMsg]
/**
 * 'offer' callback: create a peer connection for the sender, apply the
 * received SDP as remote description, then create, apply and send back
 * an answer. Failures in the chain are passed to log_error.
 *
 * msg['sender_id'] is the offering peer and msg['sdp'] the SDP text.
 */
function handleOfferMsg(msg)
{
    const senderId = msg['sender_id'];
    // `peer_id` is an undeclared global that other code in this file reads,
    // so the assignment is kept. The asynchronous steps below use the local
    // `senderId`/`pc` instead, so a second offer arriving in the meantime
    // cannot redirect them to another peer.
    peer_id = senderId;
    console.log(`offer received from <${senderId}>`, msg);
    createPeerConnection(senderId);
    const pc = _peer_list[senderId];
    const desc = new RTCSessionDescription({type: 'offer',
        sdp: msg['sdp']});
    pc.setRemoteDescription(desc)
        .then(() => pc.createAnswer())
        .then((answer) => pc.setLocalDescription(answer))
        .then(() => {
            console.log(`sending answer to <${senderId}> ...`,
                pc.localDescription);
            sendViaServer({
                "sender_id": myID,
                "target_id": senderId,
                "type": "answer",
                "sdp": pc.localDescription
            });
        })
        .catch(log_error);
}
/**
 * 'answer' callback: apply the received answer as remote description of
 * the connection stored for msg['sender_id'].
 *
 * An answer from a peer without a stored connection is logged and
 * ignored; a rejected setRemoteDescription is passed to log_error.
 */
function handleAnswerMsg(msg)
{
    const senderId = msg['sender_id'];
    // `peer_id` is an undeclared global that other code in this file reads,
    // so the assignment is kept.
    peer_id = senderId;
    console.log(`answer received from <${senderId}>`, msg);
    const pc = _peer_list[senderId];
    if (!pc) {
        log_error(`answer received for unknown peer <${senderId}>`);
        return;
    }
    const desc = new RTCSessionDescription(msg['sdp']);
    pc.setRemoteDescription(desc)
        .catch(log_error);
}
/**
 * onicecandidate: forward a locally gathered candidate to `peer_id`.
 * Events whose candidate is null or undefined are only logged.
 */
function handleICECandidateEvent(event, peer_id)
{
    console.log("ICE Event:\n", event);
    const found = event.candidate;
    if (found == null) {
        return;
    }
    console.log("Handling candidate:", found);
    console.log(found["candidate"]);
    const iceMsg = {
        "sender_id": myID,
        "target_id": peer_id,
        "type": "new-ice-candidate",
        "candidate": found
    };
    sendViaServer(iceMsg);
}
/**
 * 'new-ice-candidate' callback: add the remote candidate to the connection
 * stored for msg["sender_id"].
 *
 * msg["candidate"] is normally the candidate string with the m-line index
 * in msg["mlineindex"]; a candidate given as an object (candidate /
 * sdpMid / sdpMLineIndex fields) is passed to RTCIceCandidate as is.
 * Candidates for unknown peers and construction failures go to log_error.
 */
function handleNewICECandidateMsg(msg)
{
    // Use the sender named in the message, not the global peer_id, which
    // may be unset or refer to a different peer.
    const senderId = msg["sender_id"];
    console.log(`ICE candidate recieved from <${senderId}>`, msg);
    const pc = _peer_list[senderId];
    if (!pc) {
        log_error(`ICE candidate received for unknown peer <${senderId}>`);
        return;
    }
    const raw = msg["candidate"];
    const init = (raw !== null && typeof raw === "object")
        ? raw
        : {candidate: raw, sdpMLineIndex: msg["mlineindex"]};
    let candidate;
    try {
        // The constructor throws synchronously on invalid input.
        candidate = new RTCIceCandidate(init);
    } catch (e) {
        log_error(e);
        return;
    }
    pc.addIceCandidate(candidate)
        .catch(log_error);
}
/**
 * ontrack: show the received media in the element with id "local_vid".
 *
 * Uses the first stream of the event; when the event carries no stream,
 * the track itself is wrapped in a new MediaStream so it can still be
 * displayed. A missing video element is reported through log_error.
 */
function handleTrackEvent(event, peer_id)
{
    console.log(`track event recieved from <${peer_id}>`, event);
    const video = document.getElementById("local_vid");
    if (!video) {
        log_error("no element with id 'local_vid' to attach the track to");
        return;
    }
    // An empty streams array is truthy, so check its length explicitly.
    const hasStream = event.streams && event.streams.length > 0;
    video.srcObject = hasStream
        ? event.streams[0]
        : new MediaStream([event.track]);
}
/** Log state and local/remote descriptions of every stored connection. */
function checkConnectionState()
{
    console.log("Checking connections:");
    for (const peer in _peer_list) {
        const pc = _peer_list[peer];
        console.log("Peer ID:", peer);
        console.log("State:", pc.connectionState);
        console.log("Local Description:", pc.localDescription);
        console.log("Remote Description:", pc.remoteDescription);
    }
}
// Report the connection state every 10000 ms.
var checkStateTimer = setInterval(checkConnectionState, 10000);