I have a Python script that traverses a list (>1000 elements), looks up each element in a large file, and prints the result. As written, it reads the entire file more than 1000 times. I tried using multiprocessing, but it did not help much. Here's what I am trying to do:

import gzip
from multiprocessing.pool import ThreadPool as Pool

def getForwardIP(clientIP, requestID):
    # Scans the whole log for a single request ID (called once per ID)
    with gzip.open("xyz.log") as infile:
        for lines in infile:
            line = lines.split(" ")
            myRequestID = line[0]
            forwardIP = line[1]
            if myRequestID == requestID:
                print forwardIP

if __name__ == "__main__":
    pool_size = 8
    pool = Pool(pool_size)
    request_id_list = list()
    # request_id_list contains >1000 elements
    for id in request_id_list:
        pool.apply_async(getForwardIP, ("1.2.3.4.", id, ))
    pool.close()
    pool.join()

Is there a faster way? Any help will be appreciated. Thanks!

EDIT

(Posting my entire code here.) Thanks, everyone, for the suggestions. I now read the file into a list once rather than reading it 1000 times. I tried to multi-process the for loop, but it didn't work. Below is the code:

import gzip
import datetime
from multiprocessing.pool import ThreadPool as Pool

def getRequestID(r_line_filename):
    requestIDList = list()
    # r_line_filename is a file with request_id and client_ip
    with gzip.open(r_line_filename) as infile:
        for lines in infile:
            line = lines.split(" ")
            requestID = line[1].strip("\n")
            myclientIP = line[0]
            # clientIP is the module-level name set in the __main__ block
            if myclientIP == clientIP:
                requestIDList.append(requestID)
    print "R line List Ready!"
    return requestIDList



def getFLineList(fFilename):
    fLineList = list()
    # fFilename is a file with format request_id, forward_ip, epoch time
    with gzip.open(fFilename) as infile:
        for lines in infile:
            fLineList.append(lines.split())
    print "F line list ready!"
    return fLineList

def forwardIP(lines, requestID):
    myrequestID = lines[0]
    forwardIP = lines[1]
    epoch = int(lines[2].split(".")[0])
    timex = datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S')
    if myrequestID == requestID:
        print "%s %s %s" % (clientIP, timex, forwardIP)

if __name__ == "__main__":
    pool = Pool()
    clientIP = "x.y.z.a"
    rLineList = getRequestID("rLine_subset.log.gz")
    fLineList = getFLineList("fLine_subset.log.gz")
    for RID in rLineList:
        for lines in fLineList:
            pool.apply_async(forwardIP, (lines, RID,))
    # Close and join once, after every task has been submitted
    pool.close()
    pool.join()

The multi-processing part is not working. Actually, this one is much slower. If I don't do multi-processing and simply traverse the list, it is faster. Thanks for your help in advance!

Praniti Gupta
  • Why not read the file at the beginning of the script and then use dictionaries or lists? Every time you call getForwardIP you read the file from scratch. I don't see the point in doing that. – patito Feb 22 '17 at 23:33
  • If the file is really large and the request_id -> ip map cannot fit in memory, you can use the technique described here http://stackoverflow.com/a/620492/493928 and run each process on a set of offsets you collect. – khachik Feb 23 '17 at 01:17
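
One way to read the first comment's suggestion is to parse the file once into a dictionary keyed by request ID, so that each of the >1000 lookups becomes a dictionary access instead of a file scan. The sketch below is written for Python 3 (the snippets in the question are Python 2) and assumes the two-column `request_id forward_ip` layout from the question. `build_forward_ip_map`, `write_sample_log`, and the sample data are hypothetical names made up for illustration.

```python
import gzip
import os
import tempfile


def build_forward_ip_map(path):
    """Read the gzip log once and map request_id -> list of forward IPs."""
    forward_ips = {}
    # "rt" makes gzip.open yield text lines under Python 3.
    with gzip.open(path, "rt") as infile:
        for raw_line in infile:
            fields = raw_line.split()
            if len(fields) < 2:
                continue  # skip blank or malformed lines
            forward_ips.setdefault(fields[0], []).append(fields[1])
    return forward_ips


def write_sample_log(path):
    """Write hypothetical sample data standing in for xyz.log."""
    with gzip.open(path, "wt") as outfile:
        outfile.write("req1 10.0.0.1\nreq2 10.0.0.2\nreq1 10.0.0.3\n")


sample_path = os.path.join(tempfile.mkdtemp(), "sample.log.gz")
write_sample_log(sample_path)
forward_ip_map = build_forward_ip_map(sample_path)
for request_id in ["req1", "req3"]:
    # IDs that never appear in the log map to an empty list
    print(request_id, forward_ip_map.get(request_id, []))
```

The dictionary holds every request ID in the file, so this only suits files whose ID-to-IP map fits in memory, which is the limitation the second comment raises.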

3 Answers

I agree with mwm314 that you shouldn't be reading the file 1000 times.

I'm assuming you haven't given us the complete code because the client_ip parameter seems to be unused, but here I have rewritten it to only open the file once and to only iterate through each line in the file once. I've also modified getForwardIP to take a list of request ids and immediately turn it into a set for optimal lookup performance.

import gzip


def getForwardIP(client_ip, request_ids):
    request_ids = set(request_ids)  # to get O(1) lookup
    with gzip.open("xyz.log") as infile:
        for lines in infile:
            line = lines.split(" ")
            found_request_id = line[0]
            found_forward_ip = line[1]
            if found_request_id in request_ids:
                print found_forward_ip


if __name__ == "__main__":
    request_id_list = list()
    # request_id_list contains >1000 elements
    getForwardIP("1.2.3.4.", request_id_list)
Terrence
  • Yes! This is not the entire code. Turning list to set helped. Will calling each element of the list in different processes optimize it further? – Praniti Gupta Feb 23 '17 at 20:16
  • You mean run `if found_request_id in request_ids:` inside a thread pool? You could try! Since non-write operations on sets are thread-safe, I don't think it could cause any harm. – Terrence Feb 23 '17 at 21:01
  • Yes! Please refer to the edit. I have posted the entire code. – Praniti Gupta Feb 23 '17 at 21:31
  • In your new code, I have a hunch the bottleneck is now stdout. If you're on a Unix system, try running your script in a shell/Terminal with `python yourscript.py > output_file.txt` – Terrence Feb 23 '17 at 22:32
  • I am using the same method to run the script. – Praniti Gupta Feb 23 '17 at 22:56
  • So what do you mean by "I tried to multi-process the for loop, but it didn't work"? Is it still slow? Are you not getting your expected output? – Terrence Feb 23 '17 at 23:06
  • Shouldn't forwardIP() be spawned in multiple processes? I cannot see that happening. – Praniti Gupta Feb 23 '17 at 23:20
  • I noticed that in your new code, you removed the argument when instantiating `pool`. What does `multiprocessing.cpu_count()` return? [The docs say](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool) that the default number of worker processes is the number returned by `cpu_count()`. – Terrence Feb 23 '17 at 23:35
  • That's 8. I believe pool= Pool() is equivalent to pool= Pool(multiprocessing.cpu_count()). – Praniti Gupta Feb 23 '17 at 23:36
  • Yes, I just wanted to make sure you weren't running in a VM or something that would limit it. How are you verifying that `forwardIP` isn't being spawned in multiple processes? – Terrence Feb 23 '17 at 23:38
  • I don't know much about how multiprocessing works. I was just comparing the time the script takes to display results with and without multiprocessing. – Praniti Gupta Feb 23 '17 at 23:52
  • What happens when you use my `found_request_id in request_ids` optimization instead of using a for loop in the `if __name__ == "__main__"` block? There's an overhead with multiprocessing and what might be happening is that you're creating too many tasks such that the overhead is greater than the multiprocessing benefits. – Terrence Feb 24 '17 at 01:10
  • Might be a possibility. Thanks for the help. – Praniti Gupta Feb 24 '17 at 21:06
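
To make the last suggestion in this thread concrete: the nested loop in the edited question submits one task per (request ID, line) pair, whereas a set lookup needs only one pass over the parsed lines. The following is a rough Python 3 sketch of that single pass, not the asker's actual code. `match_forward_ips` is a hypothetical helper, and it formats timestamps in UTC so the output is reproducible, whereas the question's `fromtimestamp(epoch)` call uses local time.

```python
import datetime


def match_forward_ips(client_ip, request_ids, f_lines):
    """Return 'client_ip time forward_ip' strings in one pass over f_lines."""
    wanted = set(request_ids)  # O(1) membership test per line
    results = []
    for fields in f_lines:
        # Each entry is expected to be [request_id, forward_ip, epoch_time]
        if len(fields) < 3 or fields[0] not in wanted:
            continue
        epoch = int(fields[2].split(".")[0])
        # Assumption: UTC for reproducible output; the question used local time.
        stamp = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
        results.append(
            "%s %s %s" % (client_ip, stamp.strftime("%Y-%m-%d %H:%M:%S"), fields[1])
        )
    return results


# Hypothetical parsed lines in the fLine format described in the question
sample_lines = [
    ["req1", "10.0.0.1", "0.5"],
    ["req2", "10.0.0.2", "60.0"],
]
for row in match_forward_ips("x.y.z.a", ["req1"], sample_lines):
    print(row)
```

The work here is proportional to the number of lines rather than lines times request IDs, so there are no per-pair tasks left to pay scheduling overhead on.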

There is indeed a faster way. Don't read and parse the file 1000 times. Instead, read it once, parse it once, and store it. File I/O is one of the slowest things you can do (in any language). In-memory processing is much faster!

Something like this (obviously untested since I don't have "xyz.log" accessible to me. And for the hawks: obviously I didn't profile it either, but I have a sneaky suspicion reading a file once is faster than reading it 1000 times):

import gzip

def readFile():
  my_lines = []
  with gzip.open("xyz.log") as infile:
     for lines in infile:
        line = lines.split(" ")
        my_lines.append(line)

  return my_lines

def getForwardIp(lines, requestID): #Doesn't look like you need client IP (yet), so I nuked it
  # Scan the already-parsed lines instead of re-reading the file
  for line in lines:
    myRequestID= line[0]
    forwardIP= line[1]
    if myRequestID==requestID:
       print forwardIP

if __name__ == "__main__":
  parsed_lines = readFile()
  request_id_list= list()
  #request_id_list contains >1000 elements
  for id in request_id_list:
    getForwardIp(parsed_lines, id)
Matt Messersmith
  • Thanks! Since the file size is huge, isn't it possible that we won't be able to store the entire content in the memory/list? – Praniti Gupta Feb 23 '17 at 00:00
  • If that's the case, you'll have a rough time. You'd have a couple of options. You could use something like AWS/Spark to get more memory for a one-off job, or you could buy a bigger machine. Alternatively, you could use a generator instead of returning a list if the `request_id_list` is in the same order as how they appear in the file (using the `yield` keyword). Is this the actual case? Does it not fit in memory? Not much use theorizing if it doesn't matter ;) – Matt Messersmith Feb 23 '17 at 00:42
  • It does fit in memory in this case. My concern was what the other options are if I have to work on an even larger file (which I definitely will). For this one, your solution worked. Thanks! – Praniti Gupta Feb 23 '17 at 20:18
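
For the larger-file case raised in these comments, one option is to stream the file through a generator and keep only the set of wanted request IDs in memory. Note that this is a slightly different technique from the ordered-generator idea mentioned above: because it tests membership in a set, it does not depend on the IDs appearing in file order. It is a Python 3 sketch under the question's `request_id forward_ip` layout, and `iter_matches` and the sample file are hypothetical.

```python
import gzip
import os
import tempfile


def iter_matches(path, request_ids):
    """Yield (request_id, forward_ip) pairs lazily, one line at a time."""
    wanted = set(request_ids)
    with gzip.open(path, "rt") as infile:
        for raw_line in infile:
            fields = raw_line.split()
            if len(fields) >= 2 and fields[0] in wanted:
                yield fields[0], fields[1]


# Hypothetical sample file standing in for a log too large to hold in a list
sample_path = os.path.join(tempfile.mkdtemp(), "big.log.gz")
with gzip.open(sample_path, "wt") as outfile:
    outfile.write("req1 10.0.0.1\nreq2 10.0.0.2\nreq3 10.0.0.3\n")

for request_id, forward_ip in iter_matches(sample_path, ["req3", "req1"]):
    print(request_id, forward_ip)
```

Memory use grows with the number of request IDs rather than with the file size, at the cost of one full sequential read of the file.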

I would probably scan through the single large file for all of the requested IDs, then fully utilize the ThreadPool for invoking getForwardIP().

You could partition the single large file into multiple regions and have multiple workers process different partitions of the file, but this approach has some challenges and might not work on all file systems.
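
As a rough illustration of the partitioning idea, the sketch below splits lines that are already parsed and in memory into chunks and hands one chunk per task to a `ThreadPool`, rather than one task per line. This is an assumption about what such partitioning could look like, not a method from the answer: it does not partition the file on disk, and `chunked`, `find_in_chunk`, `find_forward_ips`, and the default sizes are hypothetical. It is written for Python 3.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def chunked(items, chunk_size):
    """Yield consecutive slices of items with at most chunk_size elements."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


def find_in_chunk(wanted, chunk):
    """Return forward IPs for lines in this chunk whose request ID is wanted."""
    return [fields[1] for fields in chunk
            if len(fields) >= 2 and fields[0] in wanted]


def find_forward_ips(parsed_lines, request_ids, chunk_size=10000, workers=4):
    wanted = frozenset(request_ids)
    pool = ThreadPool(workers)
    try:
        # One task per chunk; pool.map returns results in chunk order
        per_chunk = pool.map(partial(find_in_chunk, wanted),
                             chunked(parsed_lines, chunk_size))
    finally:
        pool.close()
        pool.join()
    return [ip for chunk_result in per_chunk for ip in chunk_result]


# Hypothetical parsed lines: [request_id, forward_ip]
sample_lines = [["req%d" % i, "10.0.0.%d" % i] for i in range(10)]
print(find_forward_ips(sample_lines, ["req3", "req7"], chunk_size=3, workers=2))
```

In standard CPython builds, threads share the global interpreter lock, so this layout mainly reduces task overhead and is unlikely to speed up the pure-Python matching itself; whether it beats a plain loop would need to be measured.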

FluxIX