1

I want to get some basic statistics from some csv files without loading the whole file in memory. I do it in two ways: one seemingly "smart" way using pandas, and another casual way using csv. I expect the pandas way to be faster, but the csv way is actually faster by a very large margin. I was wondering why.

Here is my code:

import pandas as pd
import csv

movies  = pd.read_csv('movies.csv')  # movieId,title,genres
movie_count  = movies.shape[0]       # 9742
movieId_min = movies.movieId.min()
movieId_max = movies.movieId.max()
movieId_disperse = movies.movieId.sort_values().to_dict()       # row position -> movieId
movieId_squeeze = {v: k for k, v in movieId_disperse.items()}    # movieId -> row position (compact index)

def get_ratings_stats():
    gp_by_user  = []
    gp_by_movie = [0] * movie_count
    top_rator = (0, 0) # (idx, value)
    top_rated = (0, 0) # (idx, value)
    rating_count = 0
    user_count = 0
    last_user = -1
    for row in csv.DictReader(open('ratings.csv')):
        user = int(row['userId'])-1
        movie = movieId_squeeze[int(row['movieId'])]
        if last_user != user:
            last_user = user
            user_count += 1
            gp_by_user += [0]
        rating_count += 1
        gp_by_user[user]   += 1
        gp_by_movie[movie] += 1
        top_rator = (user,  gp_by_user[user])   if gp_by_user[user]   > top_rator[1] else top_rator
        top_rated = (movie, gp_by_movie[movie]) if gp_by_movie[movie] > top_rated[1] else top_rated
    top_rator = (top_rator[0]+1, top_rator[1])
    top_rated = (movieId_disperse[top_rated[0]], top_rated[1])
    return rating_count, top_rator, top_rated

Now if I replace the line:

for row in csv.DictReader(open('ratings.csv')):

With:

for chunk in pd.read_csv('ratings.csv', chunksize=1000):
    for _,row in chunk.iterrows():

The code actually becomes 10 times slower.

Here are the timing results:

> %timeit get_ratings_stats() # with csv
325 ms ± 9.98 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
> %timeit get_ratings_stats() # with pandas
3.45 s ± 67.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Any comments as to how I can make this code better/faster/more readable would be much appreciated.

yukashima huksay
    You put a loop inside of a loop that calls a method that returns a forward iterator. Why do you think you can compare the two samples as accurate representations of performance? – pstatix Dec 22 '18 at 08:15
  • First, using row-wise loops in Pandas is, in general, inefficient and not recommended. Second, Pandas creates a pretty expensive data structure while the DictReader utilizes a built-in `dict` type. They are simply meant for different purposes. – DYZ Dec 22 '18 at 08:18
  • @pstatix I was wondering if it was possible to get this job done using pandas in a faster way without loading the whole file at once. – yukashima huksay Dec 22 '18 at 10:40
  • @DYZ So when should I use pandas and when should I do things manually? – yukashima huksay Dec 22 '18 at 10:40
  • @yukashimahuksay It probably is, but not how you're doing it. – pstatix Dec 22 '18 at 22:17
  • @pstatix the way I'm doing it with pandas also loads the whole file at once??? – yukashima huksay Dec 23 '18 at 06:57
  • @yukashimahuksay Correct, but you are not understanding the [time complexity](https://en.wikipedia.org/wiki/Time_complexity) of how you are reading through the file. Your `for chunk ... for _, row in ...` is O(n^2) in complexity, much slower than O(n). – pstatix Dec 23 '18 at 06:58
  • @pstatix would you be so kind as to explain to me how this is O(n^2) maybe you mean O(2*n) which means O(n) – yukashima huksay Dec 23 '18 at 07:02
  • @yukashimahuksay A `for` within a `for` that both have worst case complexity of O(n). Because the loops are nested, the result is O(n) * O(n) which is O(n^2). – pstatix Dec 23 '18 at 07:31
  • @yukashimahuksay It may behoove you to read [this](https://stackoverflow.com/questions/487258/what-is-a-plain-english-explanation-of-big-o-notation) post and [this](https://www.interviewcake.com/article/java/big-o-notation-time-and-space-complexity) and [this](https://rob-bell.net/2009/06/a-beginners-guide-to-big-o-notation/). Here is a [cheatsheet](http://bigocheatsheet.com/) for reference. – pstatix Dec 23 '18 at 07:36
  • @pstatix AFAIK the first for is run n/1000 times and the inner for is run 1000 times in each loop of the outer for. – yukashima huksay Dec 23 '18 at 07:38
  • @pstatix you could say that each run of the outer for takes 1000 iterations for creating the data frame but that is before running the loop so it's not multiplier but rather added to the time complexity of the inner for. – yukashima huksay Dec 23 '18 at 07:39
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/185678/discussion-between-pstatix-and-yukashima-huksay). – pstatix Dec 23 '18 at 07:39

2 Answers

2

I think the point is that you shouldn't use pandas if you're going to then treat the big, expensive data structure like a dict. The question shouldn't be how to get pandas to be better at that, it should be how to write your code with pandas to do what you want.

import pandas as pd

def get_ratings_stats():
    movie_rating_data = pd.read_csv('ratings.csv')
    # Get the movie with the best rating
    top_movie = movie_rating_data.loc[:, ['movieId', 'rating']].groupby('movieId').agg('max').sort_values(by='rating', ascending=False).iloc[:, 0]
    # Get the user with the best rating
    top_user = movie_rating_data.loc[:, ['userId', 'rating']].groupby('userId').agg('max').sort_values(by='rating', ascending=False).iloc[:, 0]
    return movie_rating_data.shape[0], top_movie, top_user

def get_ratings_stats_slowly():
    movies = pd.DataFrame(columns=["movieId", "rating"])
    users = pd.DataFrame(columns=["userId", "rating"])
    data_size = 0
    for chunk in pd.read_csv('ratings.csv', chunksize=1000):
        movies = movies.append(chunk.loc[:, ['movieId', 'rating']].groupby('movieId').agg('max').reset_index())
        users = users.append(chunk.loc[:, ['userId', 'rating']].groupby('userId').agg('max').reset_index())
        data_size += chunk.shape[0]
    top_movie = movies.loc[:, ['movieId', 'rating']].groupby('movieId').agg('max').sort_values(by='rating', ascending=False).iloc[:, 0]
    top_user = users.loc[:, ['userId', 'rating']].groupby('userId').agg('max').sort_values(by='rating', ascending=False).iloc[:, 0]
    return data_size, top_movie, top_user

I'm not really sure that this is what you want to do overall, but your code is incomprehensible - this should be a good place to start (you could replace .agg('max') with .count() if you're interested in the number of ratings, etc).
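
For example, if it is the number of ratings you're after (which is what your original top_rator/top_rated track), a count-based variant could look roughly like the sketch below; value_counts() is just shorthand for the groupby/count route, and the usual ratings.csv layout (userId,movieId,rating,timestamp) is assumed:

import pandas as pd

def get_ratings_stats_by_count():
    ratings = pd.read_csv('ratings.csv')
    # Ratings per movie and per user; idxmax() gives the id with the
    # largest count, max() gives that count.
    movie_counts = ratings['movieId'].value_counts()
    user_counts = ratings['userId'].value_counts()
    top_rated = (movie_counts.idxmax(), int(movie_counts.max()))
    top_rator = (user_counts.idxmax(), int(user_counts.max()))
    return ratings.shape[0], top_rator, top_rated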

CJR
  • doesn't this load the whole file into memory? – yukashima huksay Dec 22 '18 at 16:11
  • Yes? Then it returns and the memory gets freed. Unless you're working with something that's in the GB+ range when you load it, this is the best way to do it. If you are working with something that large it's probably time to look at HDF5 and stop messing about with a csv (the difficulties in writing code for such large objects leads me to usually just get more memory instead, until I simply cannot). – CJR Dec 22 '18 at 16:25
  • I want to do It without loading the whole file into memory. I thought I had said it in the question but now I realized I haven't. sorry. – yukashima huksay Dec 22 '18 at 19:01
  • OK, I don't really understand why, but yeah - you probably shouldn't use pandas for this. You should consider looking into HDF5 (a rough sketch follows these comments). I've edited the answer to include code for chunk-wise data loading with pandas, but I think it's bad code and I wouldn't recommend using it. – CJR Dec 22 '18 at 20:16
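
A rough sketch of the HDF5 route mentioned in the comments above, assuming a one-time conversion of ratings.csv and that PyTables is installed; the ratings.h5 file name and the 100000 chunk size are arbitrary choices for the example:

import pandas as pd

# One-time conversion: append the CSV to an HDF5 table chunk by chunk,
# so the whole file never has to sit in memory at once.
with pd.HDFStore('ratings.h5', mode='w') as store:
    for chunk in pd.read_csv('ratings.csv', chunksize=100000):
        store.append('ratings', chunk, index=False)

# Later: stream the table back in chunks (table format makes partial reads possible).
rating_count = 0
with pd.HDFStore('ratings.h5', mode='r') as store:
    for chunk in store.select('ratings', chunksize=100000):
        rating_count += len(chunk)
print(rating_count)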
1

I think parallel processing is the answer to your question. I tried some parallel processing on your problem, but I had to split the ratings file into multiple files for processing.

What I did initially was to duplicate the ratings data from the CSV file by a factor of 10, then run your script to get a baseline execution time, which for me was about 3.6 seconds. By splitting the data into multiple files that can be handled by multiple child processes, and running my script with -k 2 (basically 2 workers), the total execution time dropped to 1.87 seconds. With -k 4 (4 workers) it was 1.13 seconds.

I am not sure whether it is possible to read the CSV in chunks and basically do parallel seek reads from a single big file, but that would make it a lot faster; the only drawback is needing an initial count of the rows in the big CSV file, to know how many rows go to each worker.
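
Something along those lines might look roughly like the sketch below. It is only a sketch: skiprows is not a true seek (each worker still scans past the skipped lines), and the file path, worker count and count_chunk helper are made up for the example.

import multiprocessing as mp

import pandas as pd

RATINGS_PATH = 'data/ratings.csv'

def count_data_rows(path):
    # One initial pass to count data rows (header excluded).
    with open(path) as f:
        return sum(1 for _ in f) - 1

def count_chunk(args):
    # Read only rows [start, start + length) and count ratings per movie.
    start, length = args
    chunk = pd.read_csv(
        RATINGS_PATH,
        skiprows=range(1, start + 1),  # keep the header, skip earlier data rows
        nrows=length,
    )
    return chunk['movieId'].value_counts()

if __name__ == '__main__':
    workers = 4
    total = count_data_rows(RATINGS_PATH)
    per_worker = -(-total // workers)  # ceiling division
    slices = [(i * per_worker, per_worker) for i in range(workers)]

    with mp.Pool(workers) as pool:
        partial_counts = pool.map(count_chunk, slices)

    # Sum the per-slice counts to get global per-movie counts.
    combined = pd.concat(partial_counts).groupby(level=0).sum()
    print("top rated movie:", combined.idxmax(), "with", int(combined.max()), "ratings")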

The splitting script:

import csv

file_path = "data/ratings.csv"
out_path = "data/big_ratings_{}.csv"

out_csv = None

for i in range(10):
    print("Iteration #{}".format(i+1))
    pin = open(file_path, "r")
    pout = open(out_path.format(i), "w")
    in_csv = csv.DictReader(pin)
    out_csv = csv.DictWriter(pout, fieldnames=in_csv.fieldnames)
    out_csv.writeheader()

    for row in in_csv:
        out_csv.writerow(row)

    pin.close()
    pout.close()

The actual rating processing script:

import time
import csv
import argparse
import os
import sys

from multiprocessing import Process, Queue, Value

import pandas as pd


top_rator_queue = Queue()
top_rated_queue = Queue()

DEFAULT_NO_OF_WORKERS = 1
RATINGS_FILE_PATH = "data/big_ratings_{}.csv"

NUMBER_OF_FILES = 10


class ProcessRatings(Process):

    def __init__(self, file_index_range, top_rator_queue, top_rated_queue, movie_id_squeeze):
        super(ProcessRatings, self).__init__()

        self.file_index_range = file_index_range
        self.top_rator_queue = top_rator_queue
        self.top_rated_queue = top_rated_queue
        self.movie_id_squeeze = movie_id_squeeze

    def run(self):

        for file_index in self.file_index_range:
            print("[PID: {}] Processing file index {} .".format(os.getpid(), file_index))

            start = time.time()

            gp_by_user  = []
            gp_by_movie = [0] * movie_count
            top_rator = (0, 0) # (idx, value)
            top_rated = (0, 0) # (idx, value)
            rating_count = 0
            user_count = 0
            last_user = -1

            for row in csv.DictReader(open(RATINGS_FILE_PATH.format(file_index))):
                user = int(row['userId'])-1
                movie = self.movie_id_squeeze[int(row['movieId'])]

                if last_user != user:
                    last_user = user
                    user_count += 1
                    gp_by_user += [0]

                gp_by_user[user]   += 1
                gp_by_movie[movie] += 1

                top_rator = (user,  gp_by_user[user])   if gp_by_user[user]   > top_rator[1] else top_rator
                top_rated = (movie, gp_by_movie[movie]) if gp_by_movie[movie] > top_rated[1] else top_rated

            end = time.time()
            print("[PID: {}] Processing time for file index {} : {}s!".format(os.getpid(), file_index, end-start))

        print("[PID: {}] WORKER DONE!".format(os.getpid()))


if __name__ == "__main__":
    print("Processing ratings in multiple worker processes.")

    start = time.time()

    # script arguments handling
    parser = argparse.ArgumentParser()
    parser.add_argument("-k", dest="workers", action="store")
    args_space = parser.parse_args()

    # determine the number of workers
    number_of_workers = DEFAULT_NO_OF_WORKERS
    if args_space.workers:
        number_of_workers = int(args_space.workers)
    else:
        print("Number of workers not specified. Assuming: {}".format(number_of_workers))

    # rating data
    rating_count = 0
    movies  = pd.read_csv('data/movies.csv')  # movieId,title,genres
    movie_count  = movies.shape[0]       # 9742
    movieId_min = movies.movieId.min()
    movieId_max = movies.movieId.max()
    movieId_disperse = movies.movieId.sort_values().to_dict()
    movieId_squeeze = {v: k for k, v in movieId_disperse.items()}

    # process data
    processes = []

    # initialize the worker processes
    number_of_files_per_worker = NUMBER_OF_FILES // number_of_workers
    for i in range(number_of_workers):
        p = ProcessRatings(
            range(i * number_of_files_per_worker, (i + 1) * number_of_files_per_worker),  # non-overlapping file indices for this worker
            top_rator_queue,
            top_rated_queue,
            movieId_squeeze
        )
        p.start()
        processes.append(p)

    print("MAIN: Wait for processes to finish ...")
    # wait until all processes are done
    for p in processes:
        p.join()

    # gather the data and do a final processing

    end = time.time()

    print("Processing time: {}s".format(end - start))

    print("Rating count: {}".format(rating_count))
andreihondrari
  • I'm pretty sure that you'll need to rewrite the internal parsing logic with this approach (because you can't be sure that the files are broken cleanly into groups based on some of the internal IDs which are assumed to be always present contiguously) – CJR Dec 22 '18 at 16:58
  • @CJ59 well, not the parsing so much as the calculation of the partial top raters and rated movies per chunk; then you would need to do another run over the top raters and rated movies gathered from all the chunks (see the sketch after these comments)... but I was too lazy to put it in this script. – andreihondrari Dec 22 '18 at 17:15
  • Yeah - you did a lot of writing. I think the point I want to make is that this is not a good solution - the overhead from multiprocessing is **much** larger than any gain you could even theoretically get for this problem (the three lines of pandas code in the answer above execute in ~50ms with way less memory used). Trying to parallelize input from a single file like this is generally a losing strategy, even for big files. – CJR Dec 22 '18 at 17:39
  • @CJ59 by overhead you mean development time? Idk, I'd prefer to split into parallel processes for the sake of speed. For example I had this issue with a very huge list of URL's that I needed to do requests on (and parsing of the response), and doing the requests in one thread versus 8 child processes made the whole execution speed up incredibly. I'd rather sacrifice memory for speed. And yet, you don't even have to sacrifice memory if you load/unload data as needed. – andreihondrari Dec 23 '18 at 01:10
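
For what it's worth, here is a rough sketch of the gather step discussed in these comments: if the workers send back full per-user and per-movie counts (rather than partial maxima), the totals stay correct even when a user's ratings are spread across several files. The file names match the splitting script above; the worker count of 4 is arbitrary.

import csv
from collections import Counter
from multiprocessing import Process, Queue

def count_worker(paths, out_queue):
    # Count ratings per user and per movie for this worker's subset of files.
    by_user, by_movie = Counter(), Counter()
    for path in paths:
        for row in csv.DictReader(open(path)):
            by_user[int(row['userId'])] += 1
            by_movie[int(row['movieId'])] += 1
    out_queue.put((by_user, by_movie))

if __name__ == '__main__':
    files = ['data/big_ratings_{}.csv'.format(i) for i in range(10)]
    results = Queue()
    workers = [Process(target=count_worker, args=(files[i::4], results)) for i in range(4)]
    for w in workers:
        w.start()

    # Drain the queue before joining so the workers can finish flushing it.
    total_by_user, total_by_movie = Counter(), Counter()
    for _ in workers:
        by_user, by_movie = results.get()
        total_by_user += by_user
        total_by_movie += by_movie
    for w in workers:
        w.join()

    print("rating count:", sum(total_by_movie.values()))
    print("top rator:", total_by_user.most_common(1)[0])
    print("top rated:", total_by_movie.most_common(1)[0])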