
Problem:

I have a list of 2M+ user records in my Datastore project. I would like to send a weekly newsletter to all users. The mailing API accepts at most 50 email addresses per API call.

Previous Solution:

I used an App Engine backend and a simple Datastore query to process all the records in one go. Sometimes, however, the process logs a critical memory-overflow error and starts all over again. Because of this, some users get the same email more than once. So I moved to Dataflow.

Current Solution:

I use the FlatMap function to pass each email ID to a function, which then sends an email to each user individually.

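import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore

def process_datastore(project, pipeline_options):
    p = beam.Pipeline(options=pipeline_options)
    query = make_query()  # user-defined helper that builds the Datastore query
    entities = (p | 'read from datastore' >> ReadFromDatastore(project, query))
    # sendMail (user-defined) is called once per entity with a single-element list
    entities | beam.FlatMap(lambda entity: sendMail([entity.properties.get('emailID', "")]))
    return p.run()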

With Cloud Dataflow, I have ensured that each user gets the mail only once and that nobody is missed. There are no memory errors.

But the current process takes 7 hours to finish. I tried replacing FlatMap with ParDo, on the assumption that ParDo would parallelize the process, but even that takes the same time.

Question:

  1. How can I group the email IDs into batches of 50, so that each mail API call is used effectively?

  2. How can I parallelize the process so that it takes less than an hour?

Sriram
  • Your pipeline might be suffering from one of the conditions described here https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py#L58 . In that case you'll need to introduce more parallelism by breaking fusion https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion – jkff Sep 13 '17 at 15:35
  • Also: FlatMap and ParDo are equivalent - they are both parallel, but both subject to fusion (see above). In general when debugging performance of jobs, please include a Dataflow Job Id that an oncall engineer can take a look at. – jkff Sep 13 '17 at 16:05

1 Answer


You could use query cursors to split your users into batches of 50 and do the actual batch processing (the email sending) inside push queue or deferred tasks. This would be a GAE-only solution, without Cloud Dataflow, and IMHO a lot simpler.
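To illustrate the cursor-based batching idea, here is a minimal sketch in plain Python. It is not tied to the Datastore API: `fetch_page` is a hypothetical callable that takes a cursor and a page size and returns one page of email addresses plus the next cursor (or `None` when there are no more results), and `make_in_memory_fetcher` is a stand-in I made up so the loop can be run locally. In a real app, `fetch_page` would wrap your Datastore query.

```python
BATCH_SIZE = 50  # limit of the mailing API, as stated in the question


def iter_batches(fetch_page, batch_size=BATCH_SIZE):
    """Yield lists of at most batch_size emails, following cursors.

    fetch_page is a hypothetical callable: (cursor, page_size) ->
    (emails, next_cursor), where next_cursor is None on the last page.
    """
    cursor = None
    while True:
        emails, cursor = fetch_page(cursor, batch_size)
        if emails:
            yield emails
        if cursor is None:
            break


def make_in_memory_fetcher(all_emails):
    """Stand-in for a Datastore query: the cursor is just a list offset."""
    def fetch_page(cursor, page_size):
        start = cursor or 0
        page = all_emails[start:start + page_size]
        end = start + len(page)
        next_cursor = end if end < len(all_emails) else None
        return page, next_cursor
    return fetch_page


if __name__ == "__main__":
    users = ["user%d@example.com" % i for i in range(120)]
    for batch in iter_batches(make_in_memory_fetcher(users)):
        print(len(batch))  # each batch would go to one mail API call
```

Each yielded batch maps to one mail API call, and each batch could be handed to its own push queue or deferred task.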

You can find an example of such processing in "Google appengine: Task queue performance" (taking the answer into account as well). That solution uses the deferred library, but it is almost trivial to use push queue tasks instead.

That answer also touches on the parallelism aspect, in the sense that you may want to limit it to keep costs down.

You could also split the batching itself across the tasks to obtain an indefinitely scalable solution (any number of recipients, without hitting memory or deadline-exceeded failures), with each task re-enqueuing itself to continue the work from where it left off.
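The re-enqueuing pattern can be sketched roughly as follows. This is a local simulation only: a `collections.deque` stands in for the push queue, a list offset stands in for the query cursor, and `send_batch`, `send_task`, `run_queue` and `BATCHES_PER_TASK` are hypothetical names chosen for illustration. The point is that each task does a bounded amount of work and then enqueues a continuation carrying its position.

```python
from collections import deque

BATCH_SIZE = 50        # mailing API limit from the question
BATCHES_PER_TASK = 4   # assumed cap on work per task, to stay under deadlines


def send_task(queue, emails, send_batch, start=0):
    """Send a bounded number of batches, then re-enqueue the remainder."""
    position = start
    for _ in range(BATCHES_PER_TASK):
        batch = emails[position:position + BATCH_SIZE]
        if not batch:
            return  # nothing left, so no continuation is enqueued
        send_batch(batch)
        position += len(batch)
    if position < len(emails):
        queue.append(position)  # continuation: resume from this offset


def run_queue(emails, send_batch):
    """Drain the simulated queue and return how many tasks ran."""
    queue = deque([0])
    tasks_run = 0
    while queue:
        start = queue.popleft()
        send_task(queue, emails, send_batch, start)
        tasks_run += 1
    return tasks_run


if __name__ == "__main__":
    users = ["user%d@example.com" % i for i in range(450)]
    sent = []
    print(run_queue(users, sent.append), len(sent))
```

Because every task holds at most one batch in memory and stops after a fixed number of batches, the memory use and run time per task do not grow with the number of recipients.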

Dan Cornilescu