I often need to process several hundred million rows of a MySQL table on a line-by-line basis using Python. I want a script that is robust and does not need to be monitored.
Below I have pasted a script that classifies the language of the message field in each row. It uses the sqlalchemy and MySQLdb.cursors.SSCursor modules. Unfortunately, this script consistently throws a 'Lost connection to MySQL server during query' error after 4840 rows when I run it remotely and after 42000 rows when I run it locally.
Also, I have checked that max_allowed_packet = 32M in my MySQL server's /etc/mysql/my.cnf file, as per the answers to this Stack Overflow question: Lost connection to MySQL server during query.
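For reference, the relevant part of /etc/mysql/my.cnf looks like this on my server (I believe the setting belongs under the [mysqld] section, though the layout of the file may differ between installations):

    [mysqld]
    max_allowed_packet = 32M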
Any advice on either fixing this error, or on another approach to processing very large MySQL tables robustly with Python, would be much appreciated!
import sqlalchemy
import MySQLdb.cursors
import langid

schema = "twitterstuff"
table = "messages_en"  # 900M row table
engine_url = "mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema)

# Server-side cursor so the result set is streamed rather than loaded into memory
db_eng = sqlalchemy.create_engine(engine_url, connect_args={'cursorclass': MySQLdb.cursors.SSCursor})
langid.set_languages(['fr', 'de'])

print "Executing input query..."
data_iter = db_eng.execute("SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT 10000".format(table))

def process(inp_iter):
    # Classify each message, attaching the detected language and confidence
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter):
    count = 0
    for item in update_iter:
        count += 1
        if count % 10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        # Write the classification back while the SELECT is still streaming
        db_eng.execute("UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}".format(table, lang, conf, message_id))

data_iter_upd = process(data_iter)

print "Begin processing..."
update_table(data_iter_upd)
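Regarding the second part of my question: the kind of alternative I am imagining is a chunked loop along the lines of the untested sketch below, which pulls a bounded batch into memory, writes the classifications back, and retries the batch if the connection drops. The batch size, retry limit, and sleep interval are arbitrary placeholders, and I have not verified that this is actually more robust; it is only here to make the question concrete.

import time
import sqlalchemy
import sqlalchemy.exc
import langid

BATCH_SIZE = 1000    # placeholder, not tuned
MAX_RETRIES = 5      # placeholder, not tuned

schema = "twitterstuff"
table = "messages_en"
engine_url = "mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema)
db_eng = sqlalchemy.create_engine(engine_url)  # default client-side cursor

langid.set_languages(['fr', 'de'])

def classify_one_batch():
    # Pull one bounded batch fully into memory, then write the results back
    rows = db_eng.execute(
        "SELECT message_id, message FROM {} "
        "WHERE langid_lang IS NULL LIMIT {}".format(table, BATCH_SIZE)).fetchall()
    for message_id, message in rows:
        lang, conf = langid.classify(message)
        db_eng.execute(
            "UPDATE {} SET langid_lang = %s, langid_conf = %s "
            "WHERE message_id = %s".format(table), (lang, conf, message_id))
    return len(rows)

processed = 0
while True:
    for attempt in range(MAX_RETRIES):
        try:
            batch_count = classify_one_batch()
            break
        except sqlalchemy.exc.OperationalError:
            # e.g. a lost connection; wait and try the batch again
            time.sleep(10)
    else:
        raise RuntimeError("Giving up after {} failed attempts".format(MAX_RETRIES))
    if batch_count == 0:
        break  # no more unclassified rows
    processed += batch_count
    print "{} rows processed".format(processed)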