2

I am using the Python Queue class to manage a list of tasks shared between multiple worker threads. The actual code is humongous and I'm still in the process of making it entirely bug free. From time to time the worker threads crash and I have to restart the whole routine, and in the process I lose all the tasks that were queued. Is there a way to save the queue to a file so that whenever I restart the process, the task list is preloaded from that file?

At first thought it seems that as I get or put tasks into the queue, I should be reading from and writing to a file simultaneously. However, this doesn't give me the functionality of queue.task_done() and may not be the most optimized solution. Any ideas would be greatly appreciated.

Shah W
  • Before you spend too much time writing your own parallel computing framework, check whether there's an existing framework you can use. E.g. IPython's parallel framework, or Kamaelia. – Thomas K Aug 04 '11 at 16:25

5 Answers

5

Have you considered simply pickling your queue?
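A minimal sketch of that idea, assuming the queued items are themselves picklable: snapshot the queue's contents to a file and reload them on startup. It relies on the internal queue and mutex attributes of queue.Queue, and the file path is just an example:

import pickle
import queue

def save_queue(q, path='tasks.pkl'):
    # Snapshot the current items; hold the queue's lock while copying
    with q.mutex:
        items = list(q.queue)
    with open(path, 'wb') as f:
        pickle.dump(items, f)

def load_queue(path='tasks.pkl'):
    # Rebuild a fresh Queue from the last snapshot, if one exists
    q = queue.Queue()
    try:
        with open(path, 'rb') as f:
            for item in pickle.load(f):
                q.put(item)
    except FileNotFoundError:
        pass  # no snapshot yet; start empty
    return q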

kojiro
  • Are you suggesting that every time a change is made to the queue, it should be serialized and saved onto a file? With a large queue size this may not be the optimal solution. – Shah W Aug 04 '11 at 20:42
  • To be sure. But you're already knee-deep in a suboptimal situation. Keep the worker threads from crashing and the question becomes moot. – kojiro Aug 05 '11 at 12:25
1

There are multiple approaches to this, including the pickle module...

But in my opinion it would be simplest to just write to a file, line by line, with each element of the queue followed by columns for any other properties you may want to save, like its task_done state.

example:

element1, True
element2, False
...

In Python it's super easy to read a file formatted like this, something like:

sep_char = ', '
for line in open('path/file.ext'):
    name, state = line.rstrip('\n').split(sep_char)
    # ...and then insert into the queue
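The write side can be just as simple; a rough sketch (the function name and the done flag are only illustrative):

def log_task(path, name, done=False):
    # Append one task per line together with its completion state
    with open(path, 'a') as f:
        f.write('%s, %s\n' % (name, done))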
BrainStorm
  • This was indeed the first solution that came to my mind. However, I was hoping to find a quick implementation built on top of the Queue class; an in-house solution would require me to be adept at multithreading and managing shared data resources. – Shah W Aug 04 '11 at 20:47
0

The easy way to do this is to use AMQP for the message queues and let the message broker take care of the messages for you. I implemented a similar system using RabbitMQ as the message broker with durable persistent queues. The messages even survived a crash of the RabbitMQ server software when I was using an outdated 1.72 server version on a virtual Linux server with only 512 MB of RAM and a million or so messages in play.

The way I do it is that each type of worker consumes messages from a different queue. If I need more than one worker of a given type, messages are automatically distributed round-robin among them, and if a worker cannot finish processing a message, it simply doesn't ack it and the message goes back on the queue.

I wrote a little shim module of about 80 lines of code to sit in front of kombu, and later rewrote it to use py-amqplib. If I had known about haigha earlier I would have used that, since it matches the AMQP specification document very closely.

I do not recommend kombu because it is complex to debug and diverges from the AMQP standard in weird ways. Have a look at haigha: even though its documentation is no more than one example code fragment on PyPI, it is better documented than either kombu or amqplib, because you can use the AMQP spec as your haigha docs.
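For illustration, a durable queue with persistent messages and manual acks might look roughly like this with the pika client (not one of the libraries mentioned above; the queue name and the process handler are made up):

import pika

# Connect to a local RabbitMQ broker (default settings assumed)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# A durable queue survives a broker restart
channel.queue_declare(queue='tasks', durable=True)

# Mark the message persistent so the broker writes it to disk
channel.basic_publish(exchange='', routing_key='tasks', body='element1',
                      properties=pika.BasicProperties(delivery_mode=2))

# Worker side: ack only after the work is done, so an unacked message
# is redelivered if the worker crashes
def on_message(ch, method, properties, body):
    process(body)  # hypothetical task handler
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=on_message)
channel.start_consuming()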

Michael Dillon
0

A simple option I can offer would be to wrap a database table in a class and use that as your queue. An auto-increment column works wonders for this (the next item to remove is the one with the lowest ID).

class dbQueue:
    def __init__(self):
        # Pick some random id for this run (or set it to something you know).
        ...

    def put(self, item):
        # Insert the entry into the table.
        ...

    def get(self):
        # The update .. select combo removes the need for a database that has transactions.
        # If no entries bear your ID:
        #   update the next entry that is not already marked with your ID.
        # Select the entry that matches your ID and return it.
        ...

    def task_done(self):
        # Delete the entry with your ID.
        ...

This won't have the best performance, depending on how often the queue gets updated; even an in-memory SQLite database won't be as fast as a linked-list structure. The flip side is that you can view the queue with any tool that can access the database, so you can see which task is in progress.
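A rough sqlite3-backed version of the sketch above, just to make the idea concrete (the table layout and the claim-by-owner-id scheme are illustrative assumptions):

import sqlite3
import uuid

class dbQueue:
    def __init__(self, path='tasks.db'):
        # Random id for this run, used to mark the entries we have claimed
        self.worker_id = uuid.uuid4().hex
        self.db = sqlite3.connect(path)
        self.db.execute('CREATE TABLE IF NOT EXISTS tasks '
                        '(id INTEGER PRIMARY KEY AUTOINCREMENT, item TEXT, owner TEXT)')

    def put(self, item):
        self.db.execute('INSERT INTO tasks (item) VALUES (?)', (item,))
        self.db.commit()

    def get(self):
        # If nothing is marked with our id yet, claim the oldest unclaimed entry
        row = self.db.execute('SELECT id, item FROM tasks WHERE owner = ?',
                              (self.worker_id,)).fetchone()
        if row is None:
            self.db.execute('UPDATE tasks SET owner = ? WHERE id = '
                            '(SELECT MIN(id) FROM tasks WHERE owner IS NULL)',
                            (self.worker_id,))
            self.db.commit()
            row = self.db.execute('SELECT id, item FROM tasks WHERE owner = ?',
                                  (self.worker_id,)).fetchone()
        return row[1] if row else None

    def task_done(self):
        # Delete the entry claimed with our id
        self.db.execute('DELETE FROM tasks WHERE owner = ?', (self.worker_id,))
        self.db.commit()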

David
0

Implement a handshaking mechanism between the worker and the master.

The master has a list of tasks. Before putting them into the Queue, pickle the list to a file; then insert the tasks into the Queue. When a worker is done, it sends back an ACK message. Only at that point unpickle the task list and delete the corresponding id.
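A rough sketch of that handshake (the queue names, do_work, and the pickle file are made-up names for illustration):

import pickle
import queue
import threading
import uuid

task_q, ack_q = queue.Queue(), queue.Queue()
pending = {}                      # task id -> task, the master's persisted list
pending_lock = threading.Lock()

def checkpoint():
    # Pickle the pending tasks so a restart can re-queue them
    with open('pending.pkl', 'wb') as f:
        pickle.dump(pending, f)

def submit(task):
    task_id = uuid.uuid4().hex
    with pending_lock:
        pending[task_id] = task
        checkpoint()              # persist before handing the task out
    task_q.put((task_id, task))

def ack_loop():
    # Master side: forget a task only after its ACK arrives
    while True:
        task_id = ack_q.get()
        with pending_lock:
            pending.pop(task_id, None)
            checkpoint()

def worker():
    while True:
        task_id, task = task_q.get()
        do_work(task)             # hypothetical task handler
        ack_q.put(task_id)        # handshake back to the master
        task_q.task_done()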

fabrizioM