
I have a text file containing several million URLs and I have to run a POST request for each of those URLs. I tried to do it on my machine but it is taking forever, so I would like to use my Spark cluster instead.

I wrote this PySpark code:

from pyspark.sql.types import StringType
import requests

url = ["http://myurltoping.com"]
list_urls = url * 1000 # The final code will just import my text file
list_urls_df = spark.createDataFrame(list_urls, StringType())

print('number of partitions: {}'.format(list_urls_df.rdd.getNumPartitions()))

def execute_requests(list_of_url):
    final_iterator = []
    for url in list_of_url:
        # each row comes from the DataFrame's single "value" column
        r = requests.post(url.value)
        final_iterator.append((r.status_code, r.text))
    return iter(final_iterator)

processed_urls_df = list_urls_df.rdd.mapPartitions(execute_requests)

But it is still taking a lot of time. How can I make the function execute_requests more efficient, for example by launching the requests in each partition asynchronously?

Thanks!

Pierre
  • Do you care whether the requests succeed? Is it something you need to run only once or often? Spark may not be the best tool for that. Perhaps a simple Java program can do the trick. – Vitaliy Nov 20 '18 at 16:48
  • I just want to check whether the response status is 400; otherwise I would retry the request. And it is something I only need to run once. – Pierre Nov 20 '18 at 17:00

1 Answer


Using the Python package grequests (installable with pip install grequests) might be an easy solution to your problem without using Spark at all.

The documentation (https://github.com/kennethreitz/grequests) gives a simple example:

import grequests

urls = [
    'http://www.heroku.com',
    'http://python-tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://fakedomain/',
    'http://kennethreitz.com'
]

Create a set of unsent Requests:

>>> rs = (grequests.get(u) for u in urls)

Send them all at the same time:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, None, <Response [200]>]
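
The question needs POST requests rather than GETs, and the comments mention checking the status code and retrying. Here is a minimal sketch of how that could look with grequests; the post helper exists in grequests just like get, but the concurrency limit (size=50) and the retry condition are assumptions for illustration:

import grequests

def send_all(urls, concurrency=50):
    # Build unsent POST requests and fire them concurrently;
    # size caps the number of simultaneous connections.
    pending = (grequests.post(u) for u in urls)
    return grequests.map(pending, size=concurrency)

responses = send_all(urls)

# grequests returns None for a request that raised an exception,
# so collect those URLs and send them again once; a status-code
# check could be added to the condition as needed.
to_retry = [u for u, r in zip(urls, responses) if r is None]
retried = send_all(to_retry)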

I found that using gevent within a foreach on a Spark DataFrame results in some weird errors and does not work. It seems as if Spark also relies on gevent, which is what grequests uses...
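
If the requests still have to run on the Spark cluster, a gevent-free alternative (not part of this answer, just a sketch under that assumption) is to use a plain thread pool inside each partition via concurrent.futures. The pool size and timeout below are arbitrary, and the rows are assumed to expose the URL as row.value, as in the question's DataFrame:

import requests
from concurrent.futures import ThreadPoolExecutor

def execute_requests(rows):
    # One thread pool per partition; no gevent involved.
    urls = [row.value for row in rows]

    def post(url):
        try:
            r = requests.post(url, timeout=10)
            return (url, r.status_code, r.text)
        except requests.RequestException as e:
            return (url, None, str(e))

    with ThreadPoolExecutor(max_workers=32) as pool:
        # consume the iterator inside the with-block so the pool
        # is still alive while the requests run
        return list(pool.map(post, urls))

results = list_urls_df.rdd.mapPartitions(execute_requests).collect()

Writing the results back out (for example with saveAsTextFile) would probably be preferable to collecting millions of responses on the driver.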

MPeter