16

I'm new to multi-threading in Python and am currently writing a script that appends to a CSV file. If I were to have multiple threads submitted to a concurrent.futures.ThreadPoolExecutor that append lines to a CSV file, what could I do to guarantee thread safety if appending were the only file-related operation being done by these threads?

Simplified version of my code:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for count, ad_id in enumerate(advertisers):

        downloadFutures.append(executor.submit(downloadThread, arguments.....))
        time.sleep(random.randint(1, 3))

And my thread function being:

def downloadThread(arguments......):

    # Some code.....

    writer.writerow(re.split(',', line.decode()))

Should I set up a separate single-threaded executor to handle writing, or is it worth worrying about if I am just appending?

EDIT: I should elaborate that the timing of the write operations can vary greatly, with minutes between appends to the file. I am just concerned that this scenario did not occur while testing my script, and I would prefer to be covered in case it does.

GreenGodot
  • You might be able to make a threadsafe `csvwriter` using one of the techniques mentioned in [this answer](http://stackoverflow.com/a/13618333/355230) to a related question. – martineau Oct 13 '15 at 16:00

3 Answers

18

I am not sure whether csv.writer is thread-safe. The documentation doesn't specify, so to be safe, if multiple threads use the same writer object, you should protect the usage with a threading.Lock:

# create the lock
import threading
csv_writer_lock = threading.Lock()

def downloadThread(arguments......):
    # pass csv_writer_lock somehow
    # Note: use csv_writer_lock on *any* access
    # Some code.....
    with csv_writer_lock:
        writer.writerow(re.split(',', line.decode()))

That being said, it may indeed be more elegant for the downloadThread to submit write tasks to an executor, instead of explicitly using locks like this.
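For example, a minimal sketch of that idea: a dedicated single-worker executor owns the writer, and the download threads hand rows to it instead of writing themselves (the write_row helper, the output file name, and the single-worker executor are illustrative assumptions, not part of the original code):

import concurrent.futures
import csv
import re

# A dedicated single-worker executor serializes all writes,
# so no explicit lock is needed.
writer_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

csv_file = open('output.csv', 'a', newline='')  # illustrative file name
writer = csv.writer(csv_file)

def write_row(row):
    # Only ever runs on the writer executor's single thread.
    writer.writerow(row)

def downloadThread(arguments......):
    # Some code.....
    # Hand the finished row off instead of writing it here.
    writer_executor.submit(write_row, re.split(',', line.decode()))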

Claudiu
18

Way-late-to-the-party note: You could handle this a different way with no locking by having a single writer consuming from a shared Queue, with rows being pushed to the Queue by the threads doing the processing.

from threading import Thread
from queue import Queue
from concurrent.futures import ThreadPoolExecutor


# CSV writer setup goes here

queue = Queue()


def consume():
    while True:
        if not queue.empty():
            i = queue.get()
            
            # Row comes out of queue; CSV writing goes here
            
            print(i)
            if i == 4999:
                return


consumer = Thread(target=consume, daemon=True)
consumer.start()


def produce(i):
    # Data processing goes here; row goes into queue
    queue.put(i)


with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

consumer.join()
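The hard-coded if i == 4999: check above is only there to end the example. A minimal variation that shuts the consumer down cleanly, reusing the queue, produce, and imports from above and assuming a None sentinel pushed once all producers are done:

def consume():
    while True:
        row = queue.get()      # blocks until a row (or the sentinel) arrives
        if row is None:        # sentinel: all producers have finished
            return
        # Row comes out of queue; CSV writing goes here
        print(row)


consumer = Thread(target=consume)
consumer.start()

with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

# Exiting the with-block waits for every producer, so the sentinel goes in last.
queue.put(None)
consumer.join()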
kungphu
  • I think this is a more elegant solution than the others; it decouples the processes in a better way. Maybe the reply could be elaborated a bit, in particular with respect to the condition for the consumer to terminate. Somehow, the pool must signal to the consumer that it is done with the tasks. The hard-coded check `if i == 4999` may fail, e.g. in the case of an exception. – Martin Hepp Mar 29 '21 at 22:38
  • @MartinHepp Yes, that 4999 check was intended only to show that you need a condition in which the consumer terminates. You're certainly right that it needs exception handling and robust termination checks, but this example is intended solely to illustrate allowing multiple threads/processes to use a single resource without explicit lock handling, so I kept it as lean as possible. – kungphu Mar 30 '21 at 23:35
  • It is actually a very simple and working solution for some monitored quick-and-dirty task. I put a print statement in the consumer for when there is no data, so when the consumer keeps printing "waiting for data" in the else block of `if not queue.empty():`, I know to kill the program, and just to make sure I do not lose data in the CSV I always seek(0) on the file. Thanks man. – PankajKushwaha Jun 14 '22 at 08:32
  • Be careful with this: if you're putting each row of the CSV file into the queue, and the queue is not populated for an extended time (i.e., you're crunching some data), it will be extremely slow, since each `empty()` check needs to acquire the mutex. So instead of checking emptiness on each iteration, you might want to add an appropriate `time.sleep` call. – suayip uzulmez Jun 13 '23 at 13:31
6

Here is some code (Python 2); it also handles the headache-causing Unicode issue:

import csv
import threading


def ensure_bytes(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


class ThreadSafeWriter(object):
    '''
    >>> from StringIO import StringIO
    >>> f = StringIO()
    >>> wtr = ThreadSafeWriter(f)
    >>> wtr.writerow(['a', 'b'])
    >>> f.getvalue() == "a,b\\r\\n"
    True
    '''

    def __init__(self, *args, **kwargs):
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def _encode(self, row):
        return [ensure_bytes(cell) for cell in row]

    def writerow(self, row):
        row = self._encode(row)
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        rows = (self._encode(row) for row in rows)
        with self._lock:
            return self._writer.writerows(rows)


# example (open in binary mode for the csv module under Python 2):
with open('some.csv', 'wb') as f:
    writer = ThreadSafeWriter(f)
    writer.writerow([u'中文', 'bar'])
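For completeness, a quick sketch of the writer being shared by several threads (Python 2 style to match the class above; the file name and sample rows are made up for illustration):

rows = [[u'中文', 'bar'], ['spam', 'eggs'], ['foo', 'baz']]

with open('threads.csv', 'wb') as f:
    writer = ThreadSafeWriter(f)
    # Each thread writes one row; the internal lock keeps rows from interleaving.
    threads = [threading.Thread(target=writer.writerow, args=(row,)) for row in rows]
    for t in threads:
        t.start()
    for t in threads:
        t.join()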

A more detailed solution is here.

ospider