I'm working on a programme that processes a huge JSON file, does some analysis, and inserts the results into a database. My first prototype divided the JSON file into n parts; each part then ran independently from its own command, processing only its own slice of lines (a rough sketch of one invocation follows the commands):
python data_import.py --start 1 --cluster 6
python data_import.py --start 2 --cluster 6
python data_import.py --start 3 --cluster 6...
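Each invocation derives its slice of line numbers from --start and --cluster, roughly like this (a simplified sketch; do_analysis() and insert_to_db() are placeholders for the real analysis and DB code):

    # Rough sketch of one prototype invocation (assumption: the real script
    # derives its slice from --start/--cluster this way; do_analysis() and
    # insert_to_db() are placeholders).
    total_works = total // cluster                   # lines handled per invocation
    startfrom = total_works * (start - 1)            # first line for this invocation
    endfrom = total_works * start                    # last line for this invocation
    with open(json_path, 'r', encoding='utf8') as f:
        for i, line in enumerate(f):
            if startfrom <= i <= endfrom:
                record = json.loads(line)            # parse only lines in our slice
                insert_to_db(do_analysis(record))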
The performance is pretty good, but it's annoying to open so many tabs every time I run it. So I rewrote the programme to use multiprocessing, like this:
import argparse
import json
import logging
from multiprocessing import Manager, Process


def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--cluster', type=int, default=5,
                        help='number of clusters')
    parser.add_argument('--total', type=int, default=5701017,
                        help='total number of data')
    parser.add_argument('--json_path', type=str, default="../Data/output.json",
                        help='location of data source')
    args = parser.parse_args()

    # Shared list so every worker can report how many lines it has handled
    manager = Manager()
    work_count = manager.list()
    for i in range(0, args.cluster):
        work_count.append(0)

    p_logger = setup_logger("Workers", 'done' + '_' + str(args.cluster) + '.log',
                            logging.INFO)

    try:
        processes = []
        for c in range(1, args.cluster + 1):
            p = Process(target=update_extractor_result,
                        args=(args, c, work_count, p_logger))
            processes.append(p)
        # Start the processes
        for p in processes:
            p.start()
        # Ensure all processes have finished execution
        for p in processes:
            p.join()
    except Exception as e:
        print("Error: unable to start process: " + str(e))
def update_extractor_result(args, num_start, work_count, p_logger):
    logger = setup_logger(__name__,
                          'error' + str(num_start) + '_' + str(args.cluster) + '.log',
                          logging.ERROR)
    batch = 1
    total_loaded_count = 0
    total = args.total - 1
    total_works = int(total / args.cluster)
    done_count = 0
    # Each worker is responsible for one contiguous slice of the file
    startfrom = int(total_works * (num_start - 1))
    endfrom = int(total_works * num_start)
    json_path = args.json_path
    with open(json_path, 'r', encoding="utf8") as f:
        for line in f:
            try:
                data = json.loads(line)
                if total_loaded_count % 100 == 0:
                    p_logger.info("Workers: " + str(work_count))
                total_loaded_count += 1
                work_count[num_start - 1] += 1
                if startfrom <= total_loaded_count <= endfrom:
                    data = data.doAnalysis()
                    insertToDB()
                    print("Done batch " + str(batch) + " - count: " + str(done_count))
                    batch += 1
            except Exception as e:
                # log the failing line and keep going
                logger.error(str(e))


if __name__ == '__main__':
    main()
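Now the whole run is a single command; with the argparse arguments above it looks something like this:

python data_import.py --cluster 6 --total 5701017 --json_path ../Data/output.json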
Comparing the two approaches with 6 clusters: running the same programme in several tabs at once finishes in 6-8 hours, while the multiprocessing version has only completed about 1/5 of the work after 12 hours.
Why is there such a big difference? Or does my programme have a problem?