I just noticed that my program is using more and more memory as it processes a large file. It's only processing one line at a time, though, so I couldn't figure out why it would keep using more memory.
After a lot of digging, I realised that the program has three parts:
- Load the data, one line at a time.
- Process each line in a multiprocessing.Pool using imap_unordered().
- Process each line in a single thread.
If steps 1 and 2 are faster than step 3, then the results from the pool workers will queue up, consuming memory.
How can I throttle the data that I feed into the pool for step 2, so it doesn't get ahead of the consumer in step 3?
This looks similar to another multiprocessing question, but it's not clear to me where the delay is in that question.
Here's a small example that demonstrates the problem:
import logging
import os
import multiprocessing
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()


def process_step1():
    data = 'a' * 100000
    for i in xrange(10000):
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    return data.upper()


def process_step3(up_data):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.


def main():
    pool = multiprocessing.Pool(processes=10)
    logger.info('Starting.')
    loader = process_step1()
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'rU') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown


main()
When that runs, I see a steady increase in memory use until step 1 finishes. If I let it run long enough after that, the memory use will start to decrease.
2016-12-01 15:37:50,859:6414:139712380557056:Starting.
2016-12-01 15:37:50,861:6414:139712266237696:Producing 0.
2016-12-01 15:37:50,868:6414:139712380557056:Consuming 0, using 255.0 MB.
2016-12-01 15:37:52,054:6414:139712266237696:Producing 1000.
2016-12-01 15:37:53,244:6414:139712266237696:Producing 2000.
2016-12-01 15:37:53,421:6414:139712380557056:Consuming 500, using 383.0 MB.
2016-12-01 15:37:54,446:6414:139712266237696:Producing 3000.
2016-12-01 15:37:55,635:6414:139712266237696:Producing 4000.
2016-12-01 15:37:55,976:6414:139712380557056:Consuming 1000, using 511.2 MB.
2016-12-01 15:37:56,831:6414:139712266237696:Producing 5000.
2016-12-01 15:37:58,019:6414:139712266237696:Producing 6000.
2016-12-01 15:37:58,529:6414:139712380557056:Consuming 1500, using 703.2 MB.
2016-12-01 15:37:59,209:6414:139712266237696:Producing 7000.
2016-12-01 15:38:00,406:6414:139712266237696:Producing 8000.
2016-12-01 15:38:01,084:6414:139712380557056:Consuming 2000, using 831.5 MB.
2016-12-01 15:38:01,602:6414:139712266237696:Producing 9000.
2016-12-01 15:38:02,802:6414:139712266237696:Finished producing.
2016-12-01 15:38:03,640:6414:139712380557056:Consuming 2500, using 959.5 MB.
2016-12-01 15:38:06,199:6414:139712380557056:Consuming 3000, using 959.5 MB.
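One idea I've been toying with is wrapping the step 1 generator in a semaphore, so the pool stops pulling new items once a fixed number are in flight, and the consumer loop releases a slot each time it takes a result. Here's a rough, untested sketch of what I mean, reusing the process_step functions from the example above (throttled is just a helper name I made up, and the limit of 100 is arbitrary):

from threading import Semaphore

def throttled(generator, semaphore):
    """ Yield from generator, but block when too many items are in flight. """
    for item in generator:
        semaphore.acquire()  # Wait until the consumer frees up a slot.
        yield item

def main():
    pool = multiprocessing.Pool(processes=10)
    semaphore = Semaphore(100)  # Allow at most ~100 unconsumed results.
    loader = throttled(process_step1(), semaphore)
    processed = pool.imap_unordered(process_step2, loader)
    for up_data in processed:
        semaphore.release()  # One result consumed, so let another item in.
        process_step3(up_data)

I believe the pool consumes the input generator in a separate thread of the main process, so a threading.Semaphore acquired there and released in the consumer loop should coordinate the two, but I'm not certain this is safe or idiomatic.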