
I'm working on a programme which processes a huge JSON file and does some analysis before inserting into a DB. In the beginning, my prototype of the programme divided the JSON file into n parts, and each part ran independently from its own tab:

python data_import.py --start 1 --cluster 6
python data_import.py --start 2 --cluster 6
python data_import.py --start 3 --cluster 6
...
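
Each tab only handles its own slice of the file; the slice boundaries are derived from --start and --cluster roughly as in the sketch below (a simplified illustration with a made-up helper name, not the actual data_import.py):

def slice_bounds(start, cluster, total):
    # Worker `start` (1-based) of `cluster` workers handles lines [begin, end)
    per_worker = (total - 1) // cluster
    begin = per_worker * (start - 1)
    end = per_worker * start
    return begin, end

# e.g. slice_bounds(2, 6, 5701017) -> (950169, 1900338)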

The performance is pretty good, but it's annoying to open so many tabs every time I have to run this. Therefore, I modified the programme to use multiprocessing, like this:

import argparse
import json
import logging
from multiprocessing import Manager, Process


def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--cluster', type=int, default=5,
                        help='number of clusters')
    parser.add_argument('--total', type=int, default=5701017,
                        help='total number of data')
    parser.add_argument('--json_path', type=str, default="../Data/output.json",
                        help='location of data source')

    args = parser.parse_args()

    # Shared list so every worker can report how many lines it has seen
    manager = Manager()
    work_count = manager.list()
    for i in range(0, args.cluster):
        work_count.append(0)

    p_logger = setup_logger("Workers", 'done' + '_' + str(args.cluster) + '.log',
                            logging.INFO)
    try:
        processes = []
        for c in range(1, args.cluster + 1):
            p = Process(target=update_extractor_result, args=(args, c, work_count, p_logger))
            processes.append(p)
        # Start the processes
        for p in processes:
            p.start()
        # Ensure all processes have finished execution
        for p in processes:
            p.join()
    except Exception as e:
        print("Error: unable to start process: " + str(e))


def update_extractor_result(args, num_start, work_count, p_logger):
    logger = setup_logger(__name__, 'error' + str(num_start) + '_' + str(args.cluster) + '.log', logging.ERROR)

    batch = 1

    total_loaded_count = 0
    total = args.total - 1
    total_works = int(total / args.cluster)
    done_count = 0
    # This worker only processes lines in [startfrom, endfrom]
    startfrom = int(total_works * (num_start - 1))
    endfrom = int(total_works * (num_start))

    json_path = args.json_path

    with open(json_path, 'r', encoding="utf8") as f:
        for line in f:
            try:
                data = json.loads(line)

                if total_loaded_count % 100 == 0:
                    p_logger.info("Workers: " + str(work_count))
                total_loaded_count += 1
                work_count[num_start - 1] += 1

                if total_loaded_count >= startfrom and total_loaded_count <= endfrom:
                    data = data.doAnalysis()
                    insertToDB()
                    print("Done batch " + str(batch) + " - count: " + str(done_count))
                    batch += 1
            except Exception:
                # Log the failing line and keep going
                logger.exception("Failed to process line")
Compared with running the same programme in multiple tabs at the same time, the multiprocessing version is much slower: with 6 clusters, the multi-tab approach needs 6-8 hours in total, but the multiprocessing one finishes only 1/5 of the work in 12 hours.

Why is there such a big difference? Or does my programme have some problem?

Louis Luk
  • I don't know about the error in your program, but there are a lot of differences between multiprocessing and threading. Threading runs in a single process; multiprocessing works across multiple processes, so GIL interleaving is also avoided in that case. To make it clearer you can refer to https://stackoverflow.com/questions/2846653/how-to-use-threading-in-python – Developer Mar 25 '18 at 10:19

0 Answers