I fixed the code above by Mike Schultz. I also tried to make the RMS threshold set itself automatically based on the microphone's noise level, but failed miserably, so you have to set the threshold manually to match your microphone's noise floor.
```python
import pyaudio
import math
import struct
import wave
import time
import datetime
import os

TRIGGER_RMS = 10    # start recording above this RMS level
RATE = 16000        # sample rate
TIMEOUT_SECS = 1    # seconds of silence after which recording stops
FRAME_SECS = 0.25   # length of each frame (chunk) processed at once, in seconds
CUSHION_SECS = 1    # seconds of audio kept before and after the sound

SHORT_NORMALIZE = 1.0 / 32768.0
FORMAT = pyaudio.paInt16
CHANNELS = 1
SHORT_WIDTH = 2
CHUNK = int(RATE * FRAME_SECS)
CUSHION_FRAMES = int(CUSHION_SECS / FRAME_SECS)
TIMEOUT_FRAMES = int(TIMEOUT_SECS / FRAME_SECS)

f_name_directory = './'


class Recorder:

    @staticmethod
    def rms(frame):
        # RMS of one frame of 16-bit samples, normalized to [-1, 1] and scaled by 1000
        count = len(frame) // SHORT_WIDTH
        shorts = struct.unpack('%dh' % count, frame)
        sum_squares = 0.0
        for sample in shorts:
            n = sample * SHORT_NORMALIZE
            sum_squares += n * n
        return math.sqrt(sum_squares / count) * 1000

    def __init__(self):
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=True,
                                  output=True,
                                  frames_per_buffer=CHUNK)
        self.time = time.time()
        self.quiet = []
        self.quiet_idx = -1
        self.timeout = 0

    def record(self):
        print('')
        sound = []
        start = time.time()
        begin_time = None
        while True:
            data = self.stream.read(CHUNK)
            rms_val = self.rms(data)
            if self.inSound(data):
                sound.append(data)
                if begin_time is None:
                    begin_time = datetime.datetime.now()
            else:
                if len(sound) > 0:
                    self.write(sound, begin_time)
                    sound.clear()
                    begin_time = None
                else:
                    self.queueQuiet(data)

            curr = time.time()
            secs = int(curr - start)
            tout = 0 if self.timeout == 0 else int(self.timeout - curr)
            label = 'Listening' if self.timeout == 0 else 'Recording'
            print('[+] %s: Level=[%4.2f] Secs=[%d] Timeout=[%d]' % (label, rms_val, secs, tout), end='\r')

    # quiet is a circular buffer holding the last CUSHION_FRAMES quiet frames
    def queueQuiet(self, data):
        self.quiet_idx += 1
        # start over again on overflow
        if self.quiet_idx == CUSHION_FRAMES:
            self.quiet_idx = 0

        # fill up the queue
        if len(self.quiet) < CUSHION_FRAMES:
            self.quiet.append(data)
        # replace the element at the index in a circular fashion: 0 -> 1 -> 2 -> 3 -> 0 and so on
        else:
            self.quiet[self.quiet_idx] = data

    def dequeueQuiet(self, sound):
        if len(self.quiet) == 0:
            return sound

        ret = []
        if len(self.quiet) < CUSHION_FRAMES:
            # extend, not append: append() nested the whole list as a single
            # element and broke b''.join() in write()
            ret.extend(self.quiet)
            ret.extend(sound)
        else:
            # unroll the circular buffer in chronological order
            ret.extend(self.quiet[self.quiet_idx + 1:])
            ret.extend(self.quiet[:self.quiet_idx + 1])
            ret.extend(sound)
        return ret

    def inSound(self, data):
        rms = self.rms(data)
        curr = time.time()
        if rms > TRIGGER_RMS:
            self.timeout = curr + TIMEOUT_SECS
            return True
        if curr < self.timeout:
            return True
        self.timeout = 0
        return False

    def write(self, sound, begin_time):
        # insert the pre-sound quiet frames into sound
        sound = self.dequeueQuiet(sound)
        # drop the now-stale cushion so it can't be prepended to the next recording too
        self.quiet = []
        self.quiet_idx = -1

        # sound ends with TIMEOUT_FRAMES of quiet; keep only CUSHION_FRAMES of it
        keep_frames = len(sound) - TIMEOUT_FRAMES + CUSHION_FRAMES
        recording = b''.join(sound[0:keep_frames])

        filename = begin_time.strftime('%Y-%m-%d_%H.%M.%S')
        pathname = os.path.join(f_name_directory, '{}.wav'.format(filename))

        wf = wave.open(pathname, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(recording)
        wf.close()
        print('[+] Saved: {}'.format(pathname))


a = Recorder()
a.record()
```
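One rough way to attempt the automatic threshold mentioned at the top is to assume the first couple of seconds after startup contain only background noise, measure that, and put the trigger a bit above it. This is only a sketch under that assumption; `calibrate_trigger` and its `headroom` factor are hypothetical, not part of the code above:

```python
# hypothetical helper: listen for a moment at startup and set the trigger
# slightly above the loudest ambient frame; this silently misbehaves if the
# "calibration" window isn't actually quiet
def calibrate_trigger(recorder, secs=2, headroom=1.5):
    levels = [recorder.rms(recorder.stream.read(CHUNK))
              for _ in range(int(secs / FRAME_SECS))]
    return max(levels) * headroom

a = Recorder()
TRIGGER_RMS = calibrate_trigger(a)  # rebinds the module-level global that inSound() reads
a.record()
```

This would replace the last two lines of the script, and it breaks down as soon as the calibration window contains speech or the noise floor drifts over time, which is presumably why a manually chosen threshold ends up more dependable.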
Other than that, if you are trying to detect human speech rather than sound in general, look into voice activity detection (VAD), for example this one; they provide SDKs for multiple platforms, which is convenient for application development. There is also WebRTC's VAD, but it is comparatively slower and less accurate.
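For reference, here is a minimal sketch of the WebRTC VAD through the py-webrtcvad package (`pip install webrtcvad`). It only accepts 16-bit mono PCM in 10, 20, or 30 ms frames at 8000/16000/32000/48000 Hz, so the 0.25 s chunks from the script above have to be sliced into sub-frames first; `chunk_has_speech` is my own name for a possible drop-in alternative to the RMS test in `inSound`:

```python
import webrtcvad

RATE = 16000         # same values as in the script above
SHORT_WIDTH = 2

vad = webrtcvad.Vad(3)  # aggressiveness 0-3; 3 rejects non-speech most aggressively
VAD_FRAME_BYTES = int(RATE * 0.03) * SHORT_WIDTH  # 30 ms of 16-bit mono samples

def chunk_has_speech(data):
    # slice a chunk into full 30 ms sub-frames; report speech if any sub-frame hits
    frames = [data[i:i + VAD_FRAME_BYTES]
              for i in range(0, len(data) - VAD_FRAME_BYTES + 1, VAD_FRAME_BYTES)]
    return any(vad.is_speech(f, RATE) for f in frames)
```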
Lastly, you can train your own neural network model to detect speech, noise, exact words, or whatever else you want, though that takes significantly more time and effort.
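To give a flavour of the very small end of that, here is a toy sketch (my own illustration, not a recipe from any particular library) that trains a scikit-learn MLP to classify 0.25 s frames as speech vs. noise from their log-magnitude spectrum; the `frames` and `labels` training data are hypothetical, and you would have to record and hand-label them yourself:

```python
import numpy as np
from sklearn.neural_network import MLPClassifier

def features(frame_bytes):
    # log-magnitude spectrum of one 16-bit mono frame as a fixed-length vector
    samples = np.frombuffer(frame_bytes, dtype=np.int16) / 32768.0
    return np.log1p(np.abs(np.fft.rfft(samples)))

def train(frames, labels):  # frames: equal-length byte strings; labels: 1 = speech, 0 = noise
    X = np.stack([features(f) for f in frames])
    clf = MLPClassifier(hidden_layer_sizes=(64,), max_iter=500)
    clf.fit(X, np.asarray(labels))
    return clf

# clf.predict(features(data).reshape(1, -1)) could then stand in for the RMS test
```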