18

I have a thread which is updating a list called l. Am I right in saying that it is thread-safe to do the following from another thread?

filter(lambda x: x[0] == "in", l)

If its not thread safe, is this then the correct approach:

import threading
import time
import Queue

class Logger(threading.Thread):
    def __init__(self, log):
        super(Logger, self).__init__()
        self.log = log
        self.data = []
        self.finished = False
        self.data_lock = threading.Lock()

    def run(self):
        while not self.finished:
            try:
                with self.data_lock: 
                    self.data.append(self.log.get(block=True, timeout=0.1))
            except Queue.Empty:
                pass

    def get_data(self, cond):
        with self.data_lock: 
            d = filter(cond, self.data)      
        return d 

    def stop(self):
        self.finished = True
        self.join()  
        print("Logger stopped")

where the get_data(self, cond) method is used to retrieve a small subset of the data in the self.data in a thread safe manner.

Baz
  • 12,713
  • 38
  • 145
  • 268

2 Answers2

7

First, to answer your question in the title: filter is just a function. Hence, its thread-safety will rely on the data-structure you use it with.

As pointed out in the comments already, list operations themselves are thread-safe in CPython and protected by the GIL, but that is arguably only an implementation detail of CPython that you shouldn't really rely on. Even if you could rely on it, thread safety of some of their operations probably does not mean the kind of thread safety you mean:

The problem is that iterating over a sequence with filter is in general not an atomic operation. The sequence could be changed during iteration. Depending on the data-structure underlying your iterator this might cause more or less weird effects. One way to overcome this problem is by iterating over a copy of the sequence that is created with one atomic action. Easiest way to do this for standard sequences like tuple, list, string is with the slice operator like this:

filter(lambda x: x[0] == "in", l[:])

Apart from this not necessarily being thread-safe for other data-types, there's one problem with this though: it's only a shallow copy. As your list's elements seem to be list-like as well, another thread could in parallel do del l[1000][:] to empty one of the inner lists (which are pointed to in your shallow copy as well). This would make your filter expression fail with an IndexError.

All that said, it's not a shame to use a lock to protect access to your list and I'd definitely recommend it. Depending on how your data changes and how you work with the returned data, it might even be wise to deep-copy the elements while holding the lock and to return those copies. That way you can guarantee that once returned the filter condition won't suddenly change for the returned elements.

Wrt. your Logger code: I'm not 100 % sure how you plan to use this and if it's critical for you to run several threads on one queue and join them. What looks weird to me is that you never use Queue.task_done() (assuming that its self.log is a Queue). Also your polling of the queue is potentially wasteful. If you don't need the join of the thread, I'd suggest to at least turn the lock acquisition around:

class Logger(threading.Thread):
    def __init__(self, log):
        super(Logger, self).__init__()
        self.daemon = True
        self.log = log
        self.data = []
        self.data_lock = threading.Lock()

    def run(self):
        while True:
            l = self.log.get()  # thread will sleep here indefinitely
            with self.data_lock: 
                self.data.append(l)
            self.log.task_done()

    def get_data(self, cond):
        with self.data_lock: 
            d = filter(cond, self.data)
            # maybe deepcopy d here
        return d

Externally you could still do log.join() to make sure that all of the elements of the log queue are processed.

Jörn Hees
  • 3,338
  • 22
  • 44
  • 1
    GIL protection is not something can be replied on, and hopefully this thing should be eliminated in the future. probably is better to reply on spec? – Jason Hu Jun 08 '15 at 13:31
  • i'm not entirely sure what you mean with `reply on spec`, but i edited the answer to recommend not relying on the GIL – Jörn Hees Jun 08 '15 at 14:25
  • 1
    @JörnHees The existance of the GIL is a CPython implementation detail, so you shouldn’t rely on it (because you shouldn’t rely on implementation details). What HuStmpHrrr means is that you should instead only rely on features that are given by the specification (the language spec or the language library). – poke Jun 08 '15 at 14:28
  • @JörnHees oh... forgive my spelling, i meant `rely on`. sorry about that. – Jason Hu Jun 08 '15 at 14:29
  • agreed, i'll point that out more explicitly in the answer, thanks for the comments – Jörn Hees Jun 08 '15 at 14:30
4

If one thread writes to a list and another thread reads that list, the two must be synchronized. It doesn't matter for that aspect whether the reader uses filter(), an index or iteration or whether the writer uses append() or any other method.

In your code, you achieve the necessary synchronization using a threading.Lock. Since you only access the list within the context of with self.data_lock, the accesses are mutually exclusive.

In summary, your code is formally correct concerning the list handling between threads. But:

  • You do access self.finished without the lock, which is problematic. Assigning to that member will change self, i.e. the mapping of the object to the according members, so this should be synced. Effectively, this won't hurt, because True and False are global constants, at the worst you will have a short delay between setting the state in one thread and seeing the state in the other. It remains bad, because it is habit-forming.
  • As a rule, when you use a lock, always document which objects this lock protects. Also, document which object is accessed by which thread. The fact that self.finished is shared and requires synchronization would have been obvious. Also, making a visual distinction between public functions and data and private ones (beginning with an _underscore, see PEP 8) helps keeping track of this. It also helps other readers.
  • A similar issue is your baseclass. In general, inheriting from threading.Thread is a bad idea. Rather, include an instance of the thread class and give it a function like self._main_loop to run on. The reason is that you say that your Logger is a Thread and that all of its baseclass' public members are also public members of your class, which is probably a much wider interface than what you intended.
  • You should never block with a lock held. In your code, you block in self.log.get(block=True, timeout=0.1) with the lock on the mutex. In that time, even if nothing actually happens, no other thread will be able to call and complete a call to get_data(). There is actually just a tiny window between unlocking the mutex and locking it again where a caller of get_data() does not have to wait, which is very bad for performance. I could even imagine that your question is motivated by the really bad performance this causes. Instead, call log.get(..) without lock, it shouldn't need one. Then, with the lock held, append data to self.data and check self.finished.
Ulrich Eckhardt
  • 16,572
  • 3
  • 28
  • 55
  • Thanks for your reply. Would you mind elaborating on your third point above as I don't understand it. Thanks again! – Baz Jun 09 '15 at 06:17
  • 1
    You can just make a thread run some function with `threading.Thread(target=func_name)` like in the [Queue example](https://docs.python.org/2/library/queue.html#Queue.Queue.join). So you could make `Logger` a sub-class of `object` and then in its `__init__` create a `self.thread = Thread(target=self._main_loop)`. You then define a method `_main_loop` inside your `Logger` class. That way you won't have a Logger with all the public methods and properties of a `Thread` that you don't use. It's a bit cleaner, but a matter of choice i'd say. – Jörn Hees Jun 09 '15 at 15:23
  • When you think of a `Logger`, do you think of it as a `Thread` or is its main point the `get_data()` function? I'd rather say that it's the latter, and the fact that it uses a thread to poll a queue is irrelevant. If it had data pushed into it, it wouldn't use a thread (at least not directly) but the `get_data()` interface would remain. In that light, the thread is just an implementation detail that shouldn't be visible in the public interface. – Ulrich Eckhardt Jun 09 '15 at 19:09