5

I'm currently working on a radio astronomy project where I need to monitor the amplitude of an audio signal over time.

I've used the simplified Python code suggested by user1405612 here Detect tap with pyaudio from live mic which takes the mic input and works out the RMS amplitude and I've added a part to simply log the value to a CSV file. This is working very well, and thanks must got ouser1405612 for it!

However is there a way I can implement a simple frequency filter to this code. For example I am interested in the RMS amplitude of frequency 19.580khz (in reality I would want to look at the range of say 19.4hkz to 19.6hkz)?

Is there a way to do this with PyAudio using the code in the link above by looking at the raw stream data for example, or any other way? I don't want anything complex like graphs, spectrum analysis etc, just a simple frequency filter. Unfortunately a band pass filter before the mic input is not possible, so it needs to be done on the computer.

Thanks in advance!

Update - 31/12/14 - her is my current code:

# source https://stackoverflow.com/questions/4160175/detect-tap-with-pyaudio-from-live-mic

import pyaudio
import struct
import math
import datetime


FORMAT = pyaudio.paInt16 
SHORT_NORMALIZE = (1.0/32768.0)
CHANNELS = 1
#RATE = 44100 
RATE = 48000 
INPUT_BLOCK_TIME = 1
INPUT_FRAMES_PER_BLOCK = int(RATE*INPUT_BLOCK_TIME)
filename = 'Data.CSV'

def get_rms(block):

    count = len(block)/2
    format = "%dh"%(count)
    shorts = struct.unpack( format, block )

    # iterate over the block.
    sum_squares = 0.0
    for sample in shorts:
    # sample is a signed short in +/- 32768. 
    # normalize it to 1.0
        n = sample * SHORT_NORMALIZE
        sum_squares += n*n

    return math.sqrt( sum_squares / count )

pa = pyaudio.PyAudio()                                 

stream = pa.open(format = FORMAT,                      
         channels = CHANNELS,                          
         rate = RATE,                                  
         input = True,                                 
         frames_per_buffer = INPUT_FRAMES_PER_BLOCK)   

errorcount = 0                                                  

for i in range(1000):
    try:                                                    
        block = stream.read(INPUT_FRAMES_PER_BLOCK)         
    except IOError, e:                                      
        errorcount += 1                                     
        print( "(%d) Error recording: %s"%(errorcount,e) )  
        noisycount = 1                                      

    amplitude = get_rms(block)
    print amplitude

    #writeCSV
    i = datetime.datetime.now()
    f = open(filename,"a")
    f.write("{},{}\n".format(i,amplitude))
    f.close()
Community
  • 1
  • 1
JamesH
  • 85
  • 2
  • 8

1 Answers1

4

SciPy has all the capabilities you would need to digitally bandpass the signal.

Design a bandpass filter

For this example I'm going to design a 3rd-order butterworth bandpass filter using scipy.signal.butter:

def design_filter(lowcut, highcut, fs, order=3):
    nyq = 0.5*fs
    low = lowcut/nyq
    high = highcut/nyq
    b,a = butter(order, [low,high], btype='band')
    return b,a

Run the filter

The return of the function is a set of filter coefficients that can be used by the spicy.signal.lfilter function. Most of the examples you'll find are operating on the data in batch so they only invoke the function once. Since you're dealing with a realtime stream, yours is going to be a little trickier. The function takes the previous filter states as a parameter and returns the new state. So you'll need to store the returned states so you can pass it the next time around. This is roughly how it would work into your existing code. You'll need to refactor the data normalization out of the get_rms function which isn't a bad idea anyway:

def normalize(block):
    count = len(block)/2
    format = "%dh"%(count)
    shorts = struct.unpack( format, block )
    doubles = [x * SHORT_NORMALIZE for x in shorts]
    return doubles


def get_rms(samples):
    sum_squares = 0.0
    for sample in doubles:
        sum_squares += n*n
    return math.sqrt( sum_squares / count )


pa = pyaudio.PyAudio()                                 
stream = pa.open(format = FORMAT,                      
         channels = CHANNELS,                          
         rate = RATE,                                  
         input = True,                                 
         frames_per_buffer = INPUT_FRAMES_PER_BLOCK)   

errorcount = 0                                                  

# design the filter
b,a = design_filter(19400, 19600, 48000, 3)
# compute the initial conditions.
zi = lfilter_zi(b, a)

for i in range(1000):
    try:                                                    
        block = stream.read(INPUT_FRAMES_PER_BLOCK)         
    except IOError as e:                                      
        errorcount += 1                                     
        print( "(%d) Error recording: %s"%(errorcount,e) )  
        noisycount = 1          

    samples = normalize(block)                            

    bandpass_samples,zi = lfilter(b, a, samples, zi)

    amplitude = get_rms(samples)
    bandpass_ampl = get_rms(bandpass_samples)
    print(amplitude)
    print(bandpass_ampl)

Sorry, I am unable to run this code to test. There is a possibility though that samples will need to be converted to a np.array.

jaket
  • 9,140
  • 2
  • 25
  • 44
  • Many thanks &jacket. I have taken a look at scipy and think it will do the job. However i'm having trouble implementing it into my code. – JamesH Dec 31 '14 at 15:12
  • I have now included my code in my original question. How would I go about it in my case as most of the other examples I can find on the internet are not using it in conjunction with pyaudio? I have tried using your guide but alas I've not had any luck. Also I presume scypi in scypi.signal.butter is a type and should be scipy? Many Thanks! – JamesH Dec 31 '14 at 15:25
  • In this code, I get a syntax error at `len(block)/2a` . Should it be 2 instead of 2a, or should it be `2*a` (`a` added to the argument list)? – Daniel Marschall Jun 09 '19 at 20:17
  • @DanielMarschall. Edited. I meant 2. In c++ it would be sizeof(short) but I'm not sure how to do the same in python. – jaket Jun 10 '19 at 06:31