
I am trying to upload 100,000 data points to a web service backend. Running them one at a time would take ~12 hours. The service supports 20 simultaneous API calls. How can I run these POSTs concurrently to speed up the import?

def AddPushTokens():

    import requests
    import csv
    import json

    count=0
    tokenList=[]

    apikey="12345"
    restkey="12345"
    URL="https://api.web.com/1/install/"
    headers={'content-type': 'application/json','Application-Id': apikey,'REST-API-Key':restkey}

    with open('/Users/name/Desktop/push-new.csv','rU') as csvfile:
        deviceTokens=csv.reader(csvfile, delimiter=',')

        for token in deviceTokens:

            deviceToken=token[0].replace("/","")
            deviceType="ios"
            pushToken="pushtoken_"+deviceToken
            payload={"deviceType": deviceType,"deviceToken":deviceToken,"channels":["",pushToken]}
            r = requests.post(URL, data=json.dumps(payload), headers=headers)

            count=count+1
            print "Count: " + str(count)
            print r.content

Edit: I am trying to use concurrent.futures. What I am confused about is how to set this up so it pulls each token from the CSV and passes it to load_url. Also, I want to make sure that it runs the requests for the first 20, then picks up at 21 and runs the next set of 20.

import concurrent.futures
import requests

URLS = ['https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/',
     'https://api.web.com/1/installations/']


apikey="12345"
restkey="12345"
URL="https://api.web.com/1/installations/"
headers={'content-type': 'application/json','X-web-Application-Id': apikey,'X-web-REST-API-Key':restkey}


with open('/Users/name/Desktop/push-new.csv','rU') as csvfile:
    deviceTokens=csv.reader(csvfile, delimiter=',')

    for token in deviceTokens:

        deviceToken=token[0].replace("/","")
        deviceType="ios"
        pushToken="pushtoken_"+deviceToken
        payload={"deviceType": deviceType,"deviceToken":deviceToken,"channels":["",pushToken]}
        r = requests.post(URL, data=json.dumps(payload), headers=headers)


# Retrieve a single page and report the url and contents
def load_url(token):

     URL='https://api.web.com/1/installations/'

     deviceToken=token[0].replace("/","")
     deviceType="ios"
     pushToken="pushtoken_"+deviceToken
     payload={"deviceType": deviceType,"deviceToken":deviceToken,"channels":["",pushToken]}
     r = requests.post(URL, data=json.dumps(payload), headers=headers)

     count=count+1
     print "Count: " + str(count)
     print r.content

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Edit: Updated based on Comments Below

import concurrent.futures
import requests
import csv
import json

apikey="ldy0eSCqPz9PsyOLAt35M2b0XrfDZT1NBW69Z7Bw"
restkey="587XASjEYdQwH2UHruA1yeZfT0oX7uAUJ8kWTmE3"
URL="https://api.parse.com/1/installations/"
headers={'content-type': 'application/json','X-Parse-Application-Id': apikey,'X-Parse-REST-API-Key':restkey}

with open('/Users/jgurwin/Desktop/push/push-new.csv','rU') as csvfile:
     deviceTokens=csv.reader(csvfile, delimiter=',')

     for device in deviceTokens:

        token=device[0].replace("/","")

        # Retrieve a single page and report the url and contents

        def load_url(token):

          count=0
          deviceType="ios"
          pushToken="pushtoken_"+token
          payload={"deviceType": deviceType,"deviceToken":token,"channels":["",pushToken]}
          r = requests.post(URL, data=json.dumps(payload), headers=headers)

          count=count+1
          print "Count: " + str(count)
          print r.content


        # We can use a with statement to ensure threads are cleaned up promptly
          with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
              # Start the load operations and mark each future with its URL
              future_to_token = {executor.submit(load_url, token, 60): token for token in deviceTokens}
              for future in concurrent.futures.as_completed(future_to_url):
                  url = future_to_url[future]
                  try:
                      data = future.result()
                  except Exception as exc:
                      print('%r generated an exception: %s' % (url, exc))
                  else:
                      print('%r page is %d bytes' % (url, len(data)))
Rangers4me
  • Why is your new version attempting to do everything serially, and then to do the exact same work again in parallel? – abarnert Nov 18 '13 at 21:59
  • Meanwhile, do you really want to "make sure that it goes through the first 20 runs the requests, then picks up at 21 and runs the next set of 20"? What a thread pool does is picks up the first 20, then, as each one finishes, the thread picks up the next one without waiting for the other 19 to finish. Is there any reason you want to force all of the threads to wait until the slowest one finishes before any of them start on the next task? – abarnert Nov 18 '13 at 22:00
  • Also, do you not understand how comprehensions work? Because that's the part that sends each value in your list of values to the `submit` function (and the `submit` function is what then sends it to the `load_url` function). If [List Comprehensions](http://docs.python.org/2.7/tutorial/datastructures.html#list-comprehensions) and the example at the end of Dictionaries farther down the page don't explain it to you, please explain what part you're not getting. – abarnert Nov 18 '13 at 22:03

2 Answers


The easy way to do this is with threads. The nearly-as-easy way is with gevent or a similar library (and grequests even ties gevent and requests together so you don't have to figure out how to do so). The hard way is building an event loop (or, better, using something like Twisted or Tulip) and multiplexing the requests yourself.

Let's do it the easy way.

You don't want to run 100000 threads at once. Besides the fact that it would take hundreds of GB of stack space, and your CPU would spend more time context-switching than running actual code, the service only supports 20 connections at once. So, you want 20 threads.

So, how do you run 100000 tasks on 20 threads? With a thread pool executor (or a bare thread pool).

The concurrent.futures docs have an example which is almost identical to what you want to do, except doing GETs instead of POSTs and using urllib instead of requests. Just change the load_url function to something like this:

def load_url(token):
    deviceToken=token[0].replace("/","")
    # … your original code here …
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

… and the example will work as-is.

Since you're using Python 2.x, you don't have the concurrent.futures module in the stdlib; you'll need the backport, futures (`pip install futures`).
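
If it helps to see everything wired together, here's a rough sketch (untested, reusing the CSV path, headers, and payload format from your question) that reads all the tokens first and then lets the pool work through them 20 at a time:

import concurrent.futures  # from the "futures" backport on Python 2.x
import requests
import csv
import json

apikey = "12345"
restkey = "12345"
URL = "https://api.web.com/1/installations/"
headers = {'content-type': 'application/json',
           'X-web-Application-Id': apikey,
           'X-web-REST-API-Key': restkey}

def load_url(token):
    # token is one row from the CSV reader
    deviceToken = token[0].replace("/", "")
    pushToken = "pushtoken_" + deviceToken
    payload = {"deviceType": "ios",
               "deviceToken": deviceToken,
               "channels": ["", pushToken]}
    r = requests.post(URL, data=json.dumps(payload), headers=headers)
    return r.content

# Read every row up front so the pool has the full list of work to do
with open('/Users/name/Desktop/push-new.csv', 'rU') as csvfile:
    deviceTokens = list(csv.reader(csvfile, delimiter=','))

# 20 worker threads; as soon as one request finishes, its thread picks up the next token
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_token = {executor.submit(load_url, token): token for token in deviceTokens}
    for future in concurrent.futures.as_completed(future_to_token):
        token = future_to_token[future]
        try:
            print(future.result())
        except Exception as exc:
            print('%r generated an exception: %s' % (token, exc))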


In Python (at least CPython), only one thread at a time can do any CPU work. If your tasks spend a lot more time downloading over the network (I/O work) than building requests and parsing responses (CPU work), that's not a problem. But if that isn't true, you'll want to use processes instead of threads, which only requires replacing the ThreadPoolExecutor in the example with a ProcessPoolExecutor.
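
For example (again just a sketch), the only line that has to change is the one that creates the pool; load_url just has to be defined at module level so it can be pickled and sent to the worker processes:

# Same load_url and deviceTokens as above, but work is done in 20 processes
with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor:
    future_to_token = {executor.submit(load_url, token): token for token in deviceTokens}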


If you want to do this entirely in the 2.7 stdlib, it's nearly as trivial with the thread and process pools built into the multiprocessing module. See Using a pool of workers and the Process Pools API, then see multiprocessing.dummy if you want to use threads instead of processes.
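
That version (a sketch, reusing the same load_url and deviceTokens as above) is only a few lines:

from multiprocessing.dummy import Pool  # thread pool behind the multiprocessing Pool API

pool = Pool(20)                             # 20 worker threads, matching the API limit
results = pool.map(load_url, deviceTokens)  # blocks until every token is uploaded
pool.close()
pool.join()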

abarnert
  • How would I do a POST instead of a GET with grequests? I'm pretty new to Python. – Rangers4me Nov 18 '13 at 19:17
  • @Rangers4me: The exact same way you do with `requests`: Use the `post` method instead of the `get` method, and pass a `data` argument. But again, while `gevent` (and therefore `grequests`) is great for doing 500 things at once (which threads can't do), for doing 100000 things 20 at a time, threads are easier. – abarnert Nov 18 '13 at 19:20
  • I just posted an update to the question. The problem I am having now is how do I pass the proper token to the load_url request? – Rangers4me Nov 18 '13 at 21:52
  • Instead of a list of URLs, you have a single URL, and a list of tokens. So just do `future_to_token = {executor.submit(load_url, token, 60): token for token in deviceTokens}`. – abarnert Nov 18 '13 at 21:58
  • Thanks for the suggestion - I think I'm getting really close. I've updated the code to reflect this (see above). The code does not throw any errors but nothing is printing out. – Rangers4me Nov 18 '13 at 22:32
  • @Rangers4me: You're still doing this all inside a loop over 100000 tokens. For each of the 100000 tokens, you want to create a pool and look up all 100000 of them? Also, why did you move the `token[0].replace("/","")` outside the `load_url` function that I gave you? – abarnert Nov 18 '13 at 22:40
  • @Rangers4me: Also, it's very hard to tell, since your code is full of `IndentationError`s, but are you trying to do the whole thread pool thing inside the `load_url` function? That's not going to work. Everything you change from the `futures` example and my code, without having a good reason to change it, is likely to just break things. So don't randomly change things. [Here](http://pastebin.com/P80Fy7ym) is a complete starting point. – abarnert Nov 18 '13 at 22:42
  • Sorry I'm new to this. I updated your code to add the timeout to the load_url because it was throwing an error (see paste bin: http://pastebin.com/Bf2Zjkke). Now it is returning an exception '['3c7cd3120ce12ec4adcd7a0544277468dd3992412a66ab61ea7bd1b02ab769f2', '', '', '', '', '', ''] generated an exception: cannot concatenate 'str' and 'list' objects' – Rangers4me Nov 18 '13 at 22:57
  • @Rangers4me: Instead of adding a `timeout` parameter that you don't actually use, why not just change the `submit` to not send that `60` argument? Meanwhile, it looks like you want `deviceToken`, not `token`, at the end of the `pushToken=` line, and on the next line. It might have been better to call the parameter `device` instead of `token` to avoid that confusion (which is my fault, for not figuring out what your original code was doing before trying to factor it into a function). – abarnert Nov 18 '13 at 23:10
  • @Rangers4me: Also, you can debug this the same way as any other code. You can test the `load_url` function by serially looping over it before trying to get it to work in parallel, or you can add `print` statements into the middle of it to see what's going on, etc. That should be a lot easier than expecting someone who doesn't have any sample data to run your code with to debug it for you one step at a time… – abarnert Nov 18 '13 at 23:14
  • I really appreciate all your help. I got it working and it's working great! – Rangers4me Nov 19 '13 at 01:36

It could be overkill, but you might want to have a look at Celery.

Tutorial

tasks.py could be:

from celery import Celery
import requests

app = Celery('tasks', broker='amqp://guest@localhost//')

apikey="12345"
restkey="12345"

URL="https://api.web.com/1/install/"
headers={'content-type': 'application/json','Application-Id': apikey,'REST-API-Key':restkey}

f = open('upload_data.log', 'a+')
@app.task
def upload_data(data, count):
    r = requests.post(URL, data=data, headers=headers)
    f.write("Count: %d\n%s\n\n" % (count, r.content))

Start the Celery worker with:

$ celery -A tasks worker --loglevel=info -c 20
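
The `-c 20` option caps the worker at 20 concurrent tasks, matching the 20 simultaneous calls the service allows.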

Then in another script:

import tasks
def AddPushTokens():

    import csv
    import json

    count=0
    tokenList=[]

    with open('/Users/name/Desktop/push-new.csv','rU') as csvfile:
        deviceTokens=csv.reader(csvfile, delimiter=',')

        for token in deviceTokens:
            deviceToken=token[0].replace("/","")
            deviceType="ios"
            pushToken="pushtoken_"+deviceToken
            payload={"deviceType": deviceType,"deviceToken":deviceToken,"channels":["",pushToken]}
            r = tasks.upload_data.delay(json.dumps(payload), count)

            count=count+1

NOTE: The above code is a sample. You may have to modify it for your requirements.

shantanoo