I am trying to read a large text file (> 20 GB) with Python. The file contains atom positions for 400 frames, and each frame is independent as far as the computations in this code are concerned, so in theory I can split the job into 400 tasks with no communication between them. Each frame has 1,000,000 lines, so the file has 1,000,000 × 400 lines of text. My initial approach uses multiprocessing with a pool of workers:

import mmap
import multiprocessing as mp
import sys
import time


def main():
   """ main function
   """
   filename=sys.argv[1]
   nump = int(sys.argv[2])
   f = open(filename)
   s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
   cursor = 0
   framelocs=[]
   start = time.time()
   print (mp.cpu_count())
   chunks = []
   while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
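        final = s.find(b'ITEM: TIMESTEP', cursor)
        framelocs.append([initial,final])
        #readchunk(s[initial:final])
        # the last frame has no following marker, so slice to the end of the file
        chunks.append(s[initial:] if final == -1 else s[initial:final])
        if final == -1:
           break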

Here I scan the file for the start and end of each frame, opening it with Python's mmap module to avoid reading everything into memory.

def readchunk(chunk):
   start = time.time()
   part = chunk.split(b'\n')
   timestep= int(part[1])
   print(timestep)

Now I would like to send chunks of the file to a pool of workers for processing. The read part should be more complex, but those lines will be implemented later.

   print('Seeking file took %8.6f'%(time.time()-start))
   pool = mp.Pool(nump)
   start = time.time()
   results= pool.map(readchunk,chunks[0:16])
   print('Reading file took %8.6f'%(time.time()-start))

If I run this by sending 8 chunks to 8 cores, it takes 0.8 s to read. However, if I send 16 chunks to 16 cores, it takes 1.7 s. It seems that parallelization does not speed things up. I am running this on Oak Ridge's Summit supercomputer, if that is relevant, using this command:

jsrun -n1 -c16 -a1 python -u ~/Developer/DipoleAnalyzer/AtomMan/readlargefile.py DW_SET6_NVT.lammpstrj 16

This is supposed to create 1 MPI task and assign 16 cores to 16 threads. Am I missing something here? Is there a better approach?

dundar yilmaz
  • (1) I'm not sure if this really avoids copying of the chunks. Better send only the chunk boundaries to subprocesses and let them read the actual chunk. (2) A simple test code may have more overhead than actual work so the times may not be representative. – Michael Butscher Dec 11 '19 at 14:50
  • Parallelization isn't going to speed up disk I/O to the single physical drive containing the file, and using multiprocessing often introduces a fair amount of overhead of its own. – martineau Dec 11 '19 at 15:04
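The first comment's suggestion (send only the chunk boundaries and let each subprocess read its own chunk) could look roughly like the sketch below. This is an illustration, not tested on the asker's data: the names `find_frame_bounds`, `read_timestep`, and `timesteps` are hypothetical, and it assumes the timestep number sits on the line right after each `ITEM: TIMESTEP` marker, as in the question's `readchunk`.

```python
import mmap
import multiprocessing as mp

MARKER = b"ITEM: TIMESTEP"


def find_frame_bounds(filename):
    """Return (start, end) byte offsets of every frame in the file."""
    bounds = []
    with open(filename, "rb") as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as s:
        start = s.find(MARKER)
        while start != -1:
            nxt = s.find(MARKER, start + len(MARKER))
            # the last frame runs to the end of the file
            end = len(s) if nxt == -1 else nxt
            bounds.append((start, end))
            start = nxt
    return bounds


def read_timestep(task):
    """Worker: open the file itself and read only its own byte range."""
    filename, start, end = task
    with open(filename, "rb") as f:
        f.seek(start)
        chunk = f.read(end - start)
    # line 0 is the marker, line 1 holds the timestep number
    return int(chunk.split(b"\n")[1])


def timesteps(filename, nump):
    """Send only (filename, start, end) tuples to the pool, not frame data."""
    tasks = [(filename, a, b) for a, b in find_frame_bounds(filename)]
    with mp.Pool(nump) as pool:
        return pool.map(read_timestep, tasks)
```

Only small tuples are pickled and sent to the workers this way, so the frame data is not copied between processes. It does not remove the disk-read limit that the second comment describes.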

1 Answer

As others have said, there is some overhead when making processes so you could see a slowdown if testing with small samples.

Something like this might be neater. Make sure you understand what the generator function is doing.

import multiprocessing as mp
import sys
import mmap


def do_something_with_frame(frame):
    print("processing a frame:")
    return 100


def frame_supplier(filename):
    """A generator for frames"""
    # open in binary mode; the with block closes the file and the map
    with open(filename, 'rb') as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as s:
        cursor = 0
        while True:
            initial = s.find(b'ITEM: TIMESTEP', cursor)
            if initial == -1:
                break
            cursor = initial + 14
            final = s.find(b'ITEM: TIMESTEP', cursor)

            if final == -1:
                # last frame: no following marker, so read to end of file
                yield s[initial:]
                break

            yield s[initial:final]


def main():
    """Process a file of atom frames

    Args:
      filename: the file to process
      processes: the size of the pool
    """
    filename = sys.argv[1]
    nump = int(sys.argv[2])

    frames = frame_supplier(filename)

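    # the with block terminates the pool after the loop finishes
    with mp.Pool(nump) as pool:
        # play around with the chunksize
        for result in pool.imap(do_something_with_frame, frames, chunksize=10):
            print(result)


if __name__ == "__main__":
    main()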

Disclaimer: this is a suggestion. There may be some syntax errors. I haven't tested it.

EDIT:

  • It sounds like your script is becoming I/O limited (i.e. limited by the rate at which you can read from disk). You should be able to verify this by setting the body of do_something_with_frame to pass. If the program is I/O bound, it will still take nearly as long.

  • I don't think MPI is going to make any difference here. I think that file-read speed is probably a limiting factor and I don't see how MPI will help.

  • It's worth doing some profiling at this point to find out which function calls are taking the longest.

  • It is also worth trying without mmap():

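def frame_supplier(filename):
    """A generator for frames, reading line by line without mmap"""
    frame = []
    with open(filename) as file:
        for line in file:
            # a new header closes the previous frame
            if line.startswith('ITEM: TIMESTEP') and frame:
                yield frame
                frame = []
            frame.append(line)
    if frame:
        yield frame  # the last frame has no header after it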
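One way to run the I/O-bound check from the first bullet is to time how long it takes just to pull every frame out of the supplier, with no processing and no pool. A minimal sketch follows; `time_frame_supplier` is a hypothetical helper name, and it accepts any iterable of frames.

```python
import time


def time_frame_supplier(frames):
    """Drain an iterable of frames without processing them.

    Returns (frame_count, elapsed_seconds).
    """
    start = time.perf_counter()
    count = 0
    for _ in frames:
        count += 1
    return count, time.perf_counter() - start
```

For example, `count, elapsed = time_frame_supplier(frame_supplier(filename))`. If `elapsed` is close to the total runtime with the pool, reading is the bottleneck and more workers are unlikely to help. Note that a second run over the same file may be faster because the operating system caches file data.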
FiddleStix
  • Can you explain a little bit? Does chunksize=10 here mean you are sending 10 frames to each pool worker? Also, are the results ordered? – dundar yilmaz Dec 11 '19 at 19:02
  • I have tested your code and it is much neater than mine. However, when I run it on 8 or 16 cores it still runs at the same speed, taking about 103 seconds to read the 20 GB file. Is there a race condition between pool workers accessing the same file? Do you think MPI4PY would be a better approach? – dundar yilmaz Dec 11 '19 at 20:02
  • @dundar yilmaz, yes, chunksize=10 means you are sending 10 frames to each pool worker. I'm not sure what implications this has for memory usage. The answer to this question gives, IMO, a good explanation: https://stackoverflow.com/questions/53306927/chunksize-irrelevant-for-multiprocessing-pool-map-in-python/53307813 – FiddleStix Dec 12 '19 at 12:42
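On the ordering question raised in the comments: `Pool.imap` yields results in the same order as its input, whatever the `chunksize`, while `Pool.imap_unordered` yields them in whatever order they complete. The small sketch below illustrates this with a stand-in function; `square` and `ordered_squares` are hypothetical names and have nothing to do with the frame format.

```python
import multiprocessing as mp


def square(x):
    """Stand-in for the per-frame work."""
    return x * x


def ordered_squares(values, nump=2, chunksize=10):
    """Map square over values with imap; results keep the input order."""
    with mp.Pool(nump) as pool:
        # chunksize groups the inputs into batches, each sent as one task
        return list(pool.imap(square, values, chunksize=chunksize))


if __name__ == "__main__":
    print(ordered_squares(range(25)))
```

A larger `chunksize` means fewer, larger tasks and therefore less inter-process overhead per frame, at the cost of coarser load balancing.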