
If I start threads with start_new_thread, and have n independent functions that I want to run concurrently, I would do the following:

import thread
import time

def foo1(x):
    print "foo1"
    time.sleep(5)

def foo2(x):
    print "foo2"
    time.sleep(1)

func_list = [foo1, foo2]

for func in func_list:
    thread.start_new_thread(func, (1,))

Both functions have exactly the same code, but they are independent in that each sends messages to its own ZMQ socket (which in turn waits on a response from an external API before sending back a message for processing within foo).

foo1 might take 5 seconds to complete, depending on the response time of the API and the size of the payload. The problem is that if I try to fire it again in a new thread while it is still processing, the ZMQ socket throws an exception (I've seen the issue tracker already; it's not a bug).

So if foo1 is busy, foo2 is available; if foo2 is busy, foo(n) might be available (up to foo15), so there are plenty of workers available. But how do I tell which function is busy? And if it is busy, how do I either wait for it to finish or, if other workers are available, use them instead?

Remember, I can't just spool up 15 threads of the same function because, for all intents and purposes, they are independent.

Can someone help? This is a very confusing problem I have created for myself. Thanks.
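One way to track which worker is busy, sketched below with hypothetical stand-in handlers rather than real ZMQ sockets: keep the idle workers in a `queue.Queue` and have each job borrow one, so a job automatically blocks only when every worker is busy.

```python
import queue
import threading

# Hypothetical stand-ins for the independent foo1 ... foo15 workers: each
# "handler" represents one exclusive resource (e.g. one ZMQ socket).
def make_handler(name):
    def handler(x):
        return "%s handled %s" % (name, x)
    return handler

idle = queue.Queue()
for name in ("foo1", "foo2", "foo3"):
    idle.put(make_handler(name))

results = []
lock = threading.Lock()

def run_job(x):
    h = idle.get()        # blocks until *some* worker is free
    try:
        r = h(x)
    finally:
        idle.put(h)       # return the worker to the idle pool
    with lock:
        results.append(r)

threads = [threading.Thread(target=run_job, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because each handler is checked out of the queue before use and put back afterwards, no two threads can ever touch the same socket at the same time.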

EDIT @ martineau -

I have a list of sockets that I import. I wish I didn't have to do it, but the API I use has no limit on HTTP connections (within reason), only a limit on how many requests each one can handle. Hence more connections is the only way to get more speed.

The following is the setup for the job. I process 10 records at a time, corresponding to the 10 connections I keep alive with the API. I just went with pooling the threads and will give up on making another thread run if one is busy (that's a bit too complicated), so if one thread takes 5 seconds it will delay the next batch of 10. It's a compromise.

import threading
import socket_handler_a, socket_handler_b ...

def multi_call(reduce_kp, exe_func):

        def trd_call_a(x,y):
                exe_func(socket_handler_a(x),y)

        def trd_call_b(x,y):
                exe_func(socket_handler_b(x),y)

        def trd_call_c(x,y):
                exe_func(socket_handler_c(x),y)

        def trd_call_d(x,y):
                exe_func(socket_handler_d(x),y)

        def trd_call_e(x,y):
                exe_func(socket_handler_e(x),y)

        def trd_call_f(x,y):
                exe_func(socket_handler_f(x),y)

        def trd_call_g(x,y):
                exe_func(socket_handler_g(x),y)

        def trd_call_h(x,y):
                exe_func(socket_handler_h(x),y)

        def trd_call_i(x,y):
                exe_func(socket_handler_i(x),y)

        def trd_call_j(x,y):
                exe_func(socket_handler_j(x),y)

        func_list = [trd_call_a, trd_call_b,
                     trd_call_c, trd_call_d,
                     trd_call_e, trd_call_f,
                     trd_call_g, trd_call_h,
                     trd_call_i, trd_call_j]

        def chunks_(l, n):
                for i in range(0, len(l), n):
                    yield l[i:i+n]

        for query_lst in chunks_([i for i in reduce_kp], 10):
                threads = []  # fresh list per batch, so we don't re-join old threads
                for k, j in enumerate(query_lst):

                    thread1 = threading.Thread(target=func_list[k], args=(j[0], j[1]))
                    thread1.start()
                    threads.append(thread1)

                # wait for this batch to finish before starting the next one
                for thread in threads:
                    thread.join()

And that is called with this:

def test_case(q_list):

        reduce_kp   = []
        for k in q_list: 
                reduce_kp.append([{'QTE':'EUR_USD'}, [k,'BAL'] ])
        multi_call(reduce_kp, test_case_resp)

And the responses are called from threads, i.e.

def test_case_resp(resp,x):
   #process resp
  • I think you could use a [`multiprocessing.pool.ThreadPool`](https://stackoverflow.com/questions/3033952/threading-pool-similar-to-the-multiprocessing-pool). That way, you wouldn't need to manually check and see when one was finished. [Here's an example](https://stackoverflow.com/a/44072760/355230) of using it. – martineau May 17 '18 at 17:36
  • @ajsp: Note: It won't handle a list of functions like you have in your question, but you don't really need them as far as I can tell, which is why I recommended it to you. – martineau May 17 '18 at 18:12
  • @martineau See the edit, and thanks for the input, and thanks for the answer you directed me to, it will come in handy. – ajsp May 17 '18 at 18:34
  • If both/all threaded functions have the exact same code, then you don't need (or want) to define separate versions of them. Just define one and pass it argument(s) to tell it what to do as it doesn't matter if the time it takes to execute varies because the `Pool` will just schedule another thread to run again with a different argument passed to it as soon as one is available. – martineau May 17 '18 at 18:44
  • @martineau you're right martineau, thanks again for the pointers. – ajsp May 17 '18 at 22:02
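Martineau's suggestion can be sketched as follows (the handler lambdas and `exe_func` signature are placeholders, not the original API): a single worker function serves the whole pool, borrowing whichever handler is idle from a queue, so no socket is ever used by two threads at once, and `ThreadPool` schedules the next record as soon as any thread frees up instead of waiting for a whole batch of 10.

```python
from multiprocessing.pool import ThreadPool
import queue

# Placeholder handlers standing in for socket_handler_a ... socket_handler_j.
free_handlers = queue.Queue()
for tag in "abcdefghij":
    free_handlers.put(lambda x, tag=tag: "%s:%s" % (tag, x))

def exe_func(item):
    # Borrow whichever handler is idle; blocks only if all ten are busy.
    h = free_handlers.get()
    try:
        return h(item)
    finally:
        free_handlers.put(h)

pool = ThreadPool(10)  # at most ten jobs in flight, one per live connection
results = pool.map(exe_func, range(25))
pool.close()
pool.join()
```

Unlike the fixed batches of 10 above, a slow record here only ties up one thread; the other nine keep pulling new records immediately.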
