
I am running a web scraper class whose method is named self.get_with_random_proxy_using_chain.

I am trying to send multithreaded calls to the same URL, and I would like the method to return a response as soon as any thread gets a result, closing the other still-active threads.

So far my code looks like this (probably naive):

from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from time import sleep
# class initiation etc

max_workers = cpu_count() * 5
urls = [url_to_open] * 50

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_url = []
    for url in urls:  # I had to use a loop to include a sleep, so as not to overload the proxy server
        future_to_url.append(executor.submit(self.get_with_random_proxy_using_chain,
                                             url,
                                             timeout,
                                             update_proxy_score,
                                             unwanted_keywords,
                                             unwanted_status_codes,
                                             random_universe_size,
                                             file_path_to_save_streamed_content))
        sleep(0.5)

    for future in as_completed(future_to_url):
        if future.result() is not None:
            return future.result()

But it runs all the threads.

Is there a way to close all threads once the first future has completed? I am using Windows and Python 3.7.x.

So far I found this link, but I don't manage to make it work (the program still runs for a long time).

  • What do you mean by "return results as they come"? It looks like it will return the first result and let all the other threads complete, ignoring their results. How can you get multiple successive results from that one return statement? – Dennis Sparrow Aug 01 '20 at 23:57
  • At the moment it returns one value (correct), but only after having looped through all futures. I would like to stop all threads/futures after the first result is found. Looking further, I saw this: https://stackoverflow.com/questions/52631315/python-properly-kill-exit-futures-thread, but I can't find the _threads property (I edited my question). – Je Je Aug 02 '20 at 12:05

1 Answer

As far as I know, running futures cannot be cancelled. Quite a lot has been written about this, and there are even some workarounds.
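One such workaround, staying within concurrent.futures, is to wait with return_when=FIRST_COMPLETED, cancel the futures that have not started yet, and signal already-running workers through a shared threading.Event that they poll. This is a minimal sketch with a simulated worker (the worker function and delays are made up for illustration); note that Future.cancel() only stops futures still waiting in the queue:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import threading
import time

stop_event = threading.Event()  # running workers poll this to exit early

def worker(delay):
    # Simulated request; a real worker would check stop_event between steps.
    for _ in range(int(delay * 10)):
        if stop_event.is_set():
            return None
        time.sleep(0.1)
    return delay

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(worker, d) for d in (0.1, 2.0, 2.0)]
    # Block until the first future finishes.
    done, pending = wait(futures, return_when=FIRST_COMPLETED)
    stop_event.set()   # tell running workers to bail out
    for f in pending:
        f.cancel()     # cancels only futures that have not started running
    result = done.pop().result()

print(result)
```

The Event is what makes this return quickly: without it, the executor's shutdown at the end of the with block would still wait for any already-running calls to finish.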

But I would suggest taking a closer look at the asyncio module. It is quite well suited for such tasks.

Below is a simple example in which several concurrent requests are made and, upon receiving the first result, the rest are cancelled.

import asyncio

from aiohttp import ClientSession


async def fetch(url, session):
    # Perform a single GET request and return the raw response body.
    async with session.get(url) as response:
        return await response.read()


async def wait_for_first_response(tasks):
    # Return the first finished task's result and cancel the rest.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    return done.pop().result()


async def request_one_of(*urls):
    tasks = set()
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.create_task(fetch(url, session))
            tasks.add(task)

        return await wait_for_first_response(tasks)


async def main():
    response = await request_one_of("https://wikipedia.org", "https://apple.com")
    print(response)

asyncio.run(main())
alex_noname
  • Thanks, do I need to use aiohttp, or will requests work just as well? I am not familiar with asyncio and aiohttp, but this may well be the opportunity to look into them. – Je Je Aug 02 '20 at 13:18
  • Yes, for this you should use aiohttp or another asyncio library; requests does not support the async approach. – alex_noname Aug 02 '20 at 14:41
  • And it seems like aiohttp doesn't support proxies in the form IP:PORT; I am a bit stuck. – Je Je Aug 02 '20 at 14:55
  • 1
    Aiohttp should support a proxy address with a port. On what grounds did you reach this conclusion? – alex_noname Aug 02 '20 at 15:44
  • I added: proxy = '13.75.114.68:25222' and modified: "async with session.get(url, proxy=str(proxy),) as response:", and I get the error: in update_proxy raise ValueError("Only http proxies are supported"). This proxy works with requests, so I'm not sure what the issue is. – Je Je Aug 02 '20 at 16:29
  • 1
    Add `http://` for http-proxies, i.e. `proxy = 'http://13.75.114.68:25222'` – alex_noname Aug 02 '20 at 16:41
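To illustrate that last comment, a minimal aiohttp fetch through a proxy might look like the sketch below (the proxy address is the hypothetical one from the comments; aiohttp's proxy= parameter expects a full http:// URL, not a bare IP:PORT):

```python
import asyncio

from aiohttp import ClientSession


async def fetch_via_proxy(url, proxy):
    # proxy must include the scheme, e.g. "http://13.75.114.68:25222";
    # aiohttp raises ValueError for non-http proxy URLs.
    async with ClientSession() as session:
        async with session.get(url, proxy=proxy) as response:
            return await response.read()

# Hypothetical proxy address, for illustration only:
# asyncio.run(fetch_via_proxy("https://wikipedia.org", "http://13.75.114.68:25222"))
```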