
I'm relatively new to Python and last touched threads and processes in C about 7 years ago, so please treat me as a newb in your responses.

I'm using Python 2.7.6 on Linux.

I am trying to query (and later download from) an online archive which only allows one connection per registered user and is pretty slow. It has its own API for the queries, so I won't go into that. I'm intending to perform the queries, and later the downloads, in parallel threads, one per user account. (For the record, I'm not cheating the system; all the accounts are genuine users!)

accounts = ['user1','pass1','user2','pass2', ...]
queries = ['query1','query2','query3', ..., 'queryN']

numQueries = len(queries)
numAc = len(accounts)/2  # two entries (user, password) per account

if numQueries < numAc:
  nThreads = numQueries
else:
  nThreads = numAc # most likely situation

# example of function for the query
def runQuery(user, passw, query):
  # here's the API bit
  pass

Every example I have seen runs over one single list.

So, I'm at a loss. I can see how it would work if we forgot all about accounts and constraints and were just running different queries.

How can I set up one thread per account and iterate over the list of queries/downloads? Remember that I'm using 2.7.

I'm also getting overwhelmed by the thread/process issue, so would appreciate lots of clarity in responses.

--- Edit - As the code in my comment below is unreadable, this is what I tried:

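import thread

# userQueue is a collections.deque of alternating users and passwords,
# queryQueue a Queue.Queue of queries, both set up earlier.
ulock = thread.allocate_lock()

def runQuery(userQueue, ulock, queryQueue):
    query = queryQueue.get()
    with ulock:
        user = userQueue.popleft()
        userQueue.append(user)
        passw = userQueue.popleft()
        userQueue.append(passw)
    print 'The executed query will use: ' + user + ' ' + passw + ' ' + query + '\n'

for t in range(nThreads):  # range() is needed: iterating over an int raises TypeError
    thread.start_new_thread(runQuery, (userQueue, ulock, queryQueue,))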
emmalg
  • I usually avoid using `pass` as a variable name (in the `runQuery` function signature): it is a reserved Python keyword (the "do nothing" statement), so using it as a name is actually a syntax error. You later use `passw`, which is far better. – logc Jun 26 '15 at 10:52
  • Also you could maybe create a new `multiprocessing.Process` for every user and then start the said process? Here's a good explanation of how to get going with this, from your perspective I don't think it makes any difference if you go for threads or processes: http://stackoverflow.com/questions/17172878/using-pythons-multiprocessing-process-class – Aleksander Lidtke Jun 26 '15 at 10:52

2 Answers


I think your problem has a simpler answer if you see that all you want to do is distribute the queries over the user accounts that you have, so that no two threads use the same credentials at the same time.

Which means: assign each query to a user account (cycling over the user accounts because you do not have as many accounts as queries), then group by user account, and let each thread run all queries assigned to a single user account. Each thread receives a single set of credentials, so there can be no concurrency problem.

"""
Distributes a number N of queries over a set of M user accounts
"""
from itertools import izip, cycle, groupby
from threading import Thread


def run_query(account, queries):
    """Run a number of queries under the same account"""
    user = account[0]
    passw = account[1]
    for query in queries:
        print 'The executed query will use: ' + user + ' ' + passw + ' ' + query + '\n'


def main():
    """Distributes the queries and then runs them in threads"""
    accounts = [('user1', 'pass1'), ('user2', 'pass2'), ('userM', 'passM')]
    queries = ['query1', 'query2', 'query3', 'queryN']

    assignments = list(izip(cycle(accounts), queries))
    assignments = sorted(assignments, key=lambda (account, query): account)
    # [(('user1', 'pass1'), 'query1'), (('user1', 'pass1'), 'queryN'),
    #  (('user2', 'pass2'), 'query2'),
    #  (('userM', 'passM'), 'query3')]

    for account, assigned in groupby(assignments, lambda (account, query): account):
        queries = [item[1] for item in list(assigned)]
        Thread(target=run_query, args=(account, queries)).start()

if __name__ == '__main__':
    main()

Some notes:

  • I grouped each user and password into a tuple; it seems to make sense, and it's not hard to do even if they are separate in your code.
  • there are N queries and M users, with N > M. Note that, with the four example queries and three accounts, cycling over the accounts assigns query N (the last one) to user 1, as the comment in the code shows.
  • sorting the assignments is a requirement of the groupby function.
  • using itertools.groupby can be tricky. Note that the assigned part of the result is an iterator over the elements of assignments, so each element is again an (account, query) tuple. The important part is that this iterator only returns the elements for a single account; we extract the query part and run that on a Thread.
  • incidentally, I find threading.Thread to be much simpler than thread.start_new_thread. There is no need to join the started threads: the program does not exit until all non-daemon threads have finished.
  • no Queues, no mutexes, no nothing. Knowing your keys is everything :)
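If the sort-then-groupby step feels tricky, the same round-robin assignment can be collected into a dict instead, which removes the sorting requirement altogether. This is a sketch rather than the answer's own code: `assign_queries` and `main` are illustrative names, and `run_query` only prints, standing in for the archive's API. It is written to run on both Python 2.7 and 3; each thread still owns exactly one account, so no credentials are ever shared.

```python
from __future__ import print_function

import threading
from collections import defaultdict
from itertools import cycle


def assign_queries(accounts, queries):
    """Round-robin the queries over the accounts, grouped per account.

    Returns a dict mapping each (user, passw) tuple to its list of queries.
    zip() stops when the queries run out, so with fewer queries than
    accounts the extra accounts simply get no entry.
    """
    assigned = defaultdict(list)
    for account, query in zip(cycle(accounts), queries):
        assigned[account].append(query)
    return dict(assigned)


def run_query(account, queries):
    """Placeholder for the archive API: just prints what would run."""
    user, passw = account
    for query in queries:
        print('The executed query will use: %s %s %s' % (user, passw, query))


def main(accounts, queries):
    # One thread per account that actually received work.
    threads = [threading.Thread(target=run_query, args=(account, assigned))
               for account, assigned in assign_queries(accounts, queries).items()]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # optional: non-daemon threads keep the program alive anyway


if __name__ == '__main__':
    main([('user1', 'pass1'), ('user2', 'pass2'), ('userM', 'passM')],
         ['query1', 'query2', 'query3', 'queryN'])
```

A dict keeps the per-account grouping explicit, at the cost of holding all assignments in memory, which should be negligible even for hundreds of downloads.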
logc
  • Ideally, I would like to handle the case where the number of users is more than the number of queries, but I think that is secondary to everything else. What you have suggested works really well, though I am trying to understand it all at the moment. I prefer it to the first response because when I come to manage the downloads, there may be hundreds of files and this method looks like it should handle them quite nicely. – emmalg Jun 26 '15 at 14:24
  • The code works exactly the same if the number of users is more than the number of queries (N < M): the `cycle` never wraps around, so each of the first N users receives one query and the remaining users stay idle. – logc Jun 26 '15 at 14:31
  • Sorry for not getting back to you on Friday, but I have tested all the options now and like the flexibility of this approach best. It will extend really well to the download of the data as well. – emmalg Jun 29 '15 at 10:33
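The N < M case discussed in these comments can be checked in isolation. This small sketch uses the built-in `zip`, which for this purpose behaves like `izip` (the latter exists only in Python 2), and the account and query names are placeholders.

```python
from __future__ import print_function

from itertools import cycle

accounts = [('user1', 'pass1'), ('user2', 'pass2'), ('user3', 'pass3')]
queries = ['query1', 'query2']  # fewer queries (N = 2) than accounts (M = 3)

# zip() stops as soon as the shorter input (the queries) is exhausted,
# so cycle() never wraps around and user3 is simply left without work.
assignments = list(zip(cycle(accounts), queries))
print(assignments)
# [(('user1', 'pass1'), 'query1'), (('user2', 'pass2'), 'query2')]

# The number of threads the groupby loop would start is min(N, M),
# i.e. the nThreads value the question computes by hand.
threads_needed = len(set(account for account, _ in assignments))
print(threads_needed)  # 2
```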

The easiest would be (with your current structure) to use the old thread module:

from thread import start_new_thread

start_new_thread(runQuery,(user1,pass1,query1,))
start_new_thread(runQuery,(user2,pass2,query2,))
start_new_thread(runQuery,(user3,pass3,query3,))

All these queries will run in parallel until their function runQuery returns. If you don't need feedback from the threads you don't need to take care of synchronisation.
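To see that the threads really overlap, here is a small timing sketch, assuming the real runQuery spends most of its time waiting on the network (simulated with `time.sleep`, since the archive's API is not shown). It uses `threading.Thread` rather than `start_new_thread` because, per the Python 2 `thread` documentation, when the main thread exits the other threads are killed on most systems; `join()` keeps the main thread waiting until every query has returned. `run_in_parallel` is a hypothetical name.

```python
from __future__ import print_function

import threading
import time


def run_query(user, passw, query):
    """Hypothetical stand-in for a slow, network-bound archive query."""
    time.sleep(0.2)  # a sleeping (or I/O-waiting) thread releases the GIL
    print('%s finished %s' % (user, query))


def run_in_parallel(jobs):
    """Start one thread per (user, passw, query) job and wait for all of them."""
    threads = [threading.Thread(target=run_query, args=job) for job in jobs]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # the main thread waits here instead of exiting early
    return time.time() - start


elapsed = run_in_parallel([('user1', 'pass1', 'query1'),
                           ('user2', 'pass2', 'query2'),
                           ('user3', 'pass3', 'query3')])
print('elapsed: %.2f s' % elapsed)
```

On a typical machine the elapsed time should be close to a single 0.2 s query rather than three in a row.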

As it now seems that you do need synchronisation, do it like so:

Define a Queue and add all the queries to it:

from Queue import Queue

queryQueue = Queue()
queryQueue.put(query1)
queryQueue.put(query2)
queryQueue.put(query3)

Now start your threads with a reference to the queryQueue:

start_new_thread(runQuery,(user1,pass1,queryQueue,))
start_new_thread(runQuery,(user2,pass2,queryQueue,))
start_new_thread(runQuery,(user3,pass3,queryQueue,))

At the beginning of runQuery, take a query from the queue:

def runQuery(user, passw, queryQueue):  # `pass` is a reserved word, so use passw
    query = queryQueue.get()  # blocks until a query is available

Queue is thread-safe, meaning it takes care of all the necessary synchronisation for you.
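As written, each thread calls `queryQueue.get()` only once, so each account runs a single query. A minimal sketch of the same idea with a loop, assuming one worker thread per account that keeps taking queries until the queue is empty; `worker`, `run_all` and the `run_query` placeholder are hypothetical names, and the code runs on Python 2.7 and 3. Because every query is queued before any worker starts, `get_nowait()` raising `Empty` reliably means there is no work left, and extra accounts (M > N) just exit immediately.

```python
from __future__ import print_function

import threading

try:
    from Queue import Queue, Empty  # Python 2.7
except ImportError:
    from queue import Queue, Empty  # Python 3


def run_query(user, passw, query):
    """Hypothetical placeholder for the archive's own query API."""
    return '%s ran %s' % (user, query)


def worker(user, passw, query_queue, results, results_lock):
    """One worker per account: keep taking queries until the queue is empty.

    Only this thread uses these credentials, so the account never has
    two connections open at once.
    """
    while True:
        try:
            query = query_queue.get_nowait()
        except Empty:
            return  # no work left for this account
        result = run_query(user, passw, query)
        with results_lock:  # guard the shared results list
            results.append(result)


def run_all(accounts, queries):
    query_queue = Queue()
    for query in queries:
        query_queue.put(query)

    results = []
    results_lock = threading.Lock()
    threads = [threading.Thread(target=worker,
                                args=(user, passw, query_queue, results, results_lock))
               for user, passw in accounts]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # keep the main thread alive until all workers are done
    return results
```

Usage: `run_all([('user1', 'pass1'), ('user2', 'pass2')], ['query1', 'query2', 'query3'])` returns one result per query, in whatever order the threads finished.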

Dakkaron
  • That sounds like a good starting point. I've edited the question slightly (see the "queries =" line) as there are usually more queries than accounts, so I need to loop over the queries. – emmalg Jun 25 '15 at 12:59
  • Meant to add - I could do that with a simple loop and counter but I can't reuse an account until the query has finished, so I do need some synchronisation in there – emmalg Jun 25 '15 at 13:01
  • I tried a spin on what you suggested, looping over the users, but it couldn't handle both cases: users > queries and users < queries. I've since tried making a deque for the users, which I lock in the loop, but I'm not seeing any output (the code is in the edit to my question). Is this way off?! – emmalg Jun 26 '15 at 10:20
  • Sorry - clicked the wrong edit button!! :) I don't know how it even allowed me to edit your answer. Hope it doesn't get through peer review! – emmalg Jun 26 '15 at 10:29
  • Sorry, wasn't available over the weekend. Since Queue is thread-safe, you don't need to use a lock. From what I gather, you can't use a user on two threads at the same time, is that correct? In that case, just append the user back at the end of its runQuery(). Also, you shouldn't append()/pop() the user and password individually; instead, put them into a tuple and append()/pop() the tuple. That way you can guarantee that a username and its password stay together. – Dakkaron Jun 29 '15 at 08:28
  • Thank you for getting back to me, I wasn't available over the weekend either. I'm going to go with the answer from logc for simplicity as I like how the allocation is done, but your answer did help me a lot too. – emmalg Jun 29 '15 at 10:31
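Dakkaron's last suggestion, handing the account back when a query finishes and keeping user and password together in one tuple, amounts to an account pool. A minimal sketch, assuming one thread per query that blocks until some account is free; `query_with_pool`, `run_all` and the `run_query` placeholder are hypothetical names, and the code runs on Python 2.7 and 3. Note that it starts one thread per query, so for hundreds of downloads the one-worker-per-account layout is probably the lighter choice.

```python
from __future__ import print_function

import threading

try:
    from Queue import Queue  # Python 2.7
except ImportError:
    from queue import Queue  # Python 3


def run_query(user, passw, query):
    """Hypothetical placeholder for the archive's own query API."""
    return (user, query)


def query_with_pool(account_pool, query, results, results_lock):
    # get() blocks until some account is free. The (user, passw) tuple is a
    # single queue item, so a username can never be separated from its password.
    user, passw = account_pool.get()
    try:
        result = run_query(user, passw, query)
        with results_lock:
            results.append(result)
    finally:
        # Return the account even if the query raised, so others can use it.
        account_pool.put((user, passw))


def run_all(accounts, queries):
    account_pool = Queue()
    for account in accounts:
        account_pool.put(account)

    results = []
    results_lock = threading.Lock()
    threads = [threading.Thread(target=query_with_pool,
                                args=(account_pool, query, results, results_lock))
               for query in queries]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Since at most one thread holds a given account tuple at a time, no account is ever used by two queries concurrently, whether there are more queries than accounts or fewer.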