4

Problem statement

I have a longish signal (454912 samples) and I'd like to compute an estimate of the amount of 50Hz in it. Speed is more important here than precision. The amount of 50Hz present is expected to fluctuate over time. The value needs to be representative for the entire recording, for instance the mean.

Context

The signal is recorded from an EEG electrode. When EEG electrodes have bad contact with the scalp, there will be a large amount of 50Hz powerline noise in the signal. I would like to discard all data from EEG electrodes that have excessively more 50Hz noise than other electrodes.

Solutions tried

Solving the problem is not that hard. Anything from FFT to Welch's method can be employed to estimate the power spectrum:

import numpy as np
from scipy.signal import welch

# generate an example signal
sfreq = 128.
nsamples = 454912
time = np.arange(nsamples) / sfreq
x = np.sin(2 * np.pi * 50 * time) + np.random.randn(nsamples)

# apply Welch' method to the problem
fs, ps = welch(x, sfreq)
print 'Amount of 50Hz:', ps[np.searchsorted(fs, 50)]

However, computing the power at all frequencies seems unnecessary here and I have the feeling that there is a more efficient solution. Something along the lines of computing a single DFFT step? Convolution with some sinoid wavelet?

Marijn van Vliet
  • 5,239
  • 2
  • 33
  • 45
  • seems this is a duplicate: http://stackoverflow.com/questions/13499852/scipy-fourier-transform-of-a-few-selected-frequencies – Marijn van Vliet Feb 06 '15 at 12:17
  • 1
    FWIW, if you plan to export your device to areas that use 60Hz you'll need to detect that as well. Perhaps centering your filter on 55 Hz (average of 50 and 60) with enough bandwidth will be sufficient. – Dithermaster Feb 06 '15 at 16:36

2 Answers2

6

Welch's method just computes the periodogram for multiple overlapping segments of the signal, then takes the average across segments. This effectively trades resolution for noise reduction in the frequency domain.

However, performing lots of separate FFTs for each small segment will be more expensive than computing fewer FFTs for larger segments. Depending on your needs, you may get away with using Welch's method but splitting your signal into larger segments, and/or having less overlap between them (both of which will give you less variance reduction in the PSD).

from matplotlib import pyplot as plt

# default parameters
fs1, ps1 = welch(x, sfreq, nperseg=256, noverlap=128)

# 8x the segment size, keeping the proportional overlap the same
fs2, ps2 = welch(x, sfreq, nperseg=2048, noverlap=1024)

# no overlap between the segments
fs3, ps3 = welch(x, sfreq, nperseg=2048, noverlap=0)

fig, ax1 = plt.subplots(1, 1)
ax1.hold(True)
ax1.loglog(fs1, ps1, label='Welch, defaults')
ax1.loglog(fs2, ps2, label='length=2048, overlap=1024')
ax1.loglog(fs3, ps3, label='length=2048, overlap=0')
ax1.legend(loc=2, fancybox=True)

enter image description here

Increasing the segment size and reducing the amount of overlap can substantially improve performance:

In [1]: %timeit welch(x, sfreq)
1 loops, best of 3: 262 ms per loop

In [2]: %timeit welch(x, sfreq, nperseg=2048, noverlap=1024)
10 loops, best of 3: 46.4 ms per loop

In [3]: %timeit welch(x, sfreq, nperseg=2048, noverlap=0)
10 loops, best of 3: 23.2 ms per loop

Note that it's a good idea to use a power of 2 for the window size, since it's faster to take the FFT of a signal whose length is a power of 2.


Update

Another simple thing you might consider trying is just bandpass filtering your signal with a notch filter centered on 50Hz. The envelope of the filtered signal will give you a measure of how much 50Hz power your signal contains over time.

from scipy.signal import filter_design, filtfilt

# a signal whose power at 50Hz varies over time
sfreq = 128.
nsamples = 454912
time = np.arange(nsamples) / sfreq
sinusoid = np.sin(2 * np.pi * 50 * time)
pow50hz = np.zeros(nsamples)
pow50hz[nsamples / 4: 3 * nsamples / 4] = 1
x = pow50hz * sinusoid + np.random.randn(nsamples)

# Chebyshev notch filter centered on 50Hz
nyquist = sfreq / 2.
b, a = filter_design.iirfilter(3, (49. / nyquist, 51. / nyquist), rs=10,
                               ftype='cheby2')

# filter the signal
xfilt = filtfilt(b, a, x)

fig, ax2 = plt.subplots(1, 1)
ax2.hold(True)
ax2.plot(time[::10], x[::10], label='Raw signal')
ax2.plot(time[::10], xfilt[::10], label='50Hz bandpass-filtered')
ax2.set_xlim(time[0], time[-1])
ax2.set_xlabel('Time')
ax2.legend(fancybox=True)

enter image description here


Update 2

Having seen @hotpaw2's answer I decided to try implementing the Goertzel algorithm, just for fun. Unfortunately it's a recursive algorithm (and therefore can't be vectorized over time), so I decided to write myself a Cython version:

# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True

from libc.math cimport cos, M_PI

cpdef double goertzel(double[:] x, double ft, double fs=1.):
    """
    The Goertzel algorithm is an efficient method for evaluating single terms
    in the Discrete Fourier Transform (DFT) of a signal. It is particularly
    useful for measuring the power of individual tones.

    Arguments
    ----------
        x   double array [nt,]; the signal to be decomposed
        ft  double scalar; the target frequency at which to evaluate the DFT
        fs  double scalar; the sample rate of x (same units as ft, default=1)

    Returns
    ----------
        p   double scalar; the DFT coefficient corresponding to ft

    See: <http://en.wikipedia.org/wiki/Goertzel_algorithm>
    """

    cdef:
        double s
        double s_prev = 0
        double s_prev2 = 0
        double coeff = 2 * cos(2 * M_PI * (ft / fs))
        Py_ssize_t N = x.shape[0]
        Py_ssize_t ii

    for ii in range(N):
        s = x[ii] + (coeff * s_prev) - s_prev2
        s_prev2 = s_prev
        s_prev = s

    return s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2

Here's what it does:

freqs = np.linspace(49, 51, 1000)
pows = np.array([goertzel(x, ff, sfreq) for ff in freqs])

fig, ax = plt.subplots(1, 1)
ax.plot(freqs, pows, label='DFT coefficients')
ax.set_xlabel('Frequency (Hz)')
ax.legend(loc=1)

enter image description here

It is pretty damn fast:

In [1]: %timeit goertzel(x, 50, sfreq)
1000 loops, best of 3: 1.98 ms per loop

Obviously this approach only makes sense if you're just interested in a single frequency rather than a range of frequencies.

ali_m
  • 71,714
  • 23
  • 223
  • 298
  • I like your mention of the IIR filter too, which are too often ignored. Could you include the IIR in your `timeit`s? – tom10 Feb 03 '15 at 16:41
  • timeit for filt: 72.7 ms, timeit for welch(noverlap=0, nperseg=2048): 13.2 ms. Welch is the clear winner on my machine – Marijn van Vliet Feb 03 '15 at 16:44
  • @Rodin I'm not that surprised - they are called *Fast* Fourier Transforms for a reason! Having said that, you can definitely improve the speed of the bandpass filtering, e.g. by reducing the order of the Chebyshev filter, or by designing a finite impulse-response filter instead (e.g. [`scipy.signal.remez`](http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.signal.remez.html)). – ali_m Feb 03 '15 at 16:56
  • @ali_m: thanks heaps for the implementation of the Goertzel filter! – Marijn van Vliet Feb 13 '15 at 08:04
4

For a single sinusoidal frequency, you can use the Goertzel algorithm or a Goertzel filter, which is a computationally efficient way of computing the magnitude of a single bin of a DFT or FFT result.

You can either run this filter over the entire waveform, or use it in conjunction with Welch's method, by summing the magnitude output of a series of Goertzel filters, with a filter length chosen so that the filter bandwidth isn't too narrow (to cover possible slight frequency variations of 50 Hz versus your sample rate).

Often a Goertzel filter is used in conjunction with a power estimator to determine if the SNR for the selected frequency is valid.

hotpaw2
  • 70,107
  • 14
  • 90
  • 153