
I have a file (input.txt) containing half a million lines, and I want to encrypt these lines with my encrypt function and save them all to a single file called output.txt. For example, if input.txt is

aab
abb
abc

then I want output.txt to be

001
011
012

Simple for loop version

I have a working for loop; however, it takes nearly 9 hours to encrypt all the lines:

encryption_map = {}
encryption_map['a']=0
encryption_map['b']=1
encryption_map['c']=2

def encrypt(input_str):
    output_int = ''
    for i in input_str: 
        for ch in i.split('\n')[0]: # remove line break symbol \n 
            output_int += str(encryption_map[ch])
    return output_int

text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
    with open('output.txt', 'w') as output_file:
        for l in lines:
            output_int = encrypt(l)
            output_file.write(output_int + '\n')    
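As an aside (not part of the original question), most of the 9 hours likely comes from the per-character dict lookups and string concatenation in Python-level loops. The same character-to-digit substitution can be expressed with `str.translate`, which does the lookup in C. A minimal single-process sketch, assuming the same three-letter alphabet (`encrypt_translate` is a hypothetical name):

```python
# Sketch: the question's per-character mapping done via str.translate.
table = str.maketrans({'a': '0', 'b': '1', 'c': '2'})

def encrypt_translate(line):
    # strip the trailing newline, then substitute every character at once
    return line.rstrip('\n').translate(table)

print(encrypt_translate('aab\n'))  # -> 001
```

For a pure substitution like this, the single-process `translate` version may already be fast enough to make multiprocessing unnecessary.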

apply_async version

Since I want to keep the same ordering in output.txt, it seems I have to use apply_async. My code then becomes:

import multiprocessing as mp

encryption_map = {}
encryption_map['a']=0
encryption_map['b']=1
encryption_map['c']=2

def encrypt(input_str):
    output_int = ''
    for i in input_str: 
        for ch in i.split('\n')[0]: # remove line break symbol \n 
            output_int += str(encryption_map[ch])
    return output_int

def write_result(output):
    output_file.write(output + '\n')
    # output_file.flush() # This line is suggested by another stack question

pool = mp.Pool(20)

text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
    with open('output.txt', 'w') as output_file:
        for l in lines:
            pool.apply_async(encrypt, args=l, callback=write_result)
pool.close()
pool.join()

It runs much faster; however, output.txt is always empty. What's wrong with my code? I found one post that had the same difficulty writing out the file; it suggests putting f.flush() inside the write function, but that doesn't work either.

Raven Cheuk

1 Answer


You need to pass the arguments as a tuple, i.e. args=(line,), like this:

import multiprocessing as mp


encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2


output_file = open('output.txt', 'w')


def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:
            output_int += str(encryption_map[ch])
    return output_int


def write_result(output):
    output_file.write(output + '\n')


def main():
    #mp.set_start_method('spawn')  # Only needed on OSX
    pool = mp.Pool(2)
    with open('input.txt') as input_file:
        lines = input_file.readlines()
        for line in lines:
            pool.apply_async(encrypt, args=(line,), callback=write_result)
    pool.close()
    pool.join()
    output_file.close()


if __name__ == '__main__':
    main()
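An aside on why the file stayed empty (not from the original answer): when `args` is a bare string, `apply_async` unpacks it character by character, so the worker raises a `TypeError` that the pool silently swallows unless you call `.get()` on the `AsyncResult` or pass an `error_callback`. A small sketch, using a simplified stand-in for the question's `encrypt`:

```python
import multiprocessing as mp

def encrypt(line):
    # simplified stand-in for the question's encrypt()
    return ''.join({'a': '0', 'b': '1', 'c': '2'}[ch] for ch in line.rstrip('\n'))

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        bad = pool.apply_async(encrypt, args='abc')      # unpacked: encrypt('a', 'b', 'c')
        good = pool.apply_async(encrypt, args=('abc',))  # one argument, as intended
        print(good.get())  # 012
        try:
            bad.get()  # re-raises the worker's TypeError in the parent
        except TypeError:
            print('worker raised TypeError')
```

With `args=l` in the question's code, every task failed this way, no callback ever fired, and nothing was written.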

EDIT:

In the above code, since we are using apply_async, the lines in the output may not appear in the same order as in the input.
If we want to preserve order, we can use map, map_async, or imap instead.
In this case, imap is probably the best option, since it yields results lazily and the callback-side work (I/O bound) is much slower than the worker operation (CPU bound):

import multiprocessing as mp


encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2


def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:
            output_int += str(encryption_map[ch])
    return output_int


def main():
    mp.set_start_method('spawn')  # Only needed on OSX
    pool = mp.Pool(2)
    # Open output.txt inside main(): with the 'spawn' start method every
    # worker re-imports this module, so a module-level
    # open('output.txt', 'w') would truncate the file from each worker.
    with open('input.txt') as input_file, \
            open('output.txt', 'w') as output_file:
        lines = input_file.readlines()
        for output in pool.imap(encrypt, lines):
            output_file.write(output + '\n')
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()
Anmol Singh Jaggi
  • It works, but the ordering of the output file is messed up... Other post says that I should add `r.wait()` after `r=pool.apply_async`. But adding `r.wait()` makes it as slow as the plain `for` loop without multiprocessing... – Raven Cheuk Apr 20 '21 at 08:10