
I'm trying to set something up where one thread writes a list of work and another thread reads the list and works from it. This list can be very large, so to stop the whole list being held in memory I want it written to a file (or any other way of preserving memory, e.g. generators?).

I put together a little runnable example with a sleep in the writer so that the reader can catch up. I'm wondering how I can get the reader not to stop when it "overtakes" the writer. I looked at using `.seek` and `.tell`, but I got weird behaviour and I'm not sure that's the right route.

As another question, is this at all a sensible idea? Maybe there's a much more elegant way I can queue up a list of strings without using loads of memory.

import threading,time

class Writer(threading.Thread):

  lock = threading.Lock()   # guards access to self.i shared between threads

  def __init__(self,file_path,size):
    threading.Thread.__init__(self)
    self.file_path= file_path
    self.size= size
    self.i=0

  def how_many(self):
    with self.lock:
      print "Reader starting, writer is on",self.i

  def run(self):
    f = open(self.file_path, "w")
    for i in xrange(self.size):
      with self.lock:
        self.i = i
      if i % 1000 == 0:
        time.sleep(0.1)   # slow the writer down so the reader has a chance to catch up
      f.write("%s\n" % i)
    f.close()

class Reader(threading.Thread):

  def __init__(self,file_path):
    threading.Thread.__init__(self)
    self.file_path= file_path

  def run(self):
    f = open(self.file_path, "r")
    line = ""
    for line in f:   # iteration stops at EOF, even though the writer may still be writing
      pass
    f.close()
    print "Reader got to: %s" % line.strip()


if __name__ == "__main__":
  a= Writer("testfile",2000000)
  b= Reader("testfile")
  a.start()
  time.sleep(1)
  a.how_many()
  b.start()
GP89
  • Have you looked into [pipes](http://stackoverflow.com/questions/1430446/create-a-temporary-fifo-named-pipe-in-python)? – Spencer Rathbun Dec 28 '11 at 14:45
  • Looks promising but this is UNIX only right? I need something that will be platform independent unfortunately – GP89 Dec 28 '11 at 15:15
  • Check [here](http://eli.thegreenplace.net/2011/12/27/python-threads-communication-and-stopping/) for an example of worker threads and queues in python. The Queue.Queue class handles creating fifo pipes cross-platform. You may need to subclass the queue to write entries to a temp file, for your memory issue. – Spencer Rathbun Dec 28 '11 at 15:59
  • How large is this very large list? Is there a good reason why you expect that the consumer/reader will be acting much more slowly than the producer/writer? – Kylotan Dec 28 '11 at 16:16
  • The list could potentially be tens of millions, and the consumer will always be a lot slower than the producer. The reason I need to queue everything is to provide some accurate progress in a gui – GP89 Dec 28 '11 at 16:32

3 Answers


I did solve this, using a buffered file queue where the queue is spread out between memory and a file. Items are put into the queue as normal, but once the number of items exceeds the specified queue size, any overflow is stored in a file to preserve memory, and items are got out of the queue just the same.

If anyone is looking to do something similar, I put it on GitHub here.
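
The rough shape of the idea looks something like this (an illustrative sketch only, not the code from the GitHub project; the class and attribute names are made up, and it assumes newline-free string items): a FIFO that keeps at most maxsize items in memory and spills anything beyond that to a temporary file, reading the overflow back in order once the in-memory part drains.

import collections
import tempfile
import threading

class OverflowQueue(object):
    """Sketch only: keep up to maxsize items in memory, spill the rest to disk."""

    def __init__(self, maxsize=1024):
        self.maxsize = maxsize
        self.memory = collections.deque()         # in-memory part of the FIFO
        self.overflow = tempfile.TemporaryFile()  # disk-backed part of the FIFO
        self.on_disk = 0                          # items written but not yet read back
        self.read_pos = 0                         # file offset of the next unread item
        self.lock = threading.Lock()

    def put(self, item):
        with self.lock:
            if len(self.memory) < self.maxsize and self.on_disk == 0:
                self.memory.append(item)
            else:
                self.overflow.seek(0, 2)          # append to the end of the file
                self.overflow.write("%s\n" % item)
                self.on_disk += 1

    def get(self):
        with self.lock:
            if self.memory:
                return self.memory.popleft()
            if self.on_disk:
                self.overflow.seek(self.read_pos)  # resume where the last read stopped
                line = self.overflow.readline()
                self.read_pos = self.overflow.tell()
                self.on_disk -= 1
                return line.rstrip("\n")
            raise IndexError("queue is empty")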

GP89

For sending messages between threads the `Queue` class is quite handy. Import it with `from Queue import Queue`, construct one, and pass the queue object to each thread. It supports multiple producers and consumers, and you can put almost any Python object into the queue: lists, objects, iterators, etc.

To transfer lots of data using this queue, just write one object at a time to the queue and use a generator function in the consumer that yields data out of the queue. Queues support a depth limit, in case the producer is faster than the consumer.
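
A minimal sketch of that pattern (the names, the item format and the maxsize of 1000 are just for illustration); the producer blocks on put() once the queue is full, and the consumer is a plain generator:

from Queue import Queue
import threading

SENTINEL = None   # marker the producer sends when there is no more work

def produce(q, size):
    for i in xrange(size):
        q.put("work item %d" % i)   # blocks while the queue already holds maxsize items
    q.put(SENTINEL)

def consume(q):
    """Generator yielding items off the queue until the sentinel arrives."""
    while True:
        item = q.get()
        if item is SENTINEL:
            return
        yield item

q = Queue(maxsize=1000)   # caps how many items are ever held in memory at once
producer = threading.Thread(target=produce, args=(q, 2000000))
producer.start()

for item in consume(q):
    pass   # do the real work with each item here

producer.join()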

wberry
  • I use Queues in other parts of the program, underneath the Queue is just a list so anything I put in will still end up in memory right? – GP89 Dec 28 '11 at 16:03
  • If you put all the messages into the queue before reading any of them, then yes. But your producer and consumer threads can be writing and reading concurrently, and the `Queue` class handles locking and other issues. Plus you can set a maximum depth, in case the producer is faster, to limit the memory usage. – wberry Dec 28 '11 at 16:08
  • If a few messages are in memory at any given time, that's OK. Not worth using disk as a queue just to save a few objects. It's only when lots and lots of them are in memory at once that you have a real problem. – wberry Dec 28 '11 at 16:09
  • I'm currently using a list with a threadsafe lock, which essentially is the same as using a Queue (FIFO isn't needed, just a way to pass safely between threads). The problem was that my list has the potential to hold millions of items and was growing to obscene sizes (GB in memory) so I was looking for a way to prevent this. If I set a maximum depth what do I do with the Full exception on the producer? I kind of need to keep queueing up work to display accurate progress in the gui – GP89 Dec 28 '11 at 16:25
  • Well, if you consider it a requirement that the producer never wait for the consumer to catch up, then you will certainly have an exploding queue of work. No way around it. The only choice is whether to queue the messages in memory, on disk etc. – wberry Dec 28 '11 at 18:35
  • If you feel you must never block the producer, consider using a message broker. RabbitMQ is one to consider for Python clients. If you think you can live with making the producer wait, you don't have to deal with `Full` exceptions to impose a maximum depth in the `Queue` object; you can just have the producer block until the queue has an available slot. The documentation has all the details. – wberry Dec 28 '11 at 18:37
  • Either way I think your code will be more maintainable using either a `Queue` object, another construct from a library, or a broker. While it is true that `Queue` is implemented using a simple list with concurrency controls, why duplicate that work? – wberry Dec 28 '11 at 18:40
  • I decided to stream the work to a gzip file, which seems to be the best solution for me. I would rather use a Queue with a small depth, but unfortunately I have to display accurate progress, so I need to know the total amount of work up front. Another annoying part is that I can't just work out the total without queueing the work initially, because the work changes as time goes on, so I need to take an initial 'snapshot' where I add all the work to a queue; otherwise the overall progress will be off if I only made an initial count. – GP89 Dec 30 '11 at 09:44
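
That approach could look roughly like this (a sketch only; generate_work() and process() are stand-ins for the real producer and consumer, and it assumes the snapshot is written completely before processing starts):

import gzip

def generate_work():
    return ("job %d" % i for i in xrange(1000000))   # stand-in for the real work source

def process(item):
    pass                                             # stand-in for the real work

# Snapshot phase: stream every work item to a compressed file and count them,
# so the total is known before any work is done.
total = 0
snapshot = gzip.open("work_snapshot.gz", "wb")
for item in generate_work():
    snapshot.write("%s\n" % item)
    total += 1
snapshot.close()

# Processing phase: read the items back one line at a time, so only one item
# is in memory at once, and report progress against the known total.
done = 0
for line in gzip.open("work_snapshot.gz", "rb"):
    process(line.strip())
    done += 1
    progress = 100.0 * done / total   # accurate percentage for the GUI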

The multiprocessing `JoinableQueue` class is designed to limit the backlog that can build up while waiting for child threads/processes to consume tasks. I'm going to assume you're reading work in from a file, and that the file is too large to easily hold in memory all at once.

The following is my attempt at a solution that should limit memory usage. In this example I'm processing a newline terminated series of dates, converting them into a standard format, and writing them back out to a new file.

I'm by no means an expert with the multiprocessing module, so should anyone see a mistake / better way to do it, I would like to hear it.

from multiprocessing import Process, Queue, JoinableQueue
from Queue import Empty   # multiprocessing queues raise Queue.Empty on a get() timeout
import time

date_formats =  [
    "%Y%m",
    "%Y-%m-%d", 
    "%y-%m-%d", 
    "%y%m%d", 
    "%Y%m%d", 
    "%m/%d/%Y", 
    "%m/%d/%y", 
    "%m/%d/%Y %H:%M",
    "%m%d%y", 
    "%m%d%Y", 
    "%B, %d %Y", 
    "%B, %d %y", 
    "%d %B, %Y", 
    "%d %B, %y",
    "%B %d %Y", 
    "%B %d %y", 
    "%B %d, %Y", 
    "%B %d, %y", 
    "%B %d %Y", 
    "%B %d %y",
    "%b %d %Y", 
    "%b %d, %Y", 
    "%b %d %y", 
    "%b %d, %y", 
    "%d-%b-%y", 
    "%Y-%m-%d %H:%M:%S"
]

def convert_date(date):
    date = date.strip()
    for dateformat in date_formats:
        try:
            converted = time.strptime(date, dateformat)
            converted = time.strftime("%Y-%m-%d", converted)
            return converted
        except ValueError:
            continue
    return None   # none of the formats matched

def writer(result_queue):
    f = open("iso_dates.out", "wb")
    while True:
        try:
            date = result_queue.get(timeout=1)
        except Empty:            # no result for a second; assume the workers are done
            break
        if date is not None:     # convert_date() returns None for unparseable dates
            f.write(date + '\n')
    f.close()

def worker(work_queue, result_queue):
    while True:
        date = work_queue.get()

        if not date:
            break

        result_queue.put(convert_date(date))
        work_queue.task_done()

if __name__ == "__main__":            # guard needed so Windows' process spawning works
    dates        = open("dates.out", "rb")
    work_queue   = JoinableQueue(512) # allow no more than 512 items on the queue
    result_queue = Queue()
    writer_proc  = Process(target=writer, args=(result_queue,))
    worker_procs = 2

    for i in range(worker_procs):
        p = Process(target=worker, args=(work_queue, result_queue))
        p.daemon = True
        p.start()

    writer_proc.start()
    for date in dates:
        work_queue.put(date)          # blocks once maxsize is hit, until workers catch up

    work_queue.join()
    dates.close()
    writer_proc.join()                # wait for the writer to finish flushing results
zchtodd