0

I've written a downloader in scrapy which downloads multiple ts Files asynchronously. Now I'm trying to concatenate and convert to mp4. I've written the following code but the video order doesnt seem to be correct and the mp4 file is completly messed up and not watchable.

 import ffmpeg
    
        if all_files_downloaded:
                   
                        
            BASE_URL=f"/test{i}"
            process = ffmpeg.input('pipe:').output(f'{BASE_URL}/video.mp4', vcodec='h264_nvenc').overwrite_output().run_async(pipe_stdin=True)
                    
                    
            for snippets in sorted(self.parallelviddeos.get(id)):
                   self.parallelviddeos.get(id)[snippets].seek(0)
                   process.stdin.write(self.parallelviddeos.get(id)[snippets].read())
                    
            process.stdin.close()
            process.wait() 
    
     
                    

The dictionary 'parallelviddeos' which has the id as a key and a list of ByteIO objects (the *.ts files) as an input for the corresponding video does look like this

parallelviddeos={id1:{0:BytesIO,2:BytesIO,3:BytesIO,4:BytesIO},id2:{0:BytesIO,...}}

Can somebody help me with the transformation from the BytesIO-Part to the complete mp4-file. I'm searching for a pipeline based method.

Im'getting the ts-Files as a response from a request in byte format.

scuba14
  • 33
  • 9
  • 1
    *"The video order doesn't seem to be correct and the mp4 file is completely messed up and not watchable"*, Can you be more specific? Is it just the order of the different parts? Why are you using dictionaries instead of using lists? Can we conclude the correct parts order from your post? – Rotem Jan 19 '22 at 17:54
  • @Rotem Watching it kinda feels like a buffer overflow. The order is not correct and sometimes the end of the clip is played several times (but not in a row there are other clips in between). I would not even say buffer overflow more of an wrong concat-point. – scuba14 Jan 19 '22 at 18:06
  • @Rotem I am using dicts because every video has its own id. The second dictionary is necessary because of the multiple threads. The response from the downloader is not in the correct order so I need BytesIO-objects with a parameter which defines the position of the clip in the video. Im storing them in a dict and when the last clip is stored I'm sorting the dict and write it in the correct order into the pipe. I think there are multiple problems because of the wrong order and the "buffer-overflow". The position of the video is the key as you can see in the 'parallelviddeos'-sample above – scuba14 Jan 19 '22 at 18:12

1 Answers1

1

We probably don't suppose to write TS files to the pipe in such a way.
Without the PIPE, we may use one of the concat options.

I think the safest option is using another FFmpeg sub-process for decoding the TS into raw video frames, and writing the pipe frame by frame.

You may start with my following post, and execute it in a loop.


Here is a "self contained" code sampe:

import ffmpeg
import numpy as np
import cv2 # Use OpenCV for testing
import io
import subprocess as sp
import threading
from functools import partial


out_filename = 'video.mp4'

# Build synthetic input, and read into BytesIO
###############################################
# Assume we know the width and height from advance
# (In case you don't know the resolution, I posted solution for getting it using FFprobe).
width = 192
height = 108
fps = 10
n_frames = 100

in_filename1 = 'in1.ts'
in_filename2 = 'in2.ts'


# Build synthetic video (in1.ts) for testing:
(
    ffmpeg
    .input(f'testsrc=size={width}x{height}:rate=1:duration={n_frames}', f='lavfi', r=fps)
    .filter('setpts', f'N/{fps}/TB')
    .output(in_filename1, vcodec='libx264', crf=17, pix_fmt='yuv420p', loglevel='error')
    .global_args('-hide_banner')
    .overwrite_output()
    .run()
)

# Build synthetic video (in1.ts) for testing:
(
    ffmpeg
    .input(f'mandelbrot=size={width}x{height}:rate=1', f='lavfi', r=fps)
    .filter('setpts', f'N/{fps}/TB')
    .output(in_filename2, vcodec='libx264', crf=17, pix_fmt='yuv420p', loglevel='error', t=n_frames)
    .global_args('-hide_banner')
    .overwrite_output()
    .run()
)


    
# Read the file into in-memory binary streams
with open(in_filename1, 'rb') as f:
    in_bytes = f.read()
    stream1 = io.BytesIO(in_bytes)

# Read the file into in-memory binary streams
with open(in_filename2, 'rb') as f:
    in_bytes = f.read()
    stream2 = io.BytesIO(in_bytes)

# Use list instead of dictionary (just for the example).
in_memory_viddeos = [stream1, stream2]
###############################################



# Writer thread: Write to stdin in chunks of 1024 bytes
def writer(decoder_process, stream):
    for chunk in iter(partial(stream.read, 1024), b''):
        decoder_process.stdin.write(chunk)
    decoder_process.stdin.close()



def decode_in_memory_and_re_encode(vid_bytesio):
    """ Decode video in BytesIO, and write the decoded frame into the encoder sub-process """
    vid_bytesio.seek(0)

    # Execute video decoder sub-process
    decoder_process = (
        ffmpeg
        .input('pipe:') #, f='mpegts', vcodec='h264')
        .video
        .output('pipe:', format='rawvideo', pix_fmt='bgr24')
        .run_async(pipe_stdin=True, pipe_stdout=True)
    )

    thread = threading.Thread(target=writer, args=(decoder_process, vid_bytesio))
    thread.start()

    # Read decoded video (frame by frame), and display each frame (using cv2.imshow for testing)
    while True:
        # Read raw video frame from stdout as bytes array.
        in_bytes = decoder_process.stdout.read(width * height * 3)

        if not in_bytes:
            break

        # Write the decoded frame to the encoder.
        encoder_process.stdin.write(in_bytes)

        # transform the byte read into a numpy array (for testing)
        in_frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

        # Display the frame (for testing)
        cv2.imshow('in_frame', in_frame)
        cv2.waitKey(10)

    thread.join()
    decoder_process.wait()


# Execute video encoder sub-process
encoder_process = (
    ffmpeg
    .input('pipe:', r=fps, f='rawvideo', s=f'{width}x{height}', pixel_format='bgr24')
    .video
    .output(out_filename, vcodec='libx264', crf=17, pix_fmt='yuv420p')
    .overwrite_output()
    .run_async(pipe_stdin=True)
)


# Re-encode the "in memory" videos in a loop
for memvid in in_memory_viddeos:
    decode_in_memory_and_re_encode(memvid)


encoder_process.stdin.close()
encoder_process.wait()

cv2.destroyAllWindows()

Sorry for ignoring your dictionary structure.
I assume the issue is related to video encoding, and not to the way you are iterating the BytesIO objects.


Update:

Reading the width and height from in-memory video stream:

There are few options.
I decided to read the width and height from the header of a BMP image.

Start FFmpeg sub-process with the following arguments (image2pipe format, and bmp video codec):

decoder_process = ffmpeg.input('pipe:'), f='mpegts', vcodec='h264').video.output('pipe:',f='image2pipe', vcodec='bmp').run_async(pipe_stdin=True, pipe_stdout=True)

Get the with and height by parsing the header:

in_bytes = decoder_process.stdout.read(54) # 54 bytes BMP Header + 4 bytes
(width, height) = struct.unpack("<ll", in_bytes[18:26])

Complete code sample:

import ffmpeg
import numpy as np
import cv2 # Use OpenCV for testing
import io
import subprocess as sp
import threading
from functools import partial
import struct


out_filename = 'video.mp4'

# Build synthetic input, and read into BytesIO
###############################################
# Assume we know the width and height from advance
width = 192
height = 108
fps = 10
n_frames = 100

in_filename1 = 'in1.ts'
in_filename2 = 'in2.ts'


## Build synthetic video (in1.ts) for testing:
#(
#    ffmpeg
#    .input(f'testsrc=size={width}x{height}:rate=1:duration={n_frames}', f='lavfi', r=fps)
#    .filter('setpts', f'N/{fps}/TB')
#    .output(in_filename1, vcodec='libx264', crf=17, pix_fmt='yuv420p', loglevel='error')
#    .global_args('-hide_banner')
#    .overwrite_output()
#    .run()
#)

## Build synthetic video (in1.ts) for testing:
#(
#    ffmpeg
#    .input(f'mandelbrot=size={width}x{height}:rate=1', f='lavfi', r=fps)
#    .filter('setpts', f'N/{fps}/TB')
#    .output(in_filename2, vcodec='libx264', crf=17, pix_fmt='yuv420p', loglevel='error', t=n_frames)
#    .global_args('-hide_banner')
#    .overwrite_output()
#    .run()
#)


    
# Read the file into in-memory binary streams
with open(in_filename1, 'rb') as f:
    in_bytes = f.read()
    stream1 = io.BytesIO(in_bytes)

# Read the file into in-memory binary streams
with open(in_filename2, 'rb') as f:
    in_bytes = f.read()
    stream2 = io.BytesIO(in_bytes)

# Use list instead of dictionary (just for the example).
in_memory_viddeos = [stream1, stream2]
###############################################



# Writer thread: Write to stdin in chunks of 1024 bytes
def writer(decoder_process, stream):
    for chunk in iter(partial(stream.read, 1024), b''):
        try:
            decoder_process.stdin.write(chunk)
        except (BrokenPipeError, OSError):
            # get_in_memory_video_frame_size causes BrokenPipeError exception and OSError exception.
            # This in not a clean solution, but it's the simplest I could find.
            return           

    decoder_process.stdin.close()



def get_in_memory_video_frame_size(vid_bytesio):
    """ Get the resolution of a video in BytesIO """
    vid_bytesio.seek(0)
    
    # Execute video decoder sub-process, the output format is BMP
    decoder_process = (
        ffmpeg
        .input('pipe:') #, f='mpegts', vcodec='h264')
        .video
        .output('pipe:', f='image2pipe', vcodec='bmp')
        .run_async(pipe_stdin=True, pipe_stdout=True)
    )

    thread = threading.Thread(target=writer, args=(decoder_process, vid_bytesio))
    thread.start()

    # Read raw video frame from stdout as bytes array.
    # https://en.wikipedia.org/wiki/BMP_file_format
    in_bytes = decoder_process.stdout.read(54) # 54 bytes BMP Header + 4 bytes

    decoder_process.stdout.close()
    thread.join()
    decoder_process.wait()
    vid_bytesio.seek(0)

    # The width and height are located in bytes 18 to 25 (4 bytes each)
    (width, height) = struct.unpack("<ll", in_bytes[18:26])
     
    return width, abs(height)



def decode_in_memory_and_re_encode(vid_bytesio):
    """ Decode video in BytesIO, and write the decoded frame into the encoder sub-process """
    vid_bytesio.seek(0)

    # Execute video decoder sub-process
    decoder_process = (
        ffmpeg
        .input('pipe:') #, f='mpegts', vcodec='h264')
        .video
        .output('pipe:', format='rawvideo', pix_fmt='bgr24')
        .run_async(pipe_stdin=True, pipe_stdout=True)
    )

    thread = threading.Thread(target=writer, args=(decoder_process, vid_bytesio))
    thread.start()

    # Read decoded video (frame by frame), and display each frame (using cv2.imshow for testing)
    while True:
        # Read raw video frame from stdout as bytes array.
        in_bytes = decoder_process.stdout.read(width * height * 3)

        if not in_bytes:
            break

        # Write the decoded frame to the encoder.
        encoder_process.stdin.write(in_bytes)

        # transform the byte read into a numpy array (for testing)
        in_frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

        # Display the frame (for testing)
        cv2.imshow('in_frame', in_frame)
        cv2.waitKey(10)

    thread.join()
    decoder_process.wait()



width, height = get_in_memory_video_frame_size(in_memory_viddeos[0])


# Execute video encoder sub-process
encoder_process = (
    ffmpeg
    .input('pipe:', r=fps, f='rawvideo', s=f'{width}x{height}', pixel_format='bgr24')
    .video
    .output(out_filename, vcodec='libx264', crf=17, pix_fmt='yuv420p')
    .overwrite_output()
    .run_async(pipe_stdin=True)
)


# Re-encode the "in memory" videos in a loop
for memvid in in_memory_viddeos:
    decode_in_memory_and_re_encode(memvid)


encoder_process.stdin.close()
encoder_process.wait()

cv2.destroyAllWindows()
Rotem
  • 30,366
  • 4
  • 32
  • 65
  • Why do I have to use Threading to write the video isnide the pipe. I tried to to substitute the lines `thread = threading.Thread(target=writer, args=(decoder_process, vid_bytesio)) thread.start()` with the following: `writer(decoder_process,vid_bytesio)`. I'm thinking of writing the whole video into the stdin pipeline and then process it in one step without the while loop. Is it because you are writing in 1024 byte chunks? – scuba14 Jan 21 '22 at 01:03
  • Do I need to know the resolution of the video? – scuba14 Jan 21 '22 at 02:02
  • **1.** Writing in small chunks is important when the video buffer is large. Writing too many bytes at once, without "draining the pipe" causes the execution to hung (stuck). **2.** I added an example for reading the resolution. – Rotem Jan 21 '22 at 11:05