I am working on a project in which I need to access at least 20 RTSP CCTV URLs at the same time.

I tried to do this with ffmpeg, using multiple inputs in one command. However, there is a problem.

ffmpeg -i URL_1 -i URL_2 -

The command above is an example of what I tried. I want to read two RTSP streams via ffmpeg and output them into two different queues for later use. If I use this command and then read bytes from the pipe, I cannot tell which bytes belong to which RTSP input.

Is there another way to access several RTSP streams at the same time?

Edit: Adding Code

import ffmpeg
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    def __init__(self, q, in_stream, name):
        super().__init__()
        self.name = name
        self.q = q
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell ffmpeg that input stream codec is h264
                        "-i", in_stream,    # Input stream (RTSP URL or file)
                        "-c:v", "copy",     # Tell ffmpeg to copy the video stream as is (without decoding and encoding)
                        "-an", "-sn",       # No audio and no subtitles
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)  # Don't use shell=True (you don't need to execute the command through the shell).

        # while True:
        for i in range(1024*10):  # Read up to 10 MiB (10240 chunks of 1024 bytes) for testing
            data = pipe.stdout.read(1024)  # Read data from pipe in chunks of 1024 bytes
            self.q.put(data)

            # Break loop if less than 1024 bytes read (not going to work with CCTV, but works with input file)
            if len(data) < 1024:
                break

        try:
            pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
        except sp.TimeoutExpired:
            pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).

        if self.q.empty():
            print("There is a problem (queue is empty)!!!")
        else:
            # Write data from queue to file <name>.h264 (for testing)
            with open(self.name+".h264", "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())


# Build synthetic video, for testing begins:
################################################
# width, height = 1280, 720
# in_stream = "vid.264"
# sp.Popen("ffmpeg -y -f lavfi -i testsrc=size=1280x720:duration=5:rate=1 -c:v libx264 -crf 23 -pix_fmt yuv420p " + in_stream).wait()
################################################

#Use public RTSP Streaming for testing
readers = {}
queues = {}
dict = {
        "name1":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name2":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name3":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name4":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name5":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name6":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name7":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name8":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name9":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name10":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name11":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name12":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name13":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name14":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name15":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        }

for key in dict:
    ip = dict[key]["ip"]
    name = key
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, ip, name)
    readers[name] = cctv_reader
    cctv_reader.start()
    cctv_reader.join()
HCCY

1 Answer

You already have all the infrastructure in your previous question.

All you need to do is create multiple objects of your 'CCTVReader' class.

Here is a working code sample, reading two streams:

import numpy as np
import subprocess as sp
import threading
import queue
import time

class CCTVReader(threading.Thread):
    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell FFmpeg that input stream codec is h264
                        "-i", in_stream,    # Input stream (RTSP URL or file)
                        "-c:v", "copy",     # Tell FFmpeg to copy the video stream as is (without decoding and encoding)
                        "-an", "-sn",       # No audio and no subtitles
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)  # Don't use shell=True (you don't need to execute the command through the shell).

        # while True:
        for i in range(100):  # Read up to 100 chunks for testing
            data = pipe.stdout.read(self.chunk_size)  # Read data from pipe in chunks of self.chunk_size bytes
            self.q.put(data)

            # Break loop if less than self.chunk_size bytes read (not going to work with CCTV, but works with input file)
            if len(data) < self.chunk_size:
                break

        try:
            pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
        except sp.TimeoutExpired:
            pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).



#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",

#Use public RTSP Streaming for testing
in_stream1 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

in_stream2 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"


q1 = queue.Queue()
q2 = queue.Queue()

cctv_reader1 = CCTVReader(q1, in_stream1, 1024)  # First stream 
cctv_reader2 = CCTVReader(q2, in_stream2, 2048)  # Second stream

cctv_reader1.start()
time.sleep(5) # Wait 5 seconds (for testing).
cctv_reader2.start()

cctv_reader1.join()
cctv_reader2.join()

if q1.empty():
    print("There is a problem (q1 is empty)!!!")
else:
    # Write data from queue to file vid_from_q1.264 (for testing)
    with open("vid_from_q1.264", "wb") as queue_save_file:
        while not q1.empty():
            queue_save_file.write(q1.get())

if q2.empty():
    print("There is a problem (q2 is empty)!!!")
else:
    # Write data from queue to file vid_from_q2.264 (for testing)
    with open("vid_from_q2.264", "wb") as queue_save_file:
        while not q2.empty():
            queue_save_file.write(q2.get())

Update:

I don't think you can use a syntax like ffmpeg -i URL_1 -i URL_2 -...

The code you have posted has a few issues:

  1. The cctv_reader.join() call must be in a second loop, because it waits for the thread to end and blocks execution. With join() inside the first loop, each reader finishes before the next one starts, so the streams are read one after another.
  2. Saving the data to files should happen after all threads have ended (it's just for testing).
    If you want to record the data, try saving each chunk right after grabbing it.
  3. Reduce bufsize=1024**3; try bufsize=1024**2*100.
    If the OS actually allocates a 1 GB buffer per process, you might run out of memory.
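
Point 2 (saving each chunk right after grabbing it) could look roughly like the sketch below. It is not part of the code in this answer: record_stream, ffmpeg_copy_command and record_all are hypothetical helper names, and the FFmpeg arguments are simply copied from the CCTVReader class. Each thread writes its chunks straight to its own file, so nothing accumulates in a queue, and all threads are started before any of them is joined.

```python
import subprocess as sp
import threading


def record_stream(command, out_path, chunk_size=8192, max_chunks=None):
    """Run `command` and write its stdout to `out_path` chunk by chunk.

    Returns the number of bytes written. `max_chunks=None` reads until
    the subprocess closes its stdout.
    """
    written = 0
    chunks = 0
    proc = sp.Popen(command, stdout=sp.PIPE)
    try:
        with open(out_path, "wb") as out_file:
            while max_chunks is None or chunks < max_chunks:
                data = proc.stdout.read(chunk_size)
                if not data:  # Empty read: the subprocess closed its stdout
                    break
                out_file.write(data)  # Save the chunk right after grabbing it
                written += len(data)
                chunks += 1
    finally:
        proc.stdout.close()
        try:
            proc.wait(timeout=1)
        except sp.TimeoutExpired:
            proc.kill()  # A live stream never ends by itself
            proc.wait()
    return written


def ffmpeg_copy_command(in_stream):
    # Same FFmpeg arguments as in the CCTVReader class (assumed unchanged)
    return ["ffmpeg", "-c:v", "h264", "-i", in_stream,
            "-c:v", "copy", "-an", "-sn", "-f", "h264", "-"]


def record_all(commands_by_file):
    """Start one recording thread per {file name: command} entry, then join all."""
    threads = [threading.Thread(target=record_stream, args=(cmd, fname))
               for fname, cmd in commands_by_file.items()]
    for t in threads:
        t.start()   # First loop: start every reader
    for t in threads:
        t.join()    # Second loop: wait for all of them
```

For example, record_all({"cam1.264": ffmpeg_copy_command(url1), "cam2.264": ffmpeg_copy_command(url2)}) would record two streams in parallel, where url1 and url2 stand for your own RTSP URLs.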

Note: Python multi-threading performance is not so good, so check the CPU load.

Here is a code sample (reading from files):

import numpy as np
import subprocess as sp
import threading
import queue

class CCTVReader(threading.Thread):
    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell FFmpeg that input stream codec is h264
                        "-i", in_stream,    # Input stream (RTSP URL or file)
                        "-c:v", "copy",     # Tell FFmpeg to copy the video stream as is (without decoding and encoding)
                        "-an", "-sn",       # No audio and no subtitles
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)

        # while True:
        for i in range(100):  # Read up to 100 chunks for testing
            data = pipe.stdout.read(self.chunk_size)  # Read data from pipe in chunks of self.chunk_size bytes
            self.q.put(data)

            # Break loop if less than self.chunk_size bytes read (not going to work with CCTV, but works with input file)
            if len(data) < self.chunk_size:
                break

        try:
            pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
        except sp.TimeoutExpired:
            pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).


    def save_q_to_file(self, vid_file_name):
        # Write data from queue to file (for testing)
        if self.q.empty():
            print("There is a problem (q is empty)!!!")
        else:            
            with open(vid_file_name, "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())

#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",

#Use public RTSP Streaming for testing
#in_stream = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

#Use public RTSP Streaming for testing
readers = {}
queues = {}

# Read from file (for testing)
dict = {
        "name1":{ "ip":"vid1.264",  "fname":"vid_from_q1.264"},
        "name2":{ "ip":"vid2.264",  "fname":"vid_from_q2.264"},
        "name3":{ "ip":"vid3.264",  "fname":"vid_from_q3.264"},
        "name4":{ "ip":"vid4.264",  "fname":"vid_from_q4.264"},
        "name5":{ "ip":"vid5.264",  "fname":"vid_from_q5.264"},
        "name6":{ "ip":"vid6.264",  "fname":"vid_from_q6.264"},
        "name7":{ "ip":"vid7.264",  "fname":"vid_from_q7.264"},
        "name8":{ "ip":"vid8.264",  "fname":"vid_from_q8.264"},
        "name9":{ "ip":"vid9.264",  "fname":"vid_from_q9.264"},
        "name10":{"ip":"vid10.264", "fname":"vid_from_q10.264"},
        "name11":{"ip":"vid11.264", "fname":"vid_from_q11.264"},
        "name12":{"ip":"vid12.264", "fname":"vid_from_q12.264"},
        "name13":{"ip":"vid13.264", "fname":"vid_from_q13.264"},
        "name14":{"ip":"vid14.264", "fname":"vid_from_q14.264"},
        "name15":{"ip":"vid15.264", "fname":"vid_from_q15.264"}
        }

for key in dict:
    ip = dict[key]["ip"]
    name = key
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, ip, 8192)
    readers[name] = cctv_reader
    cctv_reader.start()

# Wait for all threads to end
for key in readers:
    readers[key].join()

# Save data for testing
for key in readers:
    file_name = dict[key]["fname"]
    readers[key].save_q_to_file(file_name)
Rotem
  • Hello Rotem, thank you for taking the time to help me again. Actually, I had already tried using multiple threads to query the data. However, when I used 15 threads to query 15 different CCTV cameras, serious data loss occurred. I have added the code I tried to the question above. I will test again with what you provided. Thank you very much. – HCCY Feb 08 '20 at 02:43
  • I don't think you can use a syntax like `ffmpeg -i URL_1 -i URL_2 -`... Your code has a few issues: **1.** The `cctv_reader.join()` call must be in a second loop, because it waits for the thread to end and blocks execution. **2.** Saving the data to files should happen after all threads have ended (it's just for testing). If you want to record the data, try saving each chunk right after grabbing it. **3.** Reduce `bufsize=1024**3`; try `bufsize=1024**2*100`. **4.** Python multi-threading performance is not so good, so check the CPU load. – Rotem Feb 08 '20 at 09:04
  • I have updated my post. I have no way of testing it with 15 CCTV cameras, so I used video files instead. – Rotem Feb 08 '20 at 09:48
  • Rotem, one small question about the threading issue you mentioned. If I replace the multi-threading with GPU threading, will it perform better? – HCCY Feb 11 '20 at 01:19
  • I don't think GPU threading fits your case. The thread reads data from a pipe and stores it in a queue, and GPUs are not built for that kind of task. Read [this post](https://stackoverflow.com/questions/44793371/is-multithreading-in-python-a-myth) to learn more about Python multi-threading issues. I don't think the task is CPU heavy, and it can be done on a single core. You can increase the chunk size to reduce the number of thread context switches. – Rotem Feb 11 '20 at 06:38
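
If thread context switching does turn out to be a problem, one alternative (not used anywhere in this thread, so treat it as an assumption) is to read all the pipes from a single thread with the standard selectors module. The sketch below is a rough illustration: pump_pipes and on_chunk are hypothetical names, and it only works on Unix-like systems, because on Windows selectors supports sockets only, not pipes.

```python
import os
import selectors
import subprocess as sp


def pump_pipes(commands, on_chunk, chunk_size=65536):
    """Read stdout of several subprocesses from one thread.

    `commands` maps a stream name to an argument list.
    `on_chunk(name, data)` is called for every chunk that arrives,
    so the caller always knows which stream the bytes belong to.
    Returns when every subprocess has closed its stdout.
    """
    sel = selectors.DefaultSelector()
    procs = []
    for name, cmd in commands.items():
        # bufsize=0: unbuffered pipe, so readiness reported by the
        # selector matches what os.read() can actually return
        proc = sp.Popen(cmd, stdout=sp.PIPE, bufsize=0)
        procs.append(proc)
        sel.register(proc.stdout, selectors.EVENT_READ, name)

    while sel.get_map():  # Loop while at least one pipe is still registered
        for key, _events in sel.select():
            data = os.read(key.fd, chunk_size)
            if data:
                on_chunk(key.data, data)
            else:
                # Empty read means end of stream for this pipe
                sel.unregister(key.fileobj)
                key.fileobj.close()

    sel.close()
    for proc in procs:
        proc.wait()
```

Here on_chunk could put the data into a per-name queue, for example lambda name, data: queues[name].put(data), with the commands built from the same FFmpeg argument list as in the answer. With live CCTV streams the loop never ends by itself, so a real program would also need its own stop condition.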