0

What is an efficient way that I can trigger some function once the length of a list changes by a certain amount?

I have a nested list to which I add data 100 times per second, and I want to trigger a function once the length of the list has increased by some value. I tried doing this with an if statement inside a while loop (see my_loop() below). This works, but this seemingly simple operation takes up 100% of one of my CPU cores. It seems to me that constantly querying the size of the list is the limiting factor of the script (adding data to the list in the while loop is not resource-intensive).

Here is what I have tried so far:

from threading import Event, Thread
import time

def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        if len(list_) - b0 > buffer_len:
            b0 = len(list_)
            print("Len of list_ is {}".format(b0))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

Example output:

Len of list_ is 202
Len of list_ is 403
Len of list_ is 604
Len of list_ is 805
Len of list_ is 1006
jkr
  • Python multithreading doesn't switch active threads until I/O takes place or `time.sleep()` is called by the one that's currently running. This means your code spends most of its time executing one thread or the other. – martineau Dec 02 '16 at 22:51
  • Thank you. This worked even when I used a sleep duration of 1 ms (`time.sleep(0.001)`). – jkr Dec 02 '16 at 23:23

2 Answers

1

It's not very safe to access a list from two threads, so I'll suggest a safer way to communicate between threads. In CPython, your code won't corrupt the contents of the list, but you might not get exactly 200 items each time you process a batch. If you started removing items from the list in my_loop(), you could run into trouble. If you use another Python implementation without a GIL, you could have more trouble.

Before that, though, here's the smallest change I can think of to solve the problem you asked about: CPU usage. I just added a sleep to my_loop() and cleaned up the buffer calculations so it now reports a mostly steady 201, 401, 601. Occasionally, I see a 1002.

from threading import Event, Thread
import time

def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        time.sleep(0.01)
        if len(list_) - b0 >= buffer_len:
            b0 += buffer_len
            print("Len of list_ is {}".format(len(list_)))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

time.sleep(30)
stop_all()

Now, to do this safely, I suggest you use a queue. That will allow many threads to read or write to the queue, and it will handle the communication. If a thread tries to read from an empty queue, it just blocks until some other thread adds an item to the queue.

I wasn't sure exactly what you wanted to do with the items, so I just put them in a list and left them there. However, now that the list is only being accessed by one thread, it's safe to clear it out after each batch is processed (200 items with buffer_len = 2).

Because my_loop() now blocks, it won't necessarily notice when you set the kill signal. Instead, I used a sentinel value of None in the request queue to tell it to shut down. If that doesn't work for you, you can use a timeout when getting an item from the queue, check the kill signal, and then try getting an item again.

from threading import Event, Thread
from queue import Queue
import time

def add_indefinitely(request_queue, kill_signal):
    """
    request_queue : queue.Queue
        Queue to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        request_queue.put([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.
    request_queue.put(None)  # Signal to shut down


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    received_items = []  # replaces list_
    buffer_len *= 100
    while True:
        item = request_queue.get()
        if item is None:
            break
        received_items.append(item)
        if len(received_items) % buffer_len == 0:
            print("Len of received_items is {}".format(len(received_items)))


request_queue = Queue()
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(request_queue, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

time.sleep(30)
stop_all()
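
The timeout alternative mentioned above could look roughly like the sketch below. It is only an illustration, not tested against the original setup: consume(), on_batch, batch_size and run_demo() are made-up names, and the 0.1 s timeout is an arbitrary choice. The consumer blocks on the queue for a short time, re-checks the kill signal, and hands over a batch whenever enough items have arrived.

```python
from queue import Empty, Queue
from threading import Event, Thread


def consume(request_queue, kill_signal, batch_size, on_batch):
    """Collect items and call on_batch(batch) once per batch_size items.

    Hypothetical helper: the names and the 0.1 s timeout are assumptions.
    """
    batch = []
    while not kill_signal.is_set():
        try:
            # Block for at most 0.1 s so the kill signal is checked regularly.
            item = request_queue.get(timeout=0.1)
        except Empty:
            continue  # nothing arrived; loop back and re-check kill_signal
        batch.append(item)
        if len(batch) >= batch_size:
            on_batch(batch)
            batch = []  # start a fresh batch; the old list belongs to on_batch
        request_queue.task_done()  # lets Queue.join() know the item was handled


def run_demo():
    """Feed 12 items through a consumer that batches them in groups of 5."""
    request_queue = Queue()
    kill_signal = Event()
    batch_lengths = []
    consumer = Thread(
        target=consume,
        args=(request_queue, kill_signal, 5,
              lambda batch: batch_lengths.append(len(batch))),
    )
    consumer.start()
    for _ in range(12):
        request_queue.put([1] * 32)
    request_queue.join()  # wait until all 12 items have been handled
    kill_signal.set()
    consumer.join()
    return batch_lengths
```

With this variant no sentinel is needed, at the cost of the consumer waking up a few times per second while the queue is idle. Items left in an incomplete batch when the kill signal arrives are simply dropped in this sketch.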
Don Kirkby
1

It would be a lot more efficient to not have a separate thread looping and checking the length of the list, but instead just have one that blocks and waits for a threading.Event to be set. Here's an example illustrating how to do something like that with an extra threading.Lock object added to control concurrent access to global variables or resources (like printing to stdout):

from threading import Event, Lock, Thread
import time

THRESHOLD = 100  # minimum number of items in list_ before event triggered
list_ = []
list_lock = Lock()  # to control access to global list_
length_signal = Event()
print_lock = Lock()  # to control concurrent printing

def add_indefinitely():
    while True:
        with list_lock:
            list_.append([1] * 32)
            if len(list_) >= THRESHOLD:
                with print_lock:
                    print('setting length_signal')
                length_signal.set()
        time.sleep(.01)  # give other threads a chance to run

def length_reached():
    """
    Waits until list_ has reached a certain length, and then prints a message
    """
    with print_lock:
        print('waiting for list_ to reach threshold length')
    length_signal.wait()  # blocks until event is set
    with print_lock:
        with list_lock:
            print('list_ now contains {} items'.format(len(list_)))

# first start thread that will wait for the length to be reached
length_reached_thread = Thread(target=length_reached)
length_reached_thread.start()

data_thread = Thread(target=add_indefinitely)
data_thread.daemon = True
data_thread.start()

length_reached_thread.join()
with print_lock:
    print('finished')
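
The example above fires once. If the function should run every time the list grows by a fixed amount, one possible variation (a sketch only, swapping in threading.Condition rather than the Event used above) is to let the waiting thread block on a predicate. GrowthNotifier, wait_for_growth() and run_demo() are hypothetical names introduced for illustration.

```python
from threading import Condition, Thread


class GrowthNotifier:
    """Hypothetical helper: a list whose readers can wait for it to grow."""

    def __init__(self):
        self._items = []
        self._cond = Condition()

    def append(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify_all()  # wake waiters so they re-check the length

    def wait_for_growth(self, last_len, step, timeout=None):
        """Block until the list holds at least last_len + step items.

        Returns the current length, or None if the timeout expired first.
        """
        with self._cond:
            reached = self._cond.wait_for(
                lambda: len(self._items) >= last_len + step, timeout=timeout
            )
            return len(self._items) if reached else None


def run_demo():
    """Append 600 items and record each 200-item threshold the watcher sees."""
    data = GrowthNotifier()
    seen = []

    def watcher():
        last = 0
        for _ in range(3):
            length = data.wait_for_growth(last, 200, timeout=5)
            if length is None:
                break  # timed out; stop waiting
            last += 200
            seen.append(last)

    thread = Thread(target=watcher)
    thread.start()
    for _ in range(600):
        data.append([1] * 32)
    thread.join()
    return seen
```

The watcher sleeps inside wait_for() instead of polling. It is woken on every append here, which is cheap at 100 Hz; notifying only when a threshold is crossed would reduce wake-ups further.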
martineau