Answering my own question: I ditched librosa in favor of webrtcvad for non-speech detection, since it has a method that does exactly that. The webrtcvad module sadly has some restrictions on the kind of input it can parse, but it seems to work well enough for my use case.
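To make those restrictions concrete: as far as I can tell, webrtcvad only accepts 16-bit mono PCM at 8000, 16000, 32000, or 48000 Hz, split into frames of exactly 10, 20, or 30 ms. A quick standalone check (separate from the script below) that I believe enumerates the valid combinations:

import webrtcvad

# webrtcvad only takes 16-bit mono PCM at a handful of sample rates,
# in frames of exactly 10, 20 or 30 ms
for rate in (8000, 16000, 32000, 48000):
    for frame_ms in (10, 20, 30):
        frame_length = rate * frame_ms // 1000  # samples per frame
        print(rate, frame_ms, webrtcvad.valid_rate_and_frame_length(rate, frame_length))

The 16000 Hz / 10 ms combination is the one the script below relies on (FRAME_RATE = 16000, CHUNK_SIZE = 160).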
import json
import time
import pyaudio
import webrtcvad
from queue import Queue
from threading import Thread
from vosk import Model, KaldiRecognizer

# Audio settings
FRAME_RATE = 16000
CHUNK_SIZE = 160  # 160 samples at 16 kHz = 10 ms, a frame length webrtcvad accepts
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
SILENCE_LIMIT = 4  # seconds of silence before the buffered speech is transcribed

# Voice activity detection: mode 3 is the most aggressive setting,
# i.e. the least likely to classify noise as speech
vad = webrtcvad.Vad()
vad.set_mode(3)

# Speech recognition
model = Model(model_name="vosk-model-small-en-us-0.22")
recognizer = KaldiRecognizer(model, FRAME_RATE)
recognizer.SetWords(True)

# Queues: messages works as a "keep running" flag, recordings carries raw audio frames
messages = Queue()
recordings = Queue()

def record_microphone():
    p = pyaudio.PyAudio()
    stream = p.open(format=AUDIO_FORMAT,
                    channels=CHANNELS,
                    rate=FRAME_RATE,
                    input=True,
                    frames_per_buffer=CHUNK_SIZE)
    # keep pushing 10 ms frames onto the queue until stop_recording()
    # empties the messages queue
    while not messages.empty():
        recordings.put(stream.read(CHUNK_SIZE))
    stream.stop_stream()
    stream.close()
    p.terminate()

def speech_recognition():
    buffer = b""
    in_speech = False
    silent_frames = 0
    # sanity check: 10 ms frames at 16 kHz are a combination webrtcvad accepts
    assert webrtcvad.valid_rate_and_frame_length(FRAME_RATE, CHUNK_SIZE)
    while not messages.empty():
        if not recordings.empty():
            frames = recordings.get()
            is_speech = vad.is_speech(frames, sample_rate=FRAME_RATE)
            if is_speech:
                if not in_speech:
                    # speech was detected but the script was not yet aware of it:
                    # make it aware
                    in_speech = True
                # append this 10 ms frame (160 samples) to the buffer
                buffer += frames
                silent_frames = 0
            elif in_speech:
                # no speech detected while the script was expecting speech:
                # count silent frames (10 ms each) until SILENCE_LIMIT seconds
                # have passed, i.e. the user stopped speaking, then hand the
                # buffered waveform to Vosk and reset
                if silent_frames < SILENCE_LIMIT * (FRAME_RATE / CHUNK_SIZE):
                    silent_frames += 1
                else:
                    recognizer.AcceptWaveform(buffer)
                    print(json.loads(recognizer.Result())["text"])
                    in_speech = False
                    silent_frames = 0
                    buffer = b""

def start_recording():
    messages.put(True)
    print("Starting...")
    record = Thread(target=record_microphone)
    record.start()
    transcribe = Thread(target=speech_recognition)
    transcribe.start()
    print("Listening.")

def stop_recording():
    # popping the flag empties the messages queue,
    # which makes both threads fall out of their loops
    messages.get()
    print("Stopped.")

if __name__ == "__main__":
    start_recording()
    time.sleep(35)  # listen for 35 seconds, then stop
    stop_recording()
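One rough edge I'm aware of: if recording stops while in_speech is still True, whatever is left in buffer is never transcribed. A minimal sketch of a fix (the flush_leftover helper and where to call it are my assumption, not something I've tested) would feed the leftover buffer to the recognizer and read Vosk's FinalResult() at the end of speech_recognition():

def flush_leftover(buffer):
    # hypothetical helper: transcribe speech that was still buffered
    # when recording stopped mid-utterance, instead of dropping it
    if buffer:
        recognizer.AcceptWaveform(buffer)
        print(json.loads(recognizer.FinalResult())["text"])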
This being the first time I've done something of this sort, the code can (and probably will) be optimized, but I'm leaving it here as a draft for whoever needs it in the future.