1

I have a huge text file for which I want to build a dictionary of word counts (a Counter). Currently, I am doing it with the following code:

from collections import Counter

vocab = Counter()
with open(file_name) as input_doc:
    for line in input_doc:
        for word in line.strip().split():
            vocab[word] += 1

But since the file is huge, this takes a lot of time, so I am looking for a faster way of doing it.

The most straightforward solution that comes to mind is to store batches of lines in a list and process each batch separately (in parallel with the other batches), merging the results at the end. This way, the main thread can read the next batch of lines from the file while the previously read batches are being processed in parallel, which should save a lot of time.

Something like:

buffer_size = 1000
buff = []
vocab = Counter()
number_of_sentences = 1
with open(file_name) as input_doc:
    for line in input_doc:
        buff.append(line)
        if number_of_sentences % buffer_size == 0:
            vocab += update_dictionary(buff)  ### Here I should create and call a new thread to work on the new batch
            buff = []
        number_of_sentences += 1
    # (the last, partial batch would still need to be processed as well)

Here, the update_dictionary() method reads all the sentences in the given list and updates its local dictionary; once it is done, that local dictionary should be merged with the global one. I tried for a couple of hours, but unfortunately, since I have never implemented multi-threaded code in Python before, I couldn't manage to make it work. Could you please help me implement this idea?
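
For reference, a plain serial version of update_dictionary() would look roughly like the following; it is the threading around it that I cannot get right:

from collections import Counter

def update_dictionary(buff):
    # Build a local Counter for this batch of sentences; the caller
    # merges it into the global vocab.
    local_vocab = Counter()
    for sentence in buff:
        local_vocab.update(sentence.strip().split())
    return local_vocab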

Thank you very much.

Hakim
  • As long as you are using `cpython`, multithreading won't help you. The Global Interpreter Lock (GIL) only lets one thread execute at a time. There are other parallel versions of python that may help. You can get a bit of a speed-up by replacing the `for word in...` part with `counter.update(word for word in line.strip().split())` and by opening the file with a large buffer. – tdelaney Feb 10 '16 at 00:22
  • [Great resource](http://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes-in-python) – Kevin Feb 10 '16 at 00:24
  • Have you looked at the multiprocessing library? Each process can be used to count the words in a portion of the file and will have its own distinct intermediary dictionary. You can then use a multiprocessing queue to share data with the main process (a sketch of this idea follows these comments). – Garrett R Feb 10 '16 at 00:25
  • I don't suppose rewriting the program to use a compiled language is an option? (Because if speed is what you're looking for, a compiled language like C++ could do the same job fast enough that even a single thread would be limited only by the speed of the hard drive, rather than by the overhead of the in-memory computations) – Jeremy Friesner Feb 10 '16 at 00:49
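
A minimal sketch of the multiprocessing approach suggested in the comments above: each worker process counts the words in its own chunk of lines and returns its own intermediary Counter, and the Pool's internal queue carries the partial counts back to the main process. The file name, chunk size, and worker count are placeholders, not values from the question.

from collections import Counter
from itertools import islice
from multiprocessing import Pool

def count_chunk(lines):
    # Each worker builds its own intermediary Counter for its chunk of lines.
    counts = Counter()
    for line in lines:
        counts.update(line.strip().split())
    return counts

def read_chunks(f, chunk_size=100000):
    # Yield lists of up to chunk_size lines from the open file object.
    while True:
        chunk = list(islice(f, chunk_size))
        if not chunk:
            return
        yield chunk

if __name__ == "__main__":
    vocab = Counter()
    # A large read buffer, as suggested in the first comment.
    with open("input.txt", buffering=1 << 20) as input_doc:
        with Pool(processes=4) as pool:
            for partial in pool.imap_unordered(count_chunk, read_chunks(input_doc)):
                vocab.update(partial)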

2 Answers

0

The concurrent.futures module lets you submit tasks to a pool to be processed in parallel. (Its ProcessPoolExecutor uses processes instead of threads to get around the GIL issue; the example below uses ThreadPoolExecutor, but the API is the same.) When you submit a task to the pool, it returns an object that represents the running task (this is called a future). You can start multiple tasks this way, and when you are ready to get the result of a task, you call future.result(). Here is an example that counts the words in chunks of lines in parallel:

from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict

def runTask(lines):
    # Count the words in this chunk of lines and return the chunk's own counts
    counts = defaultdict(int)
    for line in lines:
        for word in line.split():
            counts[word] += 1

    return counts

pool = ThreadPoolExecutor(4)
futures = []
chunkSize = 4
lines = []

with open("test.txt") as f:
    for line in f:
        if len(lines) == chunkSize:
            futures.append(pool.submit(runTask, lines))
            lines = [] 
        else:
            lines.append(line)

    if len(lines) > 0:
        futures.append(pool.submit(runTask, lines))

# Sum up totals
finalCount = defaultdict(int)
for f in futures:
    result = f.result()
    for k in result:
        finalCount[k] += result[k]

for word in finalCount:
    print("{0}: {1}".format(word, finalCount[word]))

This is a first attempt to help get you started.

tmajest
  • Are the arguments passed over pipes to the subprocesses? If so, this just replaces reading lines from a file with reading lines from a pipe. And anyway, Hakim needs the dictionary created in his top-level process, so I don't think this will help, as cool as it is. – Zan Lynx Feb 10 '16 at 01:01
  • I'm not sure. As an improvement, I'd start with passing the file offsets to the subprocesses so that the top-level process doesn't have to read all of the lines (a sketch of that idea follows these comments). – tmajest Feb 10 '16 at 01:06
  • Thank you tmajest. I implemented your approach, but not only does it not improve the performance, it even makes it worse. I noticed that even though I have set the number of threads to 8, the CPU usage is only 140%, and there seems to be a lot of overhead in managing the threads. By comparing the time of the two approaches (serial vs parallel), I noticed that the required time has increased significantly (2 mins vs around 5 mins, in the serial and parallel settings respectively). – Hakim Feb 10 '16 at 09:49
  • Sorry, but I gave a "-" to this solution because it does not help at all, and as I said in the previous comment, it even makes the run time worse. – Hakim Feb 10 '16 at 10:40
  • This is kind of a reference for you to tweak according to your needs. I'd start with reading more than 4 lines of text for each process. You can also try to move the code that reads the text from the file out of the main process and into the new process. As I said, this is a first attempt to help you get started. – tmajest Feb 10 '16 at 22:41
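
A rough sketch of the file-offset idea from the comment above: split the file into byte ranges, and let each worker process seek to its own range and count it, so that the main process never reads the lines itself. The function and variable names are illustrative rather than part of the original answer; chunk boundaries are snapped to line breaks inside the worker.

import os
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def count_range(path, start, end):
    # Count words in every line whose first byte lies in [start, end).
    counts = Counter()
    with open(path, "rb") as f:
        if start > 0:
            f.seek(start - 1)
            f.readline()               # finish the line a previous chunk started
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            counts.update(line.split())  # keys are bytes here; decode if needed
    return counts

if __name__ == "__main__":
    path = "test.txt"                    # same sample file as the answer above
    n_workers = 4
    size = os.path.getsize(path)
    step = max(1, -(-size // n_workers)) # ceiling division
    total = Counter()
    with ProcessPoolExecutor(n_workers) as pool:
        futures = [pool.submit(count_range, path, s, min(s + step, size))
                   for s in range(0, size, step)]
        for future in futures:
            total += future.result()
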
0

This sounds like the canonical Word Count example from all Map-Reduce literature. If this is anything other than a one-off analysis, and your input file really is Huge (as in Big Data), you may consider leveraging Hadoop or Spark.

The very first example on the Spark example page has something that you could copy, nearly verbatim:

text_file = sc.textFile("file:///path/to/your/input/file")
counts = text_file.flatMap(lambda line: line.strip().split()) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
vocab = dict(counts.collect())
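
Note that the snippet assumes an existing SparkContext named sc, as you get for free in the pyspark shell. If you run it as a standalone script instead, create the context yourself first, roughly like this:

# Only needed outside the pyspark shell, where `sc` is not predefined.
# Save the whole script as, e.g., wordcount.py and run it with spark-submit.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("word-count").setMaster("local[*]")
sc = SparkContext(conf=conf)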

Download Spark and get it working locally, then scale the problem out on EMR (with S3 as your file system) as needed.