
I have a table called "unprocessed" where I want to read 2000 rows, send them over HTTP to another server and then insert the rows into a "processed" table and remove them from the "unprocessed" table.

My python code roughly looks like this:

import sys
import MySQLdb

db = MySQLdb.connect("localhost", "username", "password", "database")

# prepare a cursor object using cursor() method
cursor = db.cursor()

# Select all the records not yet sent
sql = "SELECT * FROM unprocessed WHERE SupplierIDToUse = 'supplier1' LIMIT 0, 2000"
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
    id = row[0]
    # <code here for sending to the other server - it takes about 1/2 a second>
    if sentcorrectly == "1":
        sql = "INSERT INTO processed (id, dateprocessed) VALUES (%s, NOW())"
        try:
            inserted = cursor.execute(sql, (id,))
        except MySQLdb.Error:
            print "Failed to insert"
        if inserted:
            print "Inserted"
            sql = "DELETE FROM unprocessed WHERE id = %s"
            try:
                deleted = cursor.execute(sql, (id,))
            except MySQLdb.Error:
                print "Failed to delete id from the unprocessed table, even though it was saved in the processed table."
db.commit()  # InnoDB needs an explicit commit; MySQLdb turns autocommit off by default
db.close()
sys.exit(0)

I want to be able to run this code concurrently so that I can increase the speed of sending these records to the other server over HTTP. At the moment, if I try to run the code concurrently, I get multiple copies of the same data sent to the other server and saved into the "processed" table, because the select query returns the same ids in multiple instances of the code.

How can I lock the records when I select them, and then process each record as a row before moving it to the "processed" table? The table was MyISAM, but I've converted it to InnoDB today as I realise there's probably a better way of locking the records with InnoDB.

3G Telecoms
    possible duplicate of [How to deliberately lock a MySQL row such that even SELECT will return an error?](http://stackoverflow.com/questions/2051225/how-to-deliberately-lock-a-mysql-row-such-that-even-select-will-return-an-error) – David Apr 16 '13 at 16:42
  • The only non-duplicate part of this would be if connection A needs to disconnect between the start of work and when it writes. – David Apr 16 '13 at 16:43
  • I looked at the possible duplicate question; it doesn't have any Python code and also doesn't explain how I would select the ids from the table and then process each one before removing them from the table. – 3G Telecoms Apr 16 '13 at 16:44

1 Answer


Based on your comment reply:

One of two solutions would be a client-side Python master process that collects the IDs for all 2000 records and then splits them into chunks to be processed by sub-workers.

Short version: your choices are to delegate the work or to rely on a possibly tricky record-locking mechanism. I would recommend the former approach, as it can scale up with the aid of a message queue.
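For reference, the locking route would look something like the sketch below. This is only an illustration, assuming you add a nullable claimed_by column to unprocessed (the column and the worker token are not part of your schema): each worker claims a batch with an atomic UPDATE, so two workers can never pick up the same ids, which is exactly the race you are seeing.

import os
import MySQLdb

db = MySQLdb.connect("localhost", "username", "password", "database")
cursor = db.cursor()
worker_id = str(os.getpid())  # any token that is unique per worker

# Atomically claim a batch: each row can only be claimed once,
# so concurrent workers never pick up the same ids.
cursor.execute(
    "UPDATE unprocessed SET claimed_by = %s "
    "WHERE claimed_by IS NULL AND SupplierIDToUse = 'supplier1' "
    "LIMIT 500", (worker_id,))
db.commit()

# Read back exactly the rows this worker claimed.
cursor.execute("SELECT id FROM unprocessed WHERE claimed_by = %s",
               (worker_id,))
rows = cursor.fetchall()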

The delegation logic would use multiprocessing:

import multiprocessing

records = get_all_unprocessed_ids()
pool = multiprocessing.Pool(5)  # create 5 workers
pool.map(process_records, records)

That would create 2000 tasks and run 5 at a time. Alternatively, you can split records into chunks, using a solution like the one outlined in "How do you split a list into evenly sized chunks?":

pool.map(process_records, chunks(records, 100)) 

This would create 20 lists of 100 records, which would be processed 5 at a time.
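To make the delegation concrete, here is a rough sketch of what chunks and process_records could look like; get_all_unprocessed_ids, the connection details, and the HTTP step are placeholders from the question, not working code. The important detail is that each worker opens its own MySQL connection, because a MySQLdb connection cannot be shared across processes:

import multiprocessing
import MySQLdb

def chunks(lst, n):
    # Yield successive n-sized chunks from lst.
    for i in xrange(0, len(lst), n):
        yield lst[i:i + n]

def process_records(batch):
    # Each worker process opens its own connection; sharing one
    # MySQLdb connection across processes is not safe.
    db = MySQLdb.connect("localhost", "username", "password", "database")
    cursor = db.cursor()
    for id in batch:
        # <send the record over HTTP here, as in the question>
        cursor.execute(
            "INSERT INTO processed (id, dateprocessed) VALUES (%s, NOW())",
            (id,))
        cursor.execute("DELETE FROM unprocessed WHERE id = %s", (id,))
    db.commit()  # one commit per batch
    db.close()

if __name__ == '__main__':
    records = get_all_unprocessed_ids()  # placeholder from the question
    pool = multiprocessing.Pool(5)
    pool.map(process_records, list(chunks(records, 100)))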

Edit: fixed a syntax error - the signature is map(func, iterable[, chunksize]) and I had left out the argument for func.

David
  • This looks great! I tried to implement it, but I think I oversimplified the example. My function to get the ids actually returns full rows. When I pass each record using `pool.map(process_records, record)` I get an error: `TypeError: 'datetime.datetime' object is unsubscriptable`. One of the fields of the record row is a datetime object. How would I fix this? – 3G Telecoms Apr 17 '13 at 11:58
  • #1 You want to pass a list or iterable of more than just one unit of work to pool.map. #2 unsubscriptable means that something is trying to do `my_var[0]` when my_var isn't a list-like object. http://stackoverflow.com/questions/4123603/python-unsubscriptable – David Apr 17 '13 at 14:15
  • Thanks David - this is now working as per your multiprocessing suggestion. – 3G Telecoms Apr 29 '13 at 13:05
  • @3GTelecoms Good to hear the distributed-lite approach is working. – David Apr 29 '13 at 14:52