
I want to convert an OGG byte array (Opus codec) to a WAV byte array without saving to disk. I downloaded the audio through the Telegram API, and it arrives as a byte array in .ogg format. I do not want to write it to the filesystem, to avoid filesystem I/O latency.

Currently, I first save the audio as an .ogg file with the code below, which uses the Telegram API (for reference: https://docs.python-telegram-bot.org/en/stable/telegram.file.html#telegram.File.download_to_drive):

# listen for audio messages
async def audio(update, context):
    newFile = await context.bot.get_file(update.message.voice.file_id)
    await newFile.download_to_drive(output_path)

I am using the code

subprocess.call(["ffmpeg", "-i", output_path, output_path.replace(".ogg", ".wav"), '-y'], stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)

to convert ogg file to wav file. But this is not what I want.

I want the code

async def audio(update, context):
    newFile = await context.bot.get_file(update.message.voice.file_id)
    byte_array = await newFile.download_as_bytearray()

to get byte_array and now I want this byte_array to be converted to wav without saving to disk and without using ffmpeg. Let me know in comments if something is unclear. Thanks!

Note: I have set up a Telegram bot on the backend that listens for audio sent to a private chat; I send the audio manually for testing purposes.

1 Answer


We may write the OGG data to FFmpeg stdin pipe, and read the encoded WAV data from FFmpeg stdout pipe.
My following answer describes how to do it with video, and we may apply the same solution to audio.

The example assumes that the OGG data has already been downloaded and is stored in a bytes array (in RAM).


Piping architecture:

 --------------------  Encoded      ---------  Encoded      ------------
| Input OGG encoded  | OGG data    | FFmpeg  | WAV data    | Store to   |
| stream             | ----------> | process | ----------> | BytesIO    |
 --------------------  stdin PIPE   ---------  stdout PIPE  -------------

The implementation is equivalent to the following shell command:
Linux: cat input.ogg | ffmpeg -y -f ogg -i pipe: -f wav pipe: > test.wav
Windows: type input.ogg | ffmpeg -y -f ogg -i pipe: -f wav pipe: > test.wav


The example uses ffmpeg-python module, but it's just a binding to FFmpeg sub-process (FFmpeg CLI must be installed, and must be in the execution path).


Execute FFmpeg sub-process with stdin pipe as input and stdout pipe as output:

ffmpeg_process = (
    ffmpeg
    .input('pipe:', format='ogg')
    .output('pipe:', format='wav')
    .run_async(pipe_stdin=True, pipe_stdout=True)
)

The input format is set to ogg, the output format is set to wav (use default encoding parameters).
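For readers who would rather not add the ffmpeg-python dependency, roughly the same process can be started with the standard subprocess module. This is only a sketch: the helper names ffmpeg_pipe_cmd and start_ffmpeg are made up for illustration, and the arguments mirror the shell command shown earlier.

```python
import subprocess


def ffmpeg_pipe_cmd(in_format="ogg", out_format="wav"):
    """Build the argument list for an FFmpeg process that reads stdin and writes stdout."""
    return ["ffmpeg", "-y", "-f", in_format, "-i", "pipe:", "-f", out_format, "pipe:"]


def start_ffmpeg(cmd):
    """Start the process with both pipes open (requires the ffmpeg binary on PATH)."""
    return subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)


# Usage (not executed here because it needs FFmpeg installed):
# ffmpeg_process = start_ffmpeg(ffmpeg_pipe_cmd())
```

The returned object is a regular subprocess.Popen, so its stdin and stdout attributes can be used the same way as with run_async.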


Assuming the audio file is relatively large, we can't write the entire OGG data at once: FFmpeg stops reading stdin once its stdout pipe buffer fills up, so writing without "draining" the stdout pipe blocks the program (a deadlock).

We may have to write the OGG data (in chunks) in a separate thread, and read the encoded data in the main thread.
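When the whole clip already sits in memory, a simpler route may be to let subprocess.run manage both pipes: passing input= makes it feed stdin and collect stdout at the same time, so no hand-written thread is needed. A minimal sketch, where pipe_through is a hypothetical helper name and the command is passed in by the caller:

```python
import subprocess


def pipe_through(cmd, data):
    """Send `data` to the command's stdin and return everything it writes to stdout."""
    result = subprocess.run(
        cmd,
        input=data,                 # written to stdin, then stdin is closed
        stdout=subprocess.PIPE,     # captured in memory
        stderr=subprocess.DEVNULL,  # discard the log output of the child
        check=True,                 # raise CalledProcessError on a non-zero exit code
    )
    return result.stdout


# Usage with FFmpeg (needs the binary on PATH):
# wav_bytes = pipe_through(
#     ["ffmpeg", "-f", "ogg", "-i", "pipe:", "-f", "wav", "pipe:"],
#     bytes(ogg_bytes_array),
# )
```

The trade-off is that the complete input and output are held in memory at once, which is usually acceptable for short voice messages but not for streaming.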

Here is a sample for the "writer" thread:

def writer(ffmpeg_proc, ogg_bytes_arr):
    chunk_size = 1024  # Chunk size of 1024 bytes (the exact size is not important).
    n_chunks = len(ogg_bytes_arr) // chunk_size  # Number of full chunks (without the smaller remainder chunk at the end).
    remainder_size = len(ogg_bytes_arr) % chunk_size  # Remainder bytes (zero when the total size is a multiple of chunk_size).

    for i in range(n_chunks):
        ffmpeg_proc.stdin.write(ogg_bytes_arr[i*chunk_size:(i+1)*chunk_size])  # Write a chunk of data bytes to the stdin pipe of the FFmpeg sub-process.

    if remainder_size > 0:
        ffmpeg_proc.stdin.write(ogg_bytes_arr[chunk_size*n_chunks:])  # Write the remainder bytes to the stdin pipe of the FFmpeg sub-process.

    ffmpeg_proc.stdin.close()  # Close the stdin pipe - this signals end of input, so FFmpeg finishes encoding and the sub-process exits.

The "writer" thread writes the OGG data in small chunks.
The last chunk is smaller (assuming the length is not a multiple of the chunk size).

At the end, the stdin pipe is closed.
Closing stdin signals the end of input, so FFmpeg finishes encoding and the sub-process exits.
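The chunk arithmetic can also be written more compactly by stepping through the data with range. The following is a sketch of an equivalent writer, not a replacement the answer depends on; iter_chunks is a helper name introduced here for illustration.

```python
def iter_chunks(data, chunk_size=1024):
    """Yield consecutive slices of `data`; the last one may be shorter."""
    view = memoryview(data)  # slicing a memoryview does not copy the bytes
    for start in range(0, len(view), chunk_size):
        yield view[start:start + chunk_size]


def writer(ffmpeg_proc, ogg_bytes_arr, chunk_size=1024):
    """Write all chunks to the stdin pipe, then close it to signal end of input."""
    for chunk in iter_chunks(ogg_bytes_arr, chunk_size):
        ffmpeg_proc.stdin.write(chunk)
    ffmpeg_proc.stdin.close()
```

Because the slice at the end of the data is simply shorter, no separate remainder branch is needed, and empty input produces no writes at all.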


In the main thread, we start the writer thread and read the encoded WAV data from the stdout pipe (in chunks):

thread = threading.Thread(target=writer, args=(ffmpeg_process, ogg_bytes_array))
thread.start()

while thread.is_alive():
    wav_chunk = ffmpeg_process.stdout.read(1024)  # Read chunk with arbitrary size from stdout pipe
    out_stream.write(wav_chunk)  # Write the encoded chunk to the "in-memory file".

For reading the remaining data, we may use ffmpeg_process.communicate():

# Read the last encoded chunk.
wav_chunk = ffmpeg_process.communicate()[0]
out_stream.write(wav_chunk)  # Write the encoded chunk to the "in-memory file".
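A variation on the same idea is to keep reading stdout until end of file instead of polling thread.is_alive(), so the loop does not depend on thread timing and communicate() is never called after stdin was closed by hand. The sketch below uses plain subprocess and takes the command as a parameter; convert_with_writer_thread is a hypothetical name, and error handling is kept to a minimum.

```python
import subprocess
import threading
from io import BytesIO


def convert_with_writer_thread(cmd, data, chunk_size=4096):
    """Feed `data` to `cmd` from a helper thread while the caller drains stdout."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def feed():
        proc.stdin.write(data)  # may block; that is fine because stdout is drained below
        proc.stdin.close()      # end of input: lets the child flush its output and exit

    thread = threading.Thread(target=feed)
    thread.start()

    out_stream = BytesIO()
    # read() returns b"" only at end of file, i.e. once the child has closed stdout.
    for chunk in iter(lambda: proc.stdout.read(chunk_size), b""):
        out_stream.write(chunk)

    thread.join()
    proc.stdout.close()
    returncode = proc.wait()
    if returncode != 0:
        raise RuntimeError(f"child process exited with code {returncode}")
    return out_stream.getvalue()


# Usage with FFmpeg (needs the binary on PATH):
# wav_bytes = convert_with_writer_thread(
#     ["ffmpeg", "-f", "ogg", "-i", "pipe:", "-f", "wav", "pipe:"], ogg_bytes_array)
```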

Complete code sample:

import ffmpeg
from io import BytesIO
import threading

async def download_audio(update, context):
    # This function is not used in the example - the audio is read from a file instead (just for testing).
    newFile = await context.bot.get_file(update.message.voice.file_id)
    bytes_array = await newFile.download_as_bytearray()
    return bytes_array


# Equivalent Linux shell command:
# cat input.ogg | ffmpeg -y -f ogg -i pipe: -f wav pipe: > test.wav

# Equivalent Windows shell command:
# type input.ogg | ffmpeg -y -f ogg -i pipe: -f wav pipe: > test.wav

# Writer thread - write the OGG data to FFmpeg stdin pipe in small chunks of 1KBytes.
def writer(ffmpeg_proc, ogg_bytes_arr):
    chunk_size = 1024  # Chunk size of 1024 bytes (the exact size is not important).
    n_chunks = len(ogg_bytes_arr) // chunk_size  # Number of full chunks (without the smaller remainder chunk at the end).
    remainder_size = len(ogg_bytes_arr) % chunk_size  # Remainder bytes (zero when the total size is a multiple of chunk_size).

    for i in range(n_chunks):
        ffmpeg_proc.stdin.write(ogg_bytes_arr[i*chunk_size:(i+1)*chunk_size])  # Write a chunk of data bytes to the stdin pipe of the FFmpeg sub-process.

    if remainder_size > 0:
        ffmpeg_proc.stdin.write(ogg_bytes_arr[chunk_size*n_chunks:])  # Write the remainder bytes to the stdin pipe of the FFmpeg sub-process.

    ffmpeg_proc.stdin.close()  # Close the stdin pipe - this signals end of input, so FFmpeg finishes encoding and the sub-process exits.



if False:
    # We may assume that ogg_bytes_array is the output of download_audio.
    # Note: download_audio is a coroutine, so in a real async handler the call must be awaited.
    ogg_bytes_array = download_audio(update, context)
else:
    # The example reads the OGG data from a file (for testing).
    with open('input.ogg', 'rb') as f:
        ogg_bytes_array = f.read()

    
# Execute FFmpeg sub-process with stdin pipe as input and stdout pipe as output.
ffmpeg_process = (
    ffmpeg
    .input('pipe:', format='ogg')
    .output('pipe:', format='wav')
    .run_async(pipe_stdin=True, pipe_stdout=True)
)

# Open in-memory file for storing the encoded WAV file
out_stream = BytesIO()

# Starting a thread that writes the OGG data in small chunks.
# We need the thread because writing too much data to stdin pipe at once, causes a deadlock.
thread = threading.Thread(target=writer, args=(ffmpeg_process, ogg_bytes_array))
thread.start()

# Read encoded WAV data from stdout pipe of FFmpeg, and write it to out_stream
while thread.is_alive():
    wav_chunk = ffmpeg_process.stdout.read(1024)  # Read chunk with arbitrary size from stdout pipe
    out_stream.write(wav_chunk)  # Write the encoded chunk to the "in-memory file".

# Read the last encoded chunk.
wav_chunk = ffmpeg_process.communicate()[0]
out_stream.write(wav_chunk)  # Write the encoded chunk to the "in-memory file".
out_stream.seek(0)  # Seek to the beginning of out_stream
ffmpeg_process.wait() # Wait for FFmpeg sub-process to end

# Write out_stream to file - just for testing:
with open('test.wav', "wb") as f:
    f.write(out_stream.getbuffer())
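Since the Telegram handler in the question is a coroutine, it may be preferable to run FFmpeg through asyncio so the event loop is not blocked while the conversion runs. The following is a sketch under that assumption; convert_async is a hypothetical helper, and the command is passed in by the caller.

```python
import asyncio


async def convert_async(cmd, data):
    """Run `cmd` without blocking the event loop and return its stdout as bytes."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    # communicate() writes the input, closes stdin and reads stdout concurrently.
    stdout, _ = await proc.communicate(input=data)
    if proc.returncode != 0:
        raise RuntimeError(f"child process exited with code {proc.returncode}")
    return stdout


# Usage inside the Telegram handler (needs FFmpeg on PATH):
# async def audio(update, context):
#     new_file = await context.bot.get_file(update.message.voice.file_id)
#     ogg_bytes = bytes(await new_file.download_as_bytearray())
#     wav_bytes = await convert_async(
#         ["ffmpeg", "-f", "ogg", "-i", "pipe:", "-f", "wav", "pipe:"], ogg_bytes)
```

This keeps the whole conversion in memory, like the threaded version, but leaves the pipe handling to asyncio.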
Rotem