5

I'm trying to write a function that would detect all rising edges - indexes in a vector where value exceeds certain threshold. Something similar is described here: Python rising/falling edge oscilloscope-like trigger, but I want to add hysteresis, so that trigger won't fire unless the value goes below another limit.

I came up the following code:

import numpy as np

arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)

above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband

Now in the combined array there is 1 where the original value was above upper limit, -1 where the value was below lower limit and 0 where the value was in between:

>>> combined
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,  0,  0,
        1,  1,  1,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,
        0,  0,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  0,  0,  1,  1,  1,  0, -1, -1])

My idea was to use some clever function that would process this vector sequentially and replace all sequences of zeros with whatever non-zero value was preceding them. Then the problem would boil down to simply finding where the value changes from -1 to 1.

I though that greater operation would fulfill this purpose if used correctly: -1 encoded as True and 1 as False:

  • (True ("-1") > -1) -> True ("-1")
  • (True ("-1") > 1) -> False ("1")
  • (True ("-1") > 0) -> True ("-1")
  • (False ("1") > -1) -> True ("-1")
  • (False ("1") > 1) -> False ("1")
  • (False ("1") > 0) -> False ("1")

But the results are not what I expect:

>>> 1 - 2 * np.greater.accumulate(combined)
array([-1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1])

It seems that the greater function doesn't correctly compare booleans with numeric values in this scenario, even though it works fine when used on scalars or pair-wise:

>>> np.greater(False, -1)
True
>>> np.greater.outer(False, combined)
array([False, False,  True,  True,  True,  True,  True,  True,  True,
        True,  True, False, False, False, False, False, False, False,
       False, False, False,  True,  True,  True,  True,  True,  True,
        True,  True,  True, False, False, False, False, False, False,
       False, False, False,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True, False, False, False, False, False,
       False, False, False, False,  True,  True])

Is this expected behavior? Am I doing something wrong here, is there any way around this?

Alternatively, maybe you could suggest another approach to this problem?

Thank you.

monkeyman79
  • 602
  • 4
  • 13

4 Answers4

2

I am not sure what the issue with np.greater.accumulate is (it does not seem to behave as advertised indeed), but the following should work:

import numpy as np
import numpy as np

arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)

above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband

mask = combined != 0
idx = np.where(mask,np.arange(len(mask)),0)
idx = np.maximum.accumulate(idx)
result = combined[idx]

print(f"combined:\n {combined}\n")
print(f"result:\n {result}")

It gives:

combined:
 [ 1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1  0  1  1  1  0  0  1  1  1  0 -1 -1 -1
 -1 -1 -1 -1 -1 -1  0  1  1  1  0  0  1  1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1  1  1  1  0  0  1  1  1  0 -1 -1]

result:
 [ 1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1  1  1  1  1  1  1  1  1  1 -1 -1 -1
 -1 -1 -1 -1 -1 -1 -1  1  1  1  1  1  1  1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1  1  1  1  1  1  1  1  1  1 -1 -1]

Then the indices where values jumps from -1 to 1 can be obtained as follows:

np.nonzero(result[1:] > result[:-1])[0] + 1

It gives:

array([12, 31, 49])
bb1
  • 7,174
  • 2
  • 8
  • 23
2

I've been developing a package called ufunclab that includes the function hysteresis_relay that does what you want. I haven't put it on PyPI, so you would have to grab the source and build it yourself to use it.

In [122]: import numpy as np

In [123]: from ufunclab import hysteresis_relay

In [124]: arr = np.linspace(-10, 10, 60)

In [125]: sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)

In [126]: hysteresis_relay(sample_values, 0.0, 0.6, -1, 1, 1).astype(int)
Out[126]: 
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,  1,  1,
        1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,
        1,  1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  1,  1,  1,  1,  1,  1, -1, -1])

Another option is to use Pandas (but I suspect @bb1's answer will be more efficient than this, and @bb1's answer avoids depending on another library).

  1. Convert combined to a Pandas Series.
  2. Replace 0 with pd.NA in the Series.
  3. Use the method fillna() with method='ffill' to "forward fill" the NA values.
  4. Convert the Series back to a NumPy array with the to_numpy() method.
In [107]: combined
Out[107]: 
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,  0,  0,
        1,  1,  1,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,
        0,  0,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  0,  0,  1,  1,  1,  0, -1, -1])

In [108]: import pandas as pd

In [109]: pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
Out[109]: 
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,  1,  1,
        1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,
        1,  1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  1,  1,  1,  1,  1,  1, -1, -1])
Warren Weckesser
  • 110,654
  • 19
  • 194
  • 214
  • This is very useful module indeed, though I had problems compiling it on VS2019. It chokes on complex data type in the vnorm function. Compiles fine if I checkout version before complex in vnorm. – monkeyman79 Aug 21 '21 at 09:38
  • Microsoft's Visual Studio is primarily a C++ compiler. Their support of the C standards (C99, in this case) is not complete. – Warren Weckesser Aug 21 '21 at 12:46
  • OK, so is there a recommended way to build it on Windows? I just executed `pip install .` and apparently it detected my VS2019 installation and used that. – monkeyman79 Aug 21 '21 at 19:11
0

Here is another simple solution:

def gen(arr, start=0):
    y = start
    for x in arr:
        if x != 0:
            y = x
        yield y

g = gen(combined)
# set count for performance
np.fromiter(g, dtype=int, count=combined.size)

>>> array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,  1,  1,
    1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,
    1,  1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
    1,  1,  1,  1,  1,  1,  1, -1, -1])

you can write a similar generator or loop to detect the jump directly:

p = 0
for i, x in enumerate(combined):
    if x - p == 2:
        print(i)
        break
    if x != 0:
        p = x

combined[i-5:i+1]
>>> 12
>>> array([-1, -1, -1, -1,  0,  1])
examiner
  • 156
  • 6
0

Thank you all for your answers.

For the record, below are timing results for the proposed solutions:

import numpy as np
import pandas as pd
import ufunclab

arr = np.linspace(-10, 10, 600)
values = np.sin(arr)+0.6*np.sin(arr*3)


def trigger_using_greater(values):
    # This doesn't give correct results
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    return 1 - 2 * np.greater.accumulate(combined)


def trigger_using_masked_indexes(values):
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    mask = combined != 0
    idx = np.where(mask, np.arange(len(mask)), 0)
    idx = np.maximum.accumulate(idx)
    return combined[idx]


def trigger_using_hysteresis_relay(values):
    result = ufunclab.hysteresis_relay(values, 0.0, 0.6, -1, 1, 1).astype(int)
    return result


def trigger_using_pandas(values):
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    result = pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
    return result

def gen(arr, start=0):
    y = start
    for x in arr:
        if x != 0:
            y = x
        yield y

def trigger_using_generator(values):
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    g = gen(combined)
    return np.fromiter(g, dtype=int, count=combined.size)
In [8]: %timeit trigger_using_greater(values)
21.9 µs ± 1.3 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [9]: %timeit trigger_using_masked_indexes(values)
26.8 µs ± 563 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [10]: %timeit trigger_using_hysteresis_relay(values)
7.31 µs ± 759 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [11]: %timeit trigger_using_pandas(values)
755 µs ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [12]: %timeit trigger_using_generator(values)
165 µs ± 3.37 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

The hysteresis_relay is the clear winner here, but at the cost of compiling ufunclab package. Very useful package by the way. Warren, consider publishing it to PyPI. Ideally I'd love to see at least some of those functions integrated into SciPy.

The masked indexes solution is almost as fast as my original (not working) solution and doesn't require and external library.

The Pandas solution is astonishingly slow, even slower that standard Python generator.

monkeyman79
  • 602
  • 4
  • 13