I think parallel processing is the answer to your question. I tried some parallel processing on your problem, but I had to split the ratings file into multiple files first.
What I did initially was duplicate the ratings data from the CSV file by a factor of 10, then run your script to get a baseline execution time, which for me was about 3.6 seconds. By splitting the data into multiple files that can each be handled by a separate child process, running my script with -k 2 (basically 2 workers) cut the total execution time to 1.87 seconds, and with -k 4 (4 workers) it came down to 1.13 seconds.
I am not sure if it is possible to read the CSV in chunks, with the workers doing parallel seek reads into different parts of the same big file, but that would make it a lot faster; the only drawback would be the need for an initial count of the rows in the big CSV file, so you know how many rows go to each worker.
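For what it's worth, splitting on byte offsets instead of row counts would even sidestep that initial count. Here is a minimal sketch of the idea, assuming the ratings file has no quoted fields (true for the MovieLens ratings.csv); byte_ranges and read_rows are illustrative names, not anything from your code:

import os

def byte_ranges(path, workers):
    """Split a file into roughly equal byte ranges, snapped to line starts."""
    size = os.path.getsize(path)
    bounds = [0]
    with open(path, "rb") as f:
        for i in range(1, workers):
            f.seek(size * i // workers)
            f.readline()                     # advance to the next line start
            bounds.append(min(f.tell(), size))
    bounds.append(size)
    return list(zip(bounds, bounds[1:]))

def read_rows(path, start, end, fieldnames):
    """Yield the rows whose starting byte lies in [start, end)."""
    with open(path, "rb") as f:
        f.seek(start)
        if start == 0:
            f.readline()                     # skip the header row
        while f.tell() < end:
            values = f.readline().decode().strip().split(",")
            yield dict(zip(fieldnames, values))

Each worker would then call read_rows("data/ratings.csv", start, end, ["userId", "movieId", "rating", "timestamp"]) with its own byte range, and no splitting step would be needed at all. Since the range boundaries are snapped to line starts, every row belongs to exactly one worker.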
The splitting script:
import csv

file_path = "data/ratings.csv"
out_path = "data/big_ratings_{}.csv"

# write 10 copies of the ratings data, one per output file
for i in range(10):
    print("Iteration #{}".format(i + 1))
    with open(file_path, "r") as pin, open(out_path.format(i), "w", newline="") as pout:
        in_csv = csv.DictReader(pin)
        out_csv = csv.DictWriter(pout, fieldnames=in_csv.fieldnames)
        out_csv.writeheader()
        for row in in_csv:
            out_csv.writerow(row)
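As a side note, the script above makes ten full copies of ratings.csv, which is what inflates the data for the benchmark. If you instead want to split one existing big file into disjoint shards for the workers, the change is small. A sketch (split_csv is a name I made up) that deals rows out round-robin, which also keeps each shard sorted by userId since the source file is:

import csv

def split_csv(src, out_pattern, shards):
    """Deal the rows of src into `shards` output files, round-robin."""
    with open(src, "r") as fin:
        reader = csv.DictReader(fin)
        files = [open(out_pattern.format(i), "w", newline="") for i in range(shards)]
        writers = []
        for f in files:
            w = csv.DictWriter(f, fieldnames=reader.fieldnames)
            w.writeheader()
            writers.append(w)
        for n, row in enumerate(reader):
            writers[n % shards].writerow(row)
        for f in files:
            f.close()

split_csv("data/ratings.csv", "data/big_ratings_{}.csv", 10)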
The actual rating processing script:
import time
import csv
import argparse
import os
from multiprocessing import Process, Queue, Value

import pandas as pd

DEFAULT_NO_OF_WORKERS = 1
RATINGS_FILE_PATH = "data/big_ratings_{}.csv"
NUMBER_OF_FILES = 10
class ProcessRatings(Process):
    def __init__(self, file_index_range, top_rator_queue, top_rated_queue,
                 movie_id_squeeze, movie_count, rating_count):
        super(ProcessRatings, self).__init__()
        self.file_index_range = file_index_range
        self.top_rator_queue = top_rator_queue
        self.top_rated_queue = top_rated_queue
        self.movie_id_squeeze = movie_id_squeeze
        self.movie_count = movie_count
        self.rating_count = rating_count  # shared Value, summed across workers

    def run(self):
        for file_index in self.file_index_range:
            print("[PID: {}] Processing file index {}.".format(os.getpid(), file_index))
            start = time.time()
            # per-user counts live in a dict, so a shard may contain any subset of users
            gp_by_user = {}
            gp_by_movie = [0] * self.movie_count
            top_rator = (0, 0)  # (user index, rating count)
            top_rated = (0, 0)  # (movie index, rating count)
            local_count = 0
            with open(RATINGS_FILE_PATH.format(file_index)) as fin:
                for row in csv.DictReader(fin):
                    user = int(row['userId']) - 1
                    movie = self.movie_id_squeeze[int(row['movieId'])]
                    local_count += 1
                    gp_by_user[user] = gp_by_user.get(user, 0) + 1
                    gp_by_movie[movie] += 1
                    if gp_by_user[user] > top_rator[1]:
                        top_rator = (user, gp_by_user[user])
                    if gp_by_movie[movie] > top_rated[1]:
                        top_rated = (movie, gp_by_movie[movie])
            # hand the per-file winners back to the parent
            self.top_rator_queue.put(top_rator)
            self.top_rated_queue.put(top_rated)
            with self.rating_count.get_lock():
                self.rating_count.value += local_count
            end = time.time()
            print("[PID: {}] Processing time for file index {}: {}s!".format(
                os.getpid(), file_index, end - start))
        print("[PID: {}] WORKER DONE!".format(os.getpid()))
if __name__ == "__main__":
    print("Processing ratings in multiple worker processes.")
    start = time.time()

    # script arguments handling
    parser = argparse.ArgumentParser()
    parser.add_argument("-k", dest="workers", action="store", type=int)
    args_space = parser.parse_args()

    # determine the number of workers
    number_of_workers = DEFAULT_NO_OF_WORKERS
    if args_space.workers:
        number_of_workers = args_space.workers
    else:
        print("Number of workers not specified. Assuming: {}".format(number_of_workers))

    # shared state handed to every worker
    top_rator_queue = Queue()
    top_rated_queue = Queue()
    rating_count = Value('i', 0)

    # movie data
    movies = pd.read_csv('data/movies.csv')  # movieId,title,genres
    movie_count = movies.shape[0]  # 9742
    # map each (sparse) movieId to a dense index in [0, movie_count)
    movieId_disperse = movies.movieId.sort_values().reset_index(drop=True).to_dict()
    movieId_squeeze = {v: k for k, v in movieId_disperse.items()}

    # initialize the worker processes; each one gets a disjoint, contiguous
    # slice of the file indices (the last worker also picks up any remainder
    # when the division is not even)
    processes = []
    files_per_worker = NUMBER_OF_FILES // number_of_workers
    for i in range(number_of_workers):
        first = i * files_per_worker
        last = first + files_per_worker if i < number_of_workers - 1 else NUMBER_OF_FILES
        p = ProcessRatings(
            range(first, last),
            top_rator_queue,
            top_rated_queue,
            movieId_squeeze,
            movie_count,
            rating_count,
        )
        p.start()
        processes.append(p)

    print("MAIN: Wait for processes to finish ...")
    # wait until all processes are done
    for p in processes:
        p.join()

    # gather the per-file winners and reduce them to a global answer;
    # every file contributes exactly one entry to each queue
    top_rator = (0, 0)
    top_rated = (0, 0)
    for _ in range(NUMBER_OF_FILES):
        top_rator = max(top_rator, top_rator_queue.get(), key=lambda t: t[1])
        top_rated = max(top_rated, top_rated_queue.get(), key=lambda t: t[1])

    end = time.time()
    print("Processing time: {}s".format(end - start))
    print("Rating count: {}".format(rating_count.value))
    print("Top rator: user index {}, {} ratings".format(*top_rator))
    print("Top rated: movie index {}, {} ratings".format(*top_rated))