The Python concurrent.futures module allows two types of parallel processing
- Multi-threading (for I/O bound tasks)
- Multi-processing (for CPU bound tasks)
Results of evaluating both for speedup of your task using 10K files
- Non-Parallel and Multi-threaded about the same time
- Multi-processes version about 2X faster
Code
Note: Placed multiprocessing code in separate file due to issues with Windows Jupyter notebook. This is not necessary for other environments.
File: multi_process_hexify.py (all the processing code)
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from time import time
import binascii
def all_files(directory):
    """Yield the full path of every file under *directory*, recursively."""
    for root, _dirs, filenames in os.walk(directory):
        for filename in filenames:
            yield os.path.join(root, filename)
def create_test_files(folder_path, number_files, size):
    """Populate *folder_path* with *number_files* files of *size* random bytes each.

    Files are named ``0.txt`` .. ``{number_files-1}.txt``.
    """
    target = Path(folder_path)
    # Create the folder first (no-op if it already exists).
    target.mkdir(parents=True, exist_ok=True)
    for index in range(number_files):
        (target / f'{index}.txt').write_bytes(os.urandom(size))
def binary_file_reader(file_path):
    """Read a binary file and return its contents as a lowercase hex string.

    Fix: open with ``"rb"`` (read-only) instead of ``"r+b"`` (read-write).
    The file is never written, and ``"r+b"`` fails on read-only files or
    filesystems. Also avoids shadowing the file handle with its contents.
    """
    with open(file_path, "rb") as binary_file:
        raw = binary_file.read()
    # hexlify returns bytes; decode to str for the caller
    return binascii.hexlify(raw).decode("utf-8")
def process_file(file_path):
    """Hexify one file; return a tagged status string for *file_path*."""
    try:
        # Result is intentionally discarded — this exists for timing runs.
        binary_file_reader(file_path)
    except IOError:
        return f"Unsuccessful: {file_path}"
    return f"Successful: {file_path}"
def get_final(responses):
    """Materialize *responses* and tally successes and failures.

    Args:
        responses: iterable of status strings produced by ``process_file``
            (may be a generator or ``Executor.map`` result).

    Returns:
        (responses_list, successful_count, unsuccessful_count)
    """
    responses = list(responses)  # realize once; input may be a lazy iterator
    # Fix: match the full "Successful" prefix rather than the fragile
    # first-character check x[0] == 'S'.
    successful = sum(1 for r in responses if r.startswith("Successful"))
    unsuccessful = len(responses) - successful
    return responses, successful, unsuccessful
def main_non_parallel(device_directory):
    """Sequential baseline: run process_file over every file, one at a time."""
    started = time()
    statuses = (process_file(path) for path in all_files(device_directory))
    result = get_final(statuses)  # get_final materializes the generator
    print(f"Processed main_unthreaded in {time() - started:.4f} sec")
    return result
def main_multithreaded(device_directory):
    # https://stackoverflow.com/questions/42074501/python-concurrent-futures-processpoolexecutor-performance-of-submit-vs-map/42096963#42096963
    """Thread-pool processing using process_file.

    Note: with CPython's GIL, threads rarely speed up CPU-heavy work like
    hexlify — this exists to compare against the process-pool version.

    Fix: dropped ``chunksize=1000`` — the concurrent.futures docs state that
    ``chunksize`` has no effect with ThreadPoolExecutor.map (it only applies
    to ProcessPoolExecutor), so the argument was a misleading no-op.
    """
    start = time()
    with ThreadPoolExecutor() as executor:
        futures = executor.map(process_file, all_files(device_directory))
        result = get_final(futures)
    end = time() - start
    print(f"Processed main_multithreaded in {end:.4f} sec")
    return result
def main_multiprocessing(device_directory):
    """Process-pool processing using process_file (true multi-core parallelism)."""
    start = time()
    # Materialize the path list up front before handing it to the pool.
    file_list = list(all_files(device_directory))
    with ProcessPoolExecutor() as executor:
        mapped = executor.map(process_file, file_list, chunksize=1000)
        result = get_final(mapped)
    end = time() - start
    print(f"Processed main_multiprocessing in {end:.4f} sec")
    return result
Test
File: main.py
import os

import multi_process_hexify

if __name__ == '__main__':
    # Directory for the generated test files
    device_directory = os.path.join(os.getcwd(), 'test_dir')

    # Create test data
    multi_process_hexify.create_test_files(device_directory, 100, 100)

    # Perform non-parallel processing
    # Fix throughout: variable `unsucessful` -> `unsuccessful` and the
    # printed string "Unsuccessfuil" -> "Unsuccessful".
    read_file_names_unthreaded, successful, unsuccessful = multi_process_hexify.main_non_parallel(device_directory)
    print(f'Successful {successful}, Unsuccessful {unsuccessful}')
    print()

    # Perform multi-threaded processing
    read_file_names_threaded, successful, unsuccessful = multi_process_hexify.main_multithreaded(device_directory)
    print(f'Successful {successful}, Unsuccessful {unsuccessful}')
    print()

    # Perform multi-process processing
    read_file_names_multiprocessing, successful, unsuccessful = multi_process_hexify.main_multiprocessing(device_directory)
    print(f'Successful {successful}, Unsuccessful {unsuccessful}')

    # Confirm all three methods produce the same result
    print(read_file_names_unthreaded == read_file_names_threaded == read_file_names_multiprocessing)
Output
Processed main_unthreaded in 2.6610 sec
Successful 10000, Unsuccessful 0
Processed main_multithreaded in 3.2250 sec
Successful 10000, Unsuccessful 0
Processed main_multiprocessing in 1.2241 sec
Successful 10000, Unsuccessful 0
True