
I have an infinite number of entries being fed through a web interface. On a per-minute basis, I'd like to dump elements that were received in the last hour into a file named appropriately (datetime.now().strftime('%Y_%m_%d_%H_%M')).
Here's my design so far:

Thread-1

Keeps receiving input and adding to a data_dict of structure:
{datetime.now().strftime('%Y_%m_%d_%H_%M'): []}

Thread-2

Sleeps for a minute and writes contents of data_dict[(datetime.now() - timedelta(minutes=1)).strftime('%Y_%m_%d_%H_%M')]
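In code, the design sketch looks roughly like this (`receiver` and `writer_once` are illustrative names; the file-writing part is untested):

```python
from collections import defaultdict
from datetime import datetime, timedelta

data_dict = defaultdict(list)  # {'YYYY_MM_DD_HH_MM': [entries]}

def receiver(entry):
    # Thread-1: bucket each incoming entry under the current minute.
    key = datetime.now().strftime('%Y_%m_%d_%H_%M')
    data_dict[key].append(entry)

def writer_once():
    # Thread-2: dump the previous minute's bucket to a file of the same name.
    key = (datetime.now() - timedelta(minutes=1)).strftime('%Y_%m_%d_%H_%M')
    entries = data_dict.pop(key, [])
    with open(key, 'w') as f:
        for entry in entries:
            f.write(str(entry) + '\n')
```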

Question

  1. Is using dict in this manner thread-safe?
  2. Is this a good design? :)
Lelouch Lamperouge

4 Answers


Consider using an external store for this; redis would be my choice, and since it runs independently of your application, you avoid any threading issues. Plus, redis is fast at this sort of workload.
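For illustration, a minimal sketch of that approach with the redis-py client (the helper names and the pipeline-based flush are my assumptions, not part of the answer; it needs redis-py and a running server):

```python
from datetime import datetime, timedelta

def minute_key(dt):
    # 'YYYY_MM_DD_HH_MM' bucket name, same scheme as the question.
    return dt.strftime('%Y_%m_%d_%H_%M')

def receive(r, entry):
    # Web thread: RPUSH is atomic on the server side, so no app-level locking.
    r.rpush(minute_key(datetime.now()), entry)

def flush_previous_minute(r):
    # Writer: read and delete the finished bucket atomically via a pipeline.
    key = minute_key(datetime.now() - timedelta(minutes=1))
    pipe = r.pipeline()
    pipe.lrange(key, 0, -1)
    pipe.delete(key)
    entries, _ = pipe.execute()
    with open(key, 'wb') as f:
        f.write(b'\n'.join(entries))

# Usage (sketch):
#   import redis
#   r = redis.Redis()
#   receive(r, b'hello')
```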

Burhan Khalid
    It'd be nice if I knew why `redis` is a better choice. I feel external stores are overkill for a simple application like this. But that's just my opinion.. – Lelouch Lamperouge May 22 '14 at 15:03
  • [a] it is _very fast_ [b] its designed for quick lookups [c] it solves your threading problem. If you need to do other lookups on the data, and you need _near real time updates_, consider [`mongodb`](http://mongodb.org). – Burhan Khalid May 22 '14 at 15:11
  • Will do, Thank you. But for the time being, I will introduce a lag as mentioned by others. – Lelouch Lamperouge May 22 '14 at 15:16

I don't like the coupling of the sleep(60) and the timedelta(minutes=1). I think you could get timing inaccuracies which, over time, could lead to data being skipped in the output.

I would instead take advantage of two facts:

  1. It's ok to wait until the minute is up before writing the file
  2. Time moves only forwards, so once the minute is done you know the data is effectively immutable

Bearing this in mind, you know the time after which input for that minute is complete, and you can then write the file with the last 60 minutes of data. Your second thread just sleeps until that condition is true. Then it wakes up, changes the condition to the next minute, processes the data, and goes back to waiting for the condition to trigger again. In essence, you've just written a simple queue, synchronised on minute boundaries.
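A sketch of "sleep until the minute boundary" (function names are mine; recomputing the remaining time each iteration means drift and early wakeups cannot accumulate):

```python
import time
from datetime import datetime, timedelta

def next_minute_boundary(now=None):
    # First instant after the current minute completes.
    now = now or datetime.now()
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)

def wait_for_minute_end():
    # Sleep until the wall clock crosses the boundary, rechecking as we go.
    target = next_minute_boundary()
    while True:
        remaining = (target - datetime.now()).total_seconds()
        if remaining <= 0:
            # The minute that just completed is now safe to write out.
            return target - timedelta(minutes=1)
        time.sleep(min(remaining, 1.0))
```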

ire_and_curses

1) This is (almost) thread-safe. The individual operations on dict are thread-safe, and your reading thread should never read from a key which is still being written to. The exception is the following race condition, which relies on a context switch occurring close to a minute boundary.

Thread 1: receives a message at 2014-05-20 13:37:59.999 then is pre-empted

Thread 2: checks the time (it is now 2014-05-20 13:38:00.000) so it reads from 2014_05_20_13_37

Thread 1: appends its message to the end of the 2014_05_20_13_37 queue

2) No, this is not good design, and not just because there is an edge case in the thread-safety condition. If you need to guarantee something for every minute, sleeping is a very error-prone way to do it. First of all, the sleep operation does not sleep for EXACTLY the amount of time given; it sleeps for at least that amount of time. Second, even if it were exact, the rest of your operation still takes some time, which means there would be milliseconds of drift between your sleep calls. These two factors will likely result in you missing a minute every 6,000-60,000 minutes or so.

Ignoring your race condition in part 1, I would do the following:

import datetime
import time

def generate_times():
    now = datetime.datetime.now()
    next_time = datetime.datetime(now.year, now.month, now.day, now.hour, now.minute)
    while True:
        yield next_time
        next_time += datetime.timedelta(minutes=1)

def past_times():
    for minute in generate_times():
        # Wait until the minute has fully elapsed before yielding it.
        while minute > datetime.datetime.now() - datetime.timedelta(minutes=1):
            time.sleep(1.0)
        yield minute

The first function creates a generator which yields all of the on-the-minute times, and the second function ensures that a given minute has fully passed before yielding it.
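For completeness, a sketch of how thread 2 might consume past_times() (write_bucket and the pop are my additions, not from the answer):

```python
import datetime

def write_bucket(data_dict, minute):
    # Dump one completed minute's entries to a file named after it, and
    # remove the bucket so the dict does not grow without bound.
    key = minute.strftime('%Y_%m_%d_%H_%M')
    entries = data_dict.pop(key, [])
    with open(key, 'w') as f:
        f.write('\n'.join(entries))
    return key

# Thread 2 then reduces to:
#   for minute in past_times():
#       write_bucket(data_dict, minute)
```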

Probably the easiest way to handle the race condition from part one would be to make thread 2 lag 2 minutes behind instead of just one (or a minute and 7 seconds, or 16 minutes; whatever you want). This still isn't foolproof: if something stalls your kernel for a long time, these race conditions could still occur, but that is a perfect-storm scenario.

If you want to be 100% correct, then thread 1 needs to keep a timestamp tracking the latest time at which it has written out logs, but then you are going to want to look into non-blocking IO to make sure your last write doesn't stall if thread 1 is stuck waiting for something to log. I, myself, would just use 2 minutes instead of 1 inside the past_times function, unless this is mission-critical logging that lives depend on.

Brendan F

Check out this answer: Thread Safety in Python's dictionary

You can write a separate process as well. I've used uwsgi to set up an instance every thread can talk to, and they post data using HTTP calls. It may not be the most efficient, but it's super easy and very safe.
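To illustrate the pattern with the stdlib only (the answer used uwsgi; this substitutes `http.server` just to show the idea, and the handler and key names are mine): because the single-threaded HTTPServer serializes requests, only one code path ever touches the buckets.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

BUCKETS = {}  # only the collector process touches this

class Collector(BaseHTTPRequestHandler):
    def do_POST(self):
        # Each POST carries a JSON entry tagged with its minute bucket.
        body = self.rfile.read(int(self.headers['Content-Length']))
        entry = json.loads(body)
        BUCKETS.setdefault(entry['minute'], []).append(entry['data'])
        self.send_response(204)
        self.end_headers()

    def log_message(self, *args):
        pass  # silence per-request logging

def start_collector(port=0):
    # port=0 binds an ephemeral port; server.server_address[1] reports it.
    server = HTTPServer(('127.0.0.1', port), Collector)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```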

Community