7

I need to run a function `int f(int i)` for 10,000 different parameters, and each call takes around 1 second to execute because of I/O time.
In a language like Python, I can use threads (or async/await, I know, but I'll talk about that later) to parallelize this task.
If I always want 10 running threads, and want to split the task between them, I can use `ThreadPool`:

from multiprocessing.pool import ThreadPool

def f(p):
    x = [...]  # the actual I/O-bound work
    return x

p = ThreadPool(10)  # 10 worker threads
xs = p.map(f, range(10_000))

But how does it work? If I want to implement something similar in, say, Node.js, with f = http.get("www.google.com", callback), where should I begin? What's the algorithm for this kind of problem?
Again, I'd like to have 10 requests running at the same time, and when one finishes the next one should start.

What I've been thinking so far (ugly because the callback starts a new call to the f() function):

const http = require('http');

const queue = ["http://www.google.com", "http://www.facebook.com"]; // in reality, 10_000 urls

const f = function(url) {
  if (url === undefined) return; // queue exhausted, this "worker" stops
  http.get(url, (res) => {
    const newUrl = queue.pop();
    f(newUrl); // the callback starts the next request
  });
};

for (let i = 0; i < 10; i++) {
  f(queue.pop());
}
Stefano
  • You don’t need a thread pool to do this in Node.js. (You would if your tasks were CPU-bound, but Node is all about evented I/O on a single thread.) Anyway, see the `concurrency` option in http://bluebirdjs.com/docs/api/promise.map.html, and probably https://www.npmjs.com/package/request-promise for convenience. – Ry- Mar 15 '19 at 21:50
  • Promises are a nice alternative to callbacks, but you can't control how many tasks you're starting. Here I don't want to run the `f()` function 10K times at the same moment. – Stefano Mar 15 '19 at 21:57
  • Not only *can* you control how many tasks you’re starting, the `concurrency` option I linked to will do it all for you. – Ry- Mar 15 '19 at 21:59
  • Okay, your comment hadn't shown up yet when I wrote mine. My question is more about *how it is done* than *that it could be done with some tool*. Thanks anyway for the link – Stefano Mar 15 '19 at 22:05

6 Answers

4

Reimplementation of that Bluebird function I linked to: it starts `concurrency` async workers that share a single index into the input array, and each worker keeps claiming the next index and awaiting `fn` on it until the values run out.

const mapWithConcurrency = async (values, concurrency, fn) => {
    let i = 0;                            // next index to claim, shared by every worker
    let results = values.map(() => null);

    const work = async () => {
        // each worker keeps claiming the next index and mapping it until the input runs out
        while (i < values.length) {
            const current = i++;
            results[current] = await fn(values[current]);
        }
    };

    // start `concurrency` workers and wait for all of them to finish
    await Promise.all(Array.from({length: concurrency}, work));

    return results;
};

mapWithConcurrency(Array.from({length: 30 * 15}, (_, i) => i), 10, async i => {
    const el = document.body.appendChild(document.createElement('i'));
    el.style.left = 5 * (i % 30) + 'px';
    el.style.top = 5 * (i / 30 | 0) + 'px';
    await new Promise(resolve => { setTimeout(resolve, Math.random() * 500); });
    el.style.background = 'black';
    return 2 * i;
}).then(results => {
    console.log(results.length, results.every((x, i) => x === 2 * i));
});

The CSS used by the demo snippet:

i {
    background: grey;
    transition: background 0.3s ease-out;
    position: absolute;
    width: 5px;
    height: 5px;
}
Ry-
2

I'm not sure this is how ThreadPool and other libraries are implemented, but here is a hint: keep a collection of the running threads so you can count how many tasks are in flight.
I haven't tried this code, but it should give you an idea: we create a thread that checks every 0.2 seconds whether another worker thread should be started.
This implies a lot of context switching, however, and might not be efficient.

from threading import Thread
from time import sleep
from typing import Callable

class Pool:
    def __init__(self, func: Callable, params: list, thread_max=10):
        self.func = func
        self.params = params
        self.running = 0
        self.finished = []
        self.thread_max = thread_max
        self.threads = []

    def start(self):
        # monitor thread: polls every 0.2s and keeps up to thread_max workers running
        Thread(target=self.check, args=(0.2,)).start()

    def check(self, t_sleep=0.5):
        done = False
        while not done:
            sleep(t_sleep)
            # first check for finished threads
            for t in list(self.threads):
                if not t.is_alive():
                    # do something with the return value
                    # ...
                    self.threads.remove(t)

            if not len(self.params):  # means there is no more task left to LAUNCH
                done = not len(self.threads)  # becomes True once every task is COMPLETE
                continue  # avoid the next part (launching threads)

            # now start some threads if needed
            while self.params and len(self.threads) < self.thread_max:
                arg = self.params.pop()
                thread = Thread(target=self.func, args=(arg,))
                self.threads.append(thread)
                thread.start()

However, I don't have any clue about async/await (keywords now available in Python).

politinsa
  • Thank you for helping me understand pooling. But your solution isn't really efficient, is it? I'd like to hear a bit more about async/await too. – Stefano Mar 15 '19 at 22:07
0

In Python, a thread pool only uses one CPU core (because of the GIL). But since your task is I/O-bound, it will still do much better than serial execution of the 10k function calls.

To do better, you can try a process pool, which can utilize multiple cores, or even combine asyncio with processes. Depending on your problem, there may or may not be a further speedup from these two approaches, with the thread pool as the baseline.
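
A minimal sketch of the process-pool route with the standard library's concurrent.futures (the f below is only a placeholder for the real I/O-bound function from the question):

from concurrent.futures import ProcessPoolExecutor

def f(i):
    # placeholder for the real, roughly 1-second I/O-bound call
    return 2 * i

if __name__ == '__main__':
    # max_workers caps how many calls run at once, like the 10 threads in the question
    with ProcessPoolExecutor(max_workers=10) as pool:
        xs = list(pool.map(f, range(10_000)))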

See this example of combining thread/process with asyncio. It should work for your case directly. Your function f is the equivalent of their function block.
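
The core idea of that thread/process + asyncio combination, as a rough sketch (block below is only a stand-in for a blocking I/O call, not the linked code):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def block(i):
    # stand-in for a blocking, roughly 1-second I/O call
    time.sleep(1)
    return 2 * i

async def run_all():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=10) as executor:
        # each blocking call runs in the pool; the event loop just awaits the futures
        futures = [loop.run_in_executor(executor, block, i) for i in range(100)]
        return await asyncio.gather(*futures)

results = asyncio.get_event_loop().run_until_complete(run_all())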

In Python 3.6, the general form of asyncio code is to create an event loop to run an async function. A very simple example is:

import asyncio

async def coroutine():
    print('in coroutine')

coro = coroutine()
event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(coro)
event_loop.close()

For simplicity, you can think of the return value of an async def function as something to be executed (a coroutine), and the loop executes it. If there are N tasks to be executed asynchronously, you can define them with N async def functions, plus one more that awaits them. This last async function defines what 'finished' means for the N tasks: for example, maybe it means all N tasks are done, or that any 1 of them is done, etc. The loop then executes this (N+1)'th function.
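
Applied to the question, a minimal sketch of that pattern: N coroutines plus one that awaits them all, with a Semaphore keeping at most 10 in flight (the sleep only simulates the 1-second I/O call):

import asyncio

async def fetch(i, sem):
    async with sem:                # at most 10 coroutines run this block at the same time
        await asyncio.sleep(1)     # simulates the 1-second I/O call
        return 2 * i

async def main():
    sem = asyncio.Semaphore(10)
    # the (N+1)'th coroutine: here 'finished' means all N tasks are done
    return await asyncio.gather(*(fetch(i, sem) for i in range(10_000)))

event_loop = asyncio.get_event_loop()
xs = event_loop.run_until_complete(main())
event_loop.close()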

In Python 3.7, the asyncio API changed a little, and the loop no longer needs to be created explicitly. You can find some examples in my blog post.
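
Under 3.7+, for instance, the simple example above shrinks to:

import asyncio

async def coroutine():
    print('in coroutine')

# asyncio.run() creates the event loop, runs the coroutine, and closes the loop
asyncio.run(coroutine())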

nos
0

Late answer, but the way I normally handle multiple threads with a maximum of X concurrent threads is as follows:

import threading
import requests, json
import time
from urllib.parse import urlparse

final_dict = {} # will hold final results

def parser(u):
    try:
        parsed_uri = urlparse(u) # parse url to get the domain name that'll be used as key in final_dict
        domain = "{uri.netloc}".format(uri=parsed_uri)
        x = requests.get(u)
        status_code = x.status_code
        headers = x.headers
        cookies = x.cookies
        # OR cookies = ";".join(f"{k}:{v}" for k,v in x.cookies.iteritems())
        html = x.text
        # do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
        if not domain in final_dict:
            final_dict[domain] = []
        final_dict[domain].append( {'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html} )

    except Exception as e:
        print(e)
        return {}

max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']

for u in urls:
    threading.Thread(target=parser, args=[u]).start()
    tc = threading.active_count()
    while tc >= max_threads: # wait until a slot frees up before launching the next thread
        tc = threading.active_count()
        time.sleep(0.2)

while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
    tc = threading.active_count()
    time.sleep(0.2)

print(json.dumps(final_dict))

'''
# save to file
with open("output.json", "w") as f:
    f.write(json.dumps(final_dict))

# load from file
with open("output.json") as f:
    _json = json.loads(f.read())
'''

Output:

  1. Please check the json generated above at: https://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd
  2. The code above is, somehow, "my own code"; by this I mean it was used in a previous project and may not fully answer your question. Still, I hope it's a good resource for future users.
  3. On Linux I normally set max_threads to 250 and on Windows to around 150.


Pedro Lobito
0

To get behaviour similar to Node.js, you have to use reactive programming (ReactiveX). What you are looking for is RxPY: https://github.com/ReactiveX/RxPY

sancelot
0

Take a look at my newly published module: concurrency-controller

It can invoke functions concurrently with a given degree of concurrency.

Majid Yaghouti