If I start a thread with start_new_thread and have n independent functions that I want to run concurrently, I would do the following:
import thread
import time

def foo1(x):
    print "foo1"
    time.sleep(5)

def foo2(x):
    print "foo2"
    time.sleep(1)

func_list = [foo1, foo2]
for func in func_list:
    thread.start_new_thread(func, (1,))
Both functions have exactly the same code, but they are independent in that each one sends messages to its own ZMQ socket (which in turn waits on a response from an external API before sending a message back for processing within foo).
foo1 might take 5 seconds to complete, depending on the response time of the API and the size of the payload. The problem is that if I try to fire it again in a new thread while it is still processing, the ZMQ socket throws an exception (I have seen the GitHub issues already; it's not a bug).
So if foo1 is busy, foo2 may be available, and if foo2 is busy, foo(n) might be available (up to foo15), so there are plenty of workers. But how do I tell which function is busy? And if it's busy, how do I either wait for it to finish or, if other workers are available, use one of them instead?
Remember I can't just spool up 15 threads of the same function because for all intents and purposes, they are independent.
Can someone help? This is a very confusing problem I have created for myself. Thanks.
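One possible shape for this (a sketch of my own, not tested against ZMQ): treat each independent function as a reusable "worker" that must be checked out of a queue before use and returned afterwards. A busy worker can never be fired twice concurrently, and `get()` blocks until any worker frees up. The names `run_jobs`, `workers`, and `jobs` are hypothetical; this assumes Python 3's `queue` module (`Queue` in Python 2).

```python
import queue
import threading

def run_jobs(jobs, workers):
    # Pool of currently-free workers (e.g. foo1..foo15).
    free = queue.Queue()
    for w in workers:
        free.put(w)

    def run_one(job):
        w = free.get()        # block until some worker is available
        try:
            w(job)
        finally:
            free.put(w)       # hand the worker back when done

    threads = [threading.Thread(target=run_one, args=(j,)) for j in jobs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

With this, you never need to ask "which function is busy?": a worker is busy exactly while it is checked out, and a job simply waits for whichever one is returned first.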
EDIT @ martineau -
I have a list of sockets that I import. I wish I didn't have to do it this way, but the API I use has no limit on HTTP connections (within reason), only a limit on how many requests each connection can handle. Hence more connections is the only way to get more speed.
The following is the setup for the job. I process 10 records at a time, corresponding to the 10 connections I keep alive with the API. I just went with pooling the threads and will give up the ghost on making another thread run if one is busy (that's a bit too complicated), so if one thread takes 5 seconds it will delay the next batch of 10. It's a compromise.
import threading
import socket_handler_a, socket_handler_b ...

def multi_call(reduce_kp, exe_func):
    def trd_call_a(x, y):
        exe_func(socket_handler_a(x), y)
    def trd_call_b(x, y):
        exe_func(socket_handler_b(x), y)
    def trd_call_c(x, y):
        exe_func(socket_handler_c(x), y)
    def trd_call_d(x, y):
        exe_func(socket_handler_d(x), y)
    def trd_call_e(x, y):
        exe_func(socket_handler_e(x), y)
    def trd_call_f(x, y):
        exe_func(socket_handler_f(x), y)
    def trd_call_g(x, y):
        exe_func(socket_handler_g(x), y)
    def trd_call_h(x, y):
        exe_func(socket_handler_h(x), y)
    def trd_call_i(x, y):
        exe_func(socket_handler_i(x), y)
    def trd_call_j(x, y):
        exe_func(socket_handler_j(x), y)

    func_list = [trd_call_a, trd_call_b,
                 trd_call_c, trd_call_d,
                 trd_call_e, trd_call_f,
                 trd_call_g, trd_call_h,
                 trd_call_i, trd_call_j]

    def chunks_(l, n):
        for i in range(0, len(l), n):
            yield l[i:i+n]

    for query_lst in chunks_([i for i in reduce_kp], 10):
        threads = []
        for k, j in enumerate(query_lst):
            thread1 = threading.Thread(target=func_list[k], args=(j[0], j[1]))
            thread1.start()
            threads.append(thread1)
        # wait for the whole batch before starting the next one
        for thread in threads:
            thread.join()
And that is called with this:
def test_case(q_list):
    reduce_kp = []
    for k in q_list:
        reduce_kp.append([{'QTE':'EUR_USD'}, [k, 'BAL']])
    multi_call(reduce_kp, test_case_resp)
And the responses are processed inside the threads, i.e.
def test_case_resp(resp, x):
    # process resp
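For what it's worth, the batch-of-10 compromise can be avoided with the same checkout idea: a fixed thread pool plus a queue of handlers, so a slow handler delays only its own slot rather than the whole next batch. This is a sketch under my own assumptions, not the original code: `multi_call_pooled` is a hypothetical name, and `handlers` stands in for `socket_handler_a` .. `socket_handler_j`.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

def multi_call_pooled(reduce_kp, exe_func, handlers):
    # Pool of currently-free socket handlers.
    free = queue.Queue()
    for h in handlers:
        free.put(h)

    def call(item):
        h = free.get()              # wait for any free handler
        try:
            exe_func(h(item[0]), item[1])
        finally:
            free.put(h)             # return the handler for reuse

    # At most len(handlers) records in flight, but no fixed batches:
    # as soon as a handler frees up, the next record starts.
    with ThreadPoolExecutor(max_workers=len(handlers)) as pool:
        list(pool.map(call, reduce_kp))
```

The difference from the batch version is that records are pulled continuously rather than in lockstep groups of 10, so one 5-second response stalls one slot instead of all ten.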