The task is to take records from an input file, process them, and store them in an SQLite database. The file can contain millions of records; processing a single record is fairly fast, but I hoped to get a speedup from multiprocessing. I implemented it and found that there is a bottleneck somewhere, because the gain is smaller than expected.
I cannot utilize all cores effectively: three processes give a noticeable improvement, but adding more processes has almost no effect.
Below I provide simplified example code just to show how the processes are created and managed.
After some investigation I suspect:
- data reading from disk
- serialization/deserialization (the least suspicious)
- passing data to the worker processes (a measurement sketch follows this list)
- locks; I have two of them:
  - one to write data to the DB, and
  - one to manage intermediate in-memory data which is stored to the DB after the whole run finishes
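To test the data-passing suspicion in isolation, here is a minimal sketch (dummy payloads, a no-op consumer; N and the payload size are made up, not my real values) that measures how many items per second a Manager().Queue() can move on its own:

    # Micro-benchmark for IPC alone: push dummy payloads through a managed
    # queue to a consumer that does no work, and report the sustained rate.
    import time
    from multiprocessing import Manager, Process

    def sink(q):
        # Consume items until the None sentinel arrives, doing nothing.
        while q.get() is not None:
            pass

    if __name__ == '__main__':
        N = 100000
        payload = b'x' * 1000  # ~1 KB stand-in for a serialized molecule
        manager = Manager()
        q = manager.Queue(8)
        p = Process(target=sink, args=(q,))
        p.start()
        start = time.time()
        for _ in range(N):
            q.put((payload, 'mol_name'))
        q.put(None)
        p.join()
        print('%.0f items/s through the managed queue' % (N / (time.time() - start)))

If this rate is close to the records/s of the real run, the managed queue itself is the ceiling: every put/get is a round-trip to the manager's server process, and a plain multiprocessing.Queue (no manager in the middle) is typically much faster.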
What is not the bottleneck:
- DB writing, because
  - I do it in chunks (a sketch of the chunked insert follows this list), and
  - replacing the file database with an in-memory one gave no speed gain
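For completeness, insert_data in the example below is essentially a locked, chunked executemany. A minimal sketch of it; the table and column names here are placeholders, not my real schema:

    def insert_data(conn, cursor, data_collector, lock_db):
        # Chunked write: take the DB lock once per ~1000 rows, not per row.
        if not data_collector:
            return
        with lock_db:
            cursor.executemany(
                'INSERT INTO results (name, a, b, c) VALUES (?, ?, ?, ?)',
                data_collector)
            conn.commit()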
I profiled the code in a single process (with cProfile), but it was not very informative: most of the time is spent in the calculation stage.
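Since cProfile only sees the process it runs in, the workers themselves can be profiled by wrapping the target so that each process dumps its own stats file (a sketch; process is the worker function from the example below, and the stats files can be inspected later with pstats):

    import cProfile
    import os

    def profiled_process(*args):
        # Wrap the real worker so each process writes its own stats file.
        prof = cProfile.Profile()
        prof.enable()
        process(*args)
        prof.disable()
        prof.dump_stats('worker_%d.prof' % os.getpid())

    # used as: Process(target=profiled_process, args=(q, conn, cur, d, lock_db, lock_dict))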
Measuring execution time on a small subset of the data gave:
# (laptop, 2 cores with hyper-threading, Python 3.5, Ubuntu 16.04, SSD)
serial (old implementation): 28s
parallel (workers = 1): 28s
parallel (workers = 2): 19s
parallel (workers = 3): 17s
parallel (workers = 4): 17s
# (virtual machine on a server, 30 cores, Python 3.4, Ubuntu 14.04, HDD)
parallel (workers = 1): 28s
parallel (workers = 2): 11s
parallel (workers = 3): 10s
parallel (workers = 4): 8s
parallel (workers = 5): 8s
parallel (workers = 6): 8s
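A rough sanity check on these numbers: if the ~8 s plateau on the server is a purely serial component (e.g. the single loop that reads and serializes records), then by Amdahl's law no worker count can beat total/serial:

    # Numbers taken from the server timings above.
    total, serial = 28.0, 8.0
    print('max speedup <= %.1f' % (total / serial))  # -> 3.5

which matches the observed plateau.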
Q: How can I determine what the bottleneck is, or at least confirm or rule out some of the suspected issues? Is it possible to obtain better than a 4-fold gain?
# indigo is an external module (molecule handling); calc_value, insert_data,
# create_tables and insert_db are my own helpers, omitted for brevity;
# ncpu, out_fname and file_name are set earlier
import sqlite3 as lite
from multiprocessing import Process, Manager, cpu_count


def process(q, conn, cursor, d, lock_db, lock_dict):
    data_collector = []
    while True:
        data = q.get()
        if data is None:  # sentinel: the feeder is done
            break
        mol_name = data[1]
        mol = indigo.unserialize(data[0])  # <-- unserialization
        lock_dict.acquire()
        value = d.get(mol_name, None)
        if value is None:
            value = calc_value(mol)
            d[mol_name] = value
        lock_dict.release()
        # some calculations which return several variables A, B and C
        data_collector.append([mol_name, A, B, C])
        if len(data_collector) == 1000:  # write to the DB in chunks
            insert_data(conn, cursor, data_collector, lock_db)
            data_collector = []
    insert_data(conn, cursor, data_collector, lock_db)  # flush the tail


with lite.connect(out_fname) as conn:
    cur = conn.cursor()
    create_tables(cur)
    nprocess = max(min(ncpu, cpu_count()), 1)
    manager = Manager()
    lock_db = manager.Lock()
    lock_dict = manager.Lock()
    q = manager.Queue(2 * nprocess)
    d = manager.dict()
    pool = []
    for _ in range(nprocess):
        # conn and cur are inherited by the workers via fork on Linux
        p = Process(target=process, args=(q, conn, cur, d, lock_db, lock_dict))
        p.start()
        pool.append(p)
    for mol in indigo.iterateSDFile(file_name):
        q.put((mol.serialize(), mol.name()))  # <-- serialization
    for _ in range(nprocess):  # one sentinel per worker
        q.put(None)
    for p in pool:
        p.join()
    for k, v in d.items():  # store the accumulated intermediate data
        insert_db(cur, k, v)
    conn.commit()
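Regarding the first question, one thing I could do before reaching for a profiler is instrument the worker loop to attribute wall time to each suspected phase (a sketch; the placeholder calculation is kept from the example above):

    import os
    import time

    def process(q, conn, cursor, d, lock_db, lock_dict):
        t_get = t_lock = t_calc = 0.0  # accumulated seconds per phase
        data_collector = []
        while True:
            t0 = time.time()
            data = q.get()             # time spent waiting on the feeder / IPC
            t_get += time.time() - t0
            if data is None:
                break
            mol_name = data[1]
            mol = indigo.unserialize(data[0])
            t0 = time.time()
            lock_dict.acquire()        # time spent contending for the dict lock
            t_lock += time.time() - t0
            value = d.get(mol_name, None)
            if value is None:
                value = calc_value(mol)
                d[mol_name] = value
            lock_dict.release()
            t0 = time.time()
            # some calculations which return several variables A, B and C
            t_calc += time.time() - t0
            data_collector.append([mol_name, A, B, C])
            if len(data_collector) == 1000:
                insert_data(conn, cursor, data_collector, lock_db)
                data_collector = []
        insert_data(conn, cursor, data_collector, lock_db)
        print('pid %d: q.get %.1fs, dict lock %.1fs, calc %.1fs'
              % (os.getpid(), t_get, t_lock, t_calc))

If t_get dominates, the workers are starved by the single feeder loop (reading plus serialization in the main process); if t_lock dominates, the managed dict is the choke point; only if t_calc dominates is the job really compute-bound.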