I want to download many files from a queue using Twisted with (for example) 20 client threads. Is there an example?
Viewed 4,171 times
3 Answers
9
from twisted.internet.defer import inlineCallbacks, DeferredQueue

@inlineCallbacks
def worker(queue):
    while True:
        url = yield queue.get()  # wait for a url from the queue
        if url is None:  # insert None into the queue to kill workers
            queue.put(None)
            return  # done
        data = yield download(url)  # download the file
        process(data)  # do stuff with it

queue = DeferredQueue()  # your queue

# make workers
MAX = 20
workers = [worker(queue) for _ in range(MAX)]

Jochen Ritzel
-
This works, but it downloads the files with no parallelization. It might be worth adding an example that allows multiple concurrent downloads. – Glyph Aug 03 '11 at 00:51
-
@Glyph: It doesn't download as fast as possible (the workers wait for the processing), but there should be some parallelization because of the 20 `workers`. – Jochen Ritzel Aug 03 '11 at 08:54
-
Oops. Totally missed the loop at the bottom there. Please disregard my comment :). – Glyph Aug 03 '11 at 15:44
-
Could someone comment on this line, please: `# insert None into the queue to kill workers` – vak Apr 06 '14 at 12:22
-
@vak: This `None` can be used to signal the workers to exit in a controlled way. It's sometimes called a "poison pill"; here is an explanation: http://java.dzone.com/articles/producers-and-consumers-part-3 – Jochen Ritzel Apr 06 '14 at 13:03
-
@JochenRitzel, totally clear now. I was confused because I tried the example as-is and the reactor was not started ;) – vak Apr 08 '14 at 07:58
2
Here's a translation of https://github.com/caolan/async to Python.
from twisted.internet import defer

class Queue:
    def __init__(self, worker, concurrency):
        self.worker = worker
        self.concurrency = concurrency
        self.workers = 0  # number of tasks currently running
        self.tasks = []   # pending tasks (per instance, not shared by the class)
        self.saturated = None
        self.empty = None
        self.drain = None

    def push(self, data):
        deferred = defer.Deferred()
        self.tasks.append({'data': data, 'callback': deferred})
        if self.saturated and len(self.tasks) == self.concurrency:
            self.saturated()
        self.process()
        return deferred

    def task_finished(self, result):
        self.workers = self.workers - 1
        if self.drain and len(self.tasks) + self.workers == 0:
            self.drain()
        self.process()
        return result  # pass the worker's result (or failure) along

    def process(self):
        if self.workers >= self.concurrency or len(self.tasks) == 0:
            return
        task = self.tasks.pop(0)
        if self.empty and len(self.tasks) == 0:
            self.empty()
        self.workers = self.workers + 1
        d = self.worker(task['data'])
        d.addBoth(self.task_finished)  # free the slot even if the worker fails
        d.addCallbacks(task['callback'].callback, task['callback'].errback)

from twisted.web import client
from twisted.internet import reactor

def dl_worker(data):
    url = data[0]
    fname = data[1]
    print("Download file: %s" % fname)
    d = client.downloadPage(url, fname)
    return d  # very important!

q = Queue(dl_worker, 2)
q.drain = reactor.stop
for i in range(3):
    q.push(["http://download.thinkbroadband.com/5MB.zip", "file" + str(i)])
reactor.run()
I hope this passes Glyph's QC :D Cheers!

odie5533
-2

Matt Joiner
-
The fellow appears to already be using Twisted. Care to elaborate on why it is a bad choice for this sort of thing? – stderr Aug 04 '11 at 14:07