I'm writing a program that verifies the syntax and MX records of a list of email addresses. Because blocking code is time-consuming, I want to do this asynchronously or with threads. This is my code:

with open(file_path) as f:
    # check the status of file, if away then file pointer will be at the last index
    if (importState.status == ImportStateFile.STATUS_AWAY):
        f.seek(importState.fileIndex, 0)

    while True:
        # the number of emails to process is configurable 10 or 20
        emails = list(islice(f, app.config['NUMBER_EMAILS_TO_PROCESS']))
        if len(emails) == 0:
            break

        importState.fileIndex = importState.fileIndex + len(''.join(emails))

        for email in emails:
            email = email.strip('''<>;,'\r\n ''').lower()
            d = threads.deferToThread(check_email, email)
            d.addCallback(save_email_status, email, importState)

        # set the number of emails processed 
        yield set_nbrs_emails_process(importState)

        # do an insert of all emails
        yield reactor.callFromThread(db.session.commit)

# set file status as success
yield finalize_import_file_state(importState)
reactor.callFromThread(reactor.stop)

Check email function:

def check_email(email):
    pipe = subprocess.Popen(["./check_email", '--email=%s' % email], stdout=subprocess.PIPE)
    status = pipe.stdout.read()
    try:
        status = int(status)
    except ValueError:
        status = -1

    return status

What I need is to process 10 emails at the same time and wait for the results.

Lhassan Baazzi
  • are there 10 emails, or do you want to send no more than 10 emails concurrently? – jfs Oct 10 '14 at 00:00
  • is there an `@inlineCallbacks` decorator in your code (implied by all the `yield` statements)? – jfs Oct 10 '14 at 00:02
  • Yeah, there is `@inlineCallbacks`. I want to process 10 or 20 emails in bulk and then insert them into the DB. – Lhassan Baazzi Oct 10 '14 at 07:24
  • If you don't care about limiting the number of concurrently processed e-mails then just use `gatherResults()` as @Jean-Paul Calderone suggested. – jfs Oct 10 '14 at 07:45
  • you should probably open a separate question about how to send a single e-mail without blocking in twisted. – jfs Oct 10 '14 at 07:47
  • the reason for limiting to 10 is that I have a list of 1M emails, so yes, your idea is good: check a single email without blocking and save each checked email to a buffer; when the buffer reaches its size limit, do an insert and reset the buffer. – Lhassan Baazzi Oct 10 '14 at 08:21
  • here's an example on how to [limit the number of sending-at-the-same-time e-mails using a semaphore in twisted](http://stackoverflow.com/q/20336476/4279) – jfs Oct 10 '14 at 09:05
  • here's a [code example on how to connect to millions of hosts (20 at a time) in twisted](http://stackoverflow.com/a/4868866/4279) – jfs Oct 10 '14 at 09:08

1 Answer

I'm not sure why there are threads involved in your example code. You don't need threads to interact with email with Twisted, nor to do so concurrently.

If you have an asynchronous function that returns a Deferred, you can just call it ten times and the ten different streams of work will proceed in parallel:

for i in range(10):
    async_check_email_returning_deferred()

If you want to know when all ten results are available, you can use gatherResults:

from twisted.internet.defer import gatherResults
...
email_results = []
for i in range(10):
    email_results.append(async_check_email_returning_deferred())
all_results = gatherResults(email_results)

all_results is a Deferred that will fire when all of the Deferreds in email_results have fired (or when the first of them fires with a Failure).

Jean-Paul Calderone
  • can you put the code for this function `async_check_email_returning_deferred` – Lhassan Baazzi Oct 06 '14 at 12:55
  • It's any function that returns a Deferred. I'm not sure what you actually mean by "check email" so I don't know how this function would actually be implemented. It would probably use some APIs from `twisted.mail` though. – Jean-Paul Calderone Oct 07 '14 at 01:05
  • I have added the check function – Lhassan Baazzi Oct 09 '14 at 22:14
  • Unfortunately that function doesn't return a Deferred - and in fact isn't asynchronous at all. It is blocking. So the *only* neat concurrency tool you get to use is threads. This answer still applies if you define `async_check_email_returning_deferred` as something like `return deferToThread(check_email, email)` since that uses threads to make your blocking function into an asynchronous, Deferred-returning function. Though then you're mixing threads and processes which is always a risky thing to do. Consider using Twisted Mail instead of the subprocess module. – Jean-Paul Calderone Oct 10 '14 at 11:39