
So I am working on a little Python tool to stress-test an application's API.

I've got a pretty nice script using the threading module, but then I read that it would require manual coding to maintain n concurrent threads (meaning, starting new ones as soon as old ones finish), and the suggestion in How to start a new thread when old one finishes? is to use a ThreadPool. I tried as follows:

import time
import threading
from random import randint
from multiprocessing.pool import ThreadPool


def test_post():
    print "Executing in " + threading.currentThread().getName() + "\n"
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"


if args.send:
    code, content = post()
    print (code, "\n")
    print (content)
elif args.test:
    # Create new threads
    print threads
    results_list = []
    pool = ThreadPool(processes=threads)
    results = pool.apply_async(test_post())
    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.
    # results = list(pool.imap_unordered(
    #     test_post(), ()
    # ))
    # thread_list = []
    # while threading.activeCount() <= threads:
    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)
    #     thread.start()
    #     thread_list.append(thread)
    print "Exiting Main Thread" + "\n"
else:
    print ("cant get here!")

When I invoke the script, I get consistent output such as:

4

Executing in MainThread

Exiting Main Thread

I am not sure why. As you can see in the commented-out block, I tried different ways and it still executes only once.

My goal is to make the script run in a loop, always keeping n threads running at any time. The test_post (and, respectively, post) functions return the HTTP response code and the content; I would like to later use this to print/stop when the response code is NOT 200 OK.

  • `pool.apply_async(test_post())` is calling `test_post()` *once*, RIGHT NOW, in the main thread - and then attempting to distribute its *return value* to your thread pool. To start with, you need to get rid of the `()` so that you are sending the actual function to the pool. – jasonharper Feb 14 '19 at 20:56
  • Also, you only ever `apply_async` a single task... – juanpa.arrivillaga Feb 14 '19 at 20:58
  • Thanks, I still don't understand how to get it done.. all examples show something like this without issues. @roganjosh I am reading through the `multiprocessing` library docs, but I am no closer to getting it running 4 times while also getting multiple values back from the child thread(s)... @jasonharper I tried removing the parentheses, but the printout has not changed... – Carmageddon Feb 14 '19 at 21:06
  • @juanpa.arrivillaga I am not sure I follow you, what do you mean a single task? I am giving it a single function to execute... – Carmageddon Feb 14 '19 at 21:06
  • @Carmageddon you only call `apply_async` once, so it will only ever create a single thread and call that function once. – juanpa.arrivillaga Feb 14 '19 at 21:16
  • @juanpa.arrivillaga I don't understand, am I supposed to use it in conjunction with something else? – Carmageddon Feb 14 '19 at 21:20
  • I'm saying that even if you had fixed the problem pointed out by @jasonharper and called `apply_async` correctly by passing *the function, not the result of calling the function*, i.e. by using `pool.apply_async(test_post)` instead of `test_post()`, you would still only be calling `apply_async` **once** so it will only create **one thread** and call the function **once**. So to run it N times, you would need something like `for _ in range(n): pool.apply_async(test_post)` – juanpa.arrivillaga Feb 14 '19 at 21:23
  • @juanpa.arrivillaga Actually the threads are already created when the Pool is instantiated, it's just that they wait (`inqueue.get()`) to be fed with tasks. @Carmageddon You should notice that the print comes from a different thread than the MainThread when you drop `()` and just pass `pool.apply_async(test_post)` . – Darkonaut Feb 14 '19 at 21:27
  • @Darkonaut yes, you are correct, the threads are created when the pool is initialized. To be precise, only **one** thread will be fed **one task** (the task here being the function you pass to apply_async) – juanpa.arrivillaga Feb 14 '19 at 21:34
  • Thanks guys, I understand the task issue now, but how do I make it run n threads, and continuously start new ones as soon as older ones close? – Carmageddon Feb 14 '19 at 21:35
  • You need to differentiate between the unit of work (job, task) and a thread. The whole point of using a pool in the first place is re-using your executors, be it threads or processes. As long as you don't close the Pool, all initial threads stay alive. So you don't care about recreating threads, you just call pool-methods of an existing pool as often as you have some work you want to distribute. – Darkonaut Feb 14 '19 at 22:00
  • `apply_async` is a single call job, you either have to call it four times or you use a mapping pool-method like `pool.map` which maps one function over an iterable. Looking at the first image [here](https://stackoverflow.com/a/54032744/9059420) might help to understand. – Darkonaut Feb 14 '19 at 22:01
  • Thanks @Darkonaut, I will keep reading that stuff, but basically my post function does not take any arguments; it currently re-uses the same `post` method to post the same data from the args variable available in the main scope, and that is why .map is not working for me. So have I misunderstood what Pool can do? Do I still need to check when a process finished, and call `apply_async` again once one thread is free? What I don't like about this line of thought is that I block on one thread, waiting for it while another one might have already finished sooner... doesn't seem an efficient approach – Carmageddon Feb 14 '19 at 22:09
  • @Carmageddon You can use asynchronous pool methods like `apply_async` whenever you have something, you don't need to check if a worker is free. The tasks will be put on a queue and whenever a worker is free it will try to get something from that queue. – Darkonaut Feb 14 '19 at 22:20
  • Can I do that in an infinite while loop? Will it block until one frees up? – Carmageddon Feb 14 '19 at 22:21
  • You can call `apply_async` in a while loop sure, it won't block, that's why it's async and the internal queue-size is unlimited. The tasks _stay_ in the queue until one worker _is_ free, so you don't need to care about free or not free. – Darkonaut Feb 14 '19 at 22:32
  • Ohh I see, and what if I wanted to block if there is no free process? Do I just use the sync one? – Carmageddon Feb 14 '19 at 22:39
  • @Darkonaut ok, I read the docs again and just tried it in a simplified test file: `p = Pool(2)` then `for i in range(5): results = p.apply_async(work())` and it still executes sequentially, printing out line 1, sleeping a random 1-3 secs, then line 2, then it repeats.. so not parallelized :( why? – Carmageddon Feb 15 '19 at 00:56
  • You are again calling `work()` instead of passing the function `work` as argument. But if I understand your wanted behaviour right, I currently see no way to implement that with Pool. Would be possible with building your own pool, though. – Darkonaut Feb 15 '19 at 01:06
  • @Darkonaut my bad, sorry! I fixed it, but now apparently the work function is not being invoked at all... why? And what do you mean by no way to implement that with a pool? A Pool can't do what I want? Meaning start N jobs, and replace/return the next available process or something? – Carmageddon Feb 15 '19 at 01:10
  • Can't tell from what you showed. You're asking for immediate resubmission so you can't just block-wait with `.get()` on the first `AsyncResult` object ...complicated. – Darkonaut Feb 15 '19 at 01:36
  • Yes, essentially that is what I am trying to do - a simple POST stress-test tool: you specify n threads, and it endlessly runs n parallel threads submitting requests until you ctrl-c. Hard to believe there is no easy way to accomplish this in Python :( – Carmageddon Feb 15 '19 at 01:42
  • @Darkonaut here is the up-to-date code, if it helps to see what I have tried so far: `p = Pool(2)` then `for i in range(5): results = p.apply_async(work)` - in the above, work is not being invoked at all apparently... – Carmageddon Feb 15 '19 at 01:42

1 Answer


Your first problem is that you already called your function in the MainThread by writing:

pool.apply_async(test_post())

...instead of passing test_post as an argument for a call to be executed in a worker-thread with:

pool.apply_async(test_post)
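
A minimal sketch of the difference (the names here are illustrative, not from your script):

from multiprocessing.pool import ThreadPool
from threading import current_thread


def work():
    return "ran in " + current_thread().name


pool = ThreadPool(4)

# Wrong: work() runs immediately in the MainThread and only its
# *return value* is handed to the pool.
# pool.apply_async(work())

# Right: pass the function object itself; a worker thread calls it.
async_result = pool.apply_async(work)
print(async_result.get())  # e.g. "ran in Thread-1"

pool.close()
pool.join()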

OP: I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n number of concurrent threads (meaning, starting new ones as soon as old ones finish) ...

You need to distinguish between the unit of work (job, task) and a thread. The whole point of using a pool in the first place is re-using the executors, be it threads or processes. The workers are already created when a Pool is instantiated, and as long as you don't close the Pool, all initial threads stay alive. So you don't care about recreating threads, you just call pool-methods of an existing pool as often as you have some work you want to distribute. Pool takes these jobs (a pool-method call) and creates tasks out of them. These tasks are put on an unbounded queue. Whenever a worker is finished with a task, it will blockingly try to get() a new task from this inqueue.
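
For illustration, a small sketch (not your code) that feeds eight tasks to a pool of two threads; the thread names in the output repeat, showing that the same two workers are re-used for every task:

import time
from threading import current_thread
from multiprocessing.pool import ThreadPool


def task(n):
    time.sleep(0.1)
    return "task {} ran in {}".format(n, current_thread().name)


pool = ThreadPool(2)  # two worker threads, created once

# eight jobs for two workers; each worker pulls the next task from the
# internal queue as soon as it finishes the previous one
results = [pool.apply_async(task, args=(n,)) for n in range(8)]
for r in results:
    print(r.get())

pool.close()
pool.join()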


OP: Pool only executes a single thread instead of 4...I tried different ways and it still does it only once.

pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

...is a single-call, single-task-producing job. In case you want more than one execution of func, you either have to call pool.apply_async() multiple times, or you use a mapping pool-method like

pool.map(func, iterable, chunksize=None)

..., which maps one function over an iterable. pool.apply_async is non-blocking; that is why it is "async". It immediately returns an AsyncResult object you can (blockingly) call .wait() or .get() upon.
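
Both variants side by side in a minimal sketch (work here is a hypothetical one-argument function, not from your script):

from multiprocessing.pool import ThreadPool


def work(x):  # hypothetical one-argument function
    return x * 2


pool = ThreadPool(4)

# 1) several single-task jobs; apply_async returns immediately with an
#    AsyncResult, so collect them first and .get() (blocking) afterwards
async_results = [pool.apply_async(work, args=(i,)) for i in range(5)]
print([r.get() for r in async_results])  # [0, 2, 4, 6, 8]

# 2) one mapping job; pool.map blocks until all results are in
print(pool.map(work, range(5)))  # [0, 2, 4, 6, 8]

pool.close()
pool.join()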


Through the comments it became clear that you want endless and immediate replacements for finished tasks (a self-produced input stream)... and that the program should stop on KeyboardInterrupt or when a result does not have a certain value.

You can use the callback parameter of apply_async to schedule new tasks as soon as any of the old ones finishes. The difficulty lies in what to do meanwhile with the MainThread, to prevent the whole script from ending prematurely while keeping it responsive to KeyboardInterrupt. Letting the MainThread sleep in a loop lets it still react immediately upon KeyboardInterrupt while preventing an early exit. In case a result should stop the program, you can let the callback terminate the pool. The MainThread then just has to include a check of the pool status in its sleep-loop.

import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool


def test_post(post_id):
    # simulate a POST request taking 1-3 seconds
    time.sleep(randint(1, 3))
    # return 404 in roughly ten percent of the cases
    status_code = choice([200] * 9 + [404])
    return "{} {} Message no.{}: {}".format(
        datetime.now(), current_thread().name, post_id, status_code
    ), status_code


def handle_result(result):
    # runs in the pool's result-handler thread as soon as a task finishes
    msg, code = result
    print(msg)
    if code != 200:
        print("terminating")
        pool.terminate()
    else:
        # immediately schedule a replacement task
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )


if __name__ == '__main__':

    N_WORKERS = 4

    post_cnt = count()

    pool = ThreadPool(N_WORKERS)

    # initial distribution
    for _ in range(N_WORKERS):
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

    try:
        while pool._state == 0:  # check if pool is still alive
            time.sleep(1)
    except KeyboardInterrupt:
        print(" got interrupt")

Example Output with KeyboardInterrupt:

$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt

Example Output with termination due to unwanted return value:

$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating

Note, in your scenario you can also call apply_async more often than N_WORKERS times for your initial distribution, to have some buffer for reduced latency.
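
For example, with a hypothetical oversubscription factor the initial loop above would become:

BUFFER_FACTOR = 2  # hypothetical; tune to taste

# queue more tasks than workers so a finishing worker can grab the next
# task right away instead of idling until a callback resubmits one
for _ in range(N_WORKERS * BUFFER_FACTOR):
    pool.apply_async(
        test_post, args=(next(post_cnt),), callback=handle_result
    )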

Darkonaut
  • I tried to modify the while condition using `should_continue = True` and then, when I get POST errors like a code != 200, set `should_continue = False` (in the handler), but that is shadowing the original variable, I guess in a separate thread.. how do I notify the main thread that I had an error, and would like to stop sending POSTs to allow easy inspection of the output? – Carmageddon Feb 15 '19 at 15:40
  • I tried to do `pool.close()` from inside the handler but am getting `assert self._state == RUN` AssertionError - I suppose it does not like me closing the pool from inside a child thread :) – Carmageddon Feb 15 '19 at 15:47
  • I think I got it, had to use `should_continue = Value(c_bool, True)`, pass it around to the post_handler, and in the handler check `result[3].value` before enqueueing another task.. seems to allow the process to gracefully exit! – Carmageddon Feb 15 '19 at 16:56
  • @Carmageddon Are you speaking about `multiprocessing.Value`? Using shared memory would be unnecessarily complex here. – Darkonaut Feb 15 '19 at 17:02
  • yes I do, it seems to work beautifully here.. unless I was just lucky with timings? What is your alternative suggestion? – Carmageddon Feb 15 '19 at 17:07
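
Since ThreadPool workers are plain threads sharing the process's memory, a threading.Event is enough as a stop flag here; no shared memory is needed. A minimal sketch of that alternative, reusing pool, test_post and post_cnt from the answer's example:

import threading

should_continue = threading.Event()
should_continue.set()  # start in the "keep going" state


def handle_result(result):
    msg, code = result
    print(msg)
    if code != 200:
        should_continue.clear()  # flip the flag; no new tasks get queued
    elif should_continue.is_set():
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

The MainThread's sleep-loop can then poll should_continue.is_set() instead of pool._state and shut the pool down gracefully with pool.close() and pool.join().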