
I am trying to parse a large space-separated file (3 GB and higher) into a sqlite database for other processing. The file currently has 20+ million lines of data. I have tried multithreading this, but for some reason it stops after around 1500 lines and does not proceed. I don’t know if I am doing anything wrong. Can someone please point me in the right direction?

The insertion is working fine with one process, but it is way too slow (of course!!!). It has been running for over seven hours and it is not even past the first set of strings. The DB file is still 25 MB in size and not even close to the number of records it has to contain.

Please guide me towards speeding this up. I have one more huge file to go (more than 5 GB) and this could take days.

Here’s my code:

import time
import queue
import threading
import sys
import sqlite3 as sql

record_count = 0
DB_INSERT_LOCK = threading.Lock()

def process_data(in_queue):
    global record_count
    try:
        mp_db_connection = sql.connect("sequences_test.sqlite")
        sql_handler = mp_db_connection.cursor()
    except sql.Error as error:
        print("Error while creating database connection: ", error.args[0])
    while True:
        line = in_queue.get()
        # print(line)
        if (line[0] == '@'):
            pass
        else:
            (sequence_id, field1, sequence_type, sequence_count, field2, field3,
              field4, field5, field6, sequence_info, kmer_length, field7, field8,
              field9, field10, field11, field12, field13, field14,
              field15) = line.expandtabs(1).split(" ")

            info = (field7 + " " + field8 + " " + field9 + " " + field10 + " " +
                     field11 + " " + field12 + " " + field13 + " " + field14 + " "
                     + field15)

            insert_tuple = (None, sequence_id, field1, sequence_type, sequence_count,
                             field2, field3, field4, field5, field6, sequence_info,
                              kmer_length, info)
            try:
                with DB_INSERT_LOCK:
                    sql_string = 'insert into sequence_info \
                                   values (?,?,?,?,?,?,?,?,?,?,?,?,?)'
                    sql_handler.execute(sql_string, insert_tuple)
                    record_count = record_count + 1
                    mp_db_connection.commit()
            except sql.Error as error:
                print("Error while inserting service into database: ", error.args[0])
            in_queue.task_done()

if __name__ == "__main__":
    try:
        print("Trying to open database connection")
        mp_db_connection = sql.connect("sequences_test.sqlite")
        sql_handler = mp_db_connection.cursor()
        sql_string = '''SELECT name FROM sqlite_master \
                         WHERE type='table' AND name='sequence_info' '''
        sql_handler.execute(sql_string)
        result = sql_handler.fetchone()
        if(not result):
            print("Creating table")
            sql_handler.execute('''create table sequence_info
                                (row_id integer primary key, sequence_id real, field1
                                   integer, sequence_type text, sequence_count real,
                                 field2 integer, field3 text,
                                 field4 text, field5 integer, field6 integer,
                                 sequence_info text, kmer_length text, info text)''')
            mp_db_connection.commit()
        else:
            pass
        mp_db_connection.close()
    except sql.Error as error:
        print("An error has occured.: ", error.args[0])

    thread_count = 4
    work = queue.Queue()

    for i in range(thread_count):
        thread = threading.Thread(target=process_data, args=(work,))
        thread.daemon = True
        thread.start()

    with open("out.txt", mode='r') as inFile:
        for line in inFile:
            work.put(line)

    work.join()

    print("Final Record Count: ", record_count)

The reason I have a lock is that with sqlite, I don’t currently have a way to batch commit my files into the DB and hence I have to make sure that every time a thread inserts a record, state of the DB is committed.

I know I am losing some processing time with the expandtabs call in the thick of things, but it is a little difficult to post process the file I am receiving to do a simple split on it. I will continue trying to do that so that the workload is reduced, but I need the multithreading at least to work.

EDIT:

I moved the expandtabs and split part outside the processing. So I process the line and insert it into the queue as a tuple so that the threads can pick it up and directly insert it into the DB. I was hoping to save quite a bit of time with this, but now I am running into problems with sqlite. It says it could not insert into the DB because it is locked. I am thinking it is more of a thread sync issue with the locking part, since I have an exclusive lock on the critical section above. Could someone please elaborate on how to resolve this?

Corley Brigman
adwaraki

3 Answers


I wouldn't expect multithreading to be of much use there. You should maybe write a generator function that processes the file into tuples, which you then insert with executemany.
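A minimal sketch of that approach, assuming the table from the question already exists and that every data line splits into the same 20 fields. The names `parse_records` and `load` are made up for illustration, and unlike the question's code this strips the trailing newline from the last field:

```python
import sqlite3


def parse_records(path):
    """Yield one 13-column tuple per data line; skip lines starting with '@'."""
    with open(path, mode="r") as in_file:
        for line in in_file:
            if line.startswith("@"):
                continue
            fields = line.rstrip("\n").expandtabs(1).split(" ")
            # The first 11 fields map to columns; the remaining ones are
            # joined into the single 'info' column, as in the question.
            yield (None, *fields[:11], " ".join(fields[11:]))


def load(path, db_path):
    """Stream all records from path into sequence_info in one transaction."""
    connection = sqlite3.connect(db_path)
    try:
        with connection:  # commits once on success, rolls back on error
            connection.executemany(
                "insert into sequence_info values (?,?,?,?,?,?,?,?,?,?,?,?,?)",
                parse_records(path),
            )
    finally:
        connection.close()
```

Because `executemany` pulls rows from the generator one at a time, the whole file never has to sit in memory. Note that nothing is committed until the `with connection:` block exits, so interrupting the script midway discards the inserts made so far.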

Brave Sir Robin
  • Is writing a generator for such a huge file a good idea? It is 3 GB in size. Memory issues etc? – adwaraki Feb 11 '14 at 16:27
  • That's the exact reason why you use a generator and not a list. – Brave Sir Robin Feb 11 '14 at 16:30
  • Actually wonderful idea. I wasn’t aware of this. In any case, the speed is up. I can see the DB size increasing as it inserts. I wanted to check if there are records, so I stopped the script. And the DB size went to zero. At what point does a commit happen? Or when do I need to do a commit? Thanks for the help. – adwaraki Feb 11 '14 at 17:25
  • And if you find it necessary to fine-tune something after that, then there's the following question: ["How do I improve the performance of SQLite?"](http://stackoverflow.com/q/1711631/2419207) – iljau Feb 12 '14 at 03:34

Multithreading will not help you.

The first thing you have to do is stop committing after each record. According to http://sqlite.org/speed.html, that is a factor of 250 in speed.

To avoid losing all your work if you interrupt the script, just commit every 10000 or 100000 records.
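Something along these lines could do the periodic commit. It is only a sketch: `insert_in_batches` is a made-up helper name, `rows` is assumed to be any iterable of 13-value tuples matching the `sequence_info` table from the question, and the batch size is a guess to tune:

```python
import sqlite3
from itertools import islice


def insert_in_batches(connection, rows, batch_size=10000):
    """Insert rows with executemany, committing once per batch.

    Returns the number of rows inserted.
    """
    rows = iter(rows)
    total = 0
    while True:
        # Pull at most batch_size rows without reading the whole input.
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        connection.executemany(
            "insert into sequence_info values (?,?,?,?,?,?,?,?,?,?,?,?,?)",
            batch,
        )
        connection.commit()  # an interruption loses at most one batch
        total += len(batch)
    return total
```

Only one batch is held in memory at a time, and everything committed before an interruption stays in the database.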

Xavier Combelle

In addition to previous responses, try:

pbacterio