13

I have a 9GB text file of tweets in the following format:

T      'time and date'
U      'name of user in the form of a URL'
W      Actual tweet

There are 6,000,000 users and more than 60,000,000 tweets in total. I read three lines at a time using itertools.izip_longest() and then, according to the user name, append the tweet to that user's file. But it's taking far too long (26 hours and counting). How can this be made faster?

Posting code for completeness,

import itertools
import re
from urlparse import urlparse

s='the existing folder which will have all the files'
with open('path to file') as f:
    for line1,line2,line3 in itertools.izip_longest(*[f]*3):
        if(line1!='\n' and line2!='\n' and line3!='\n'):
            line1=line1.split('\t')
            line2=line2.split('\t')
            line3=line3.split('\t')
            if(not(re.search(r'No Post Title',line1[1]))):
                url=urlparse(line3[1].strip('\n')).path.strip('/')

                if(url==''):
                    file=open(s+'junk','a')
                    file.write(line1[1])
                    file.close()
                else:
                    file=open(s+url,'a')
                    file.write(line1[1])
                    file.close()

My aim is to use topic modeling on the small texts (that is, running LDA on all the tweets of one user, which requires a separate file for each user), but it's taking far too much time.

UPDATE: I followed the suggestions by user S.Lott and used the following code:

import re
from urlparse import urlparse
import os 
def getUser(result):
    result=result.split('\n')
    u,w=result[0],result[1]
    path=urlparse(u).path.strip('/')
    if(path==''):
        f=open('path to junk','a')
        f.write('its Junk !!')
        f.close()
    else:
        result="{0}\n{1}\n{2}\n".format(u,w,path)
        writeIntoFile(result)
def writeIntoFile(result):
    tweet=result.split('\n')
    users = {}
    directory='path to directory'
    u, w, user = tweet[0],tweet[1],tweet[2]
    if user not in users :
        if(os.path.isfile(some_directory+user)):
            if(len(users)>64):
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)
            users[user]=open(some_directory+user,'a')
            users[user].write(w+'\n')
            #users[user].flush
        elif (not(os.path.isfile(some_directory+user))):
            if len(users)>64:
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)

            users[user]=open(some_directory+user,'w')
            users[user].write(w+'\n')
    for u in users:
        users[u].close()
import sys
s=open(sys.argv[1],'r')
tweet={}
for l in s:
    r_type,content=l.split('\t')
    if r_type in tweet:
        u,w=tweet.get('U',''),tweet.get('W','')
        if(not(re.search(r'No Post Title',u))):
            result="{0}{1}".format(u,w)
            getUser(result)
            tweet={}
    tweet[r_type]=content

Obviously, it is pretty much a mirror of what he suggested and kindly shared. Initially it ran very fast, but it has since slowed down. I have posted the updated code so that I can get more suggestions on how to make it faster. When I read from sys.stdin, I got an import error that I could not resolve, so to save time I simply read the file named on the command line instead, hoping that it works correctly. Thanks.

crazyaboutliv

7 Answers

24

This is why your OS has multiprocessing pipelines.

collapse.py sometweetfile | filter.py | user_id.py | user_split.py -d some_directory

collapse.py

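import sys
# "source" is a placeholder: put the path of your tweet file here.
with open("source","r") as theFile:
    tweet = {}
    for line in theFile:
        # Skip blank or malformed lines, if any.
        if '\t' not in line:
            continue
        # Strip the newline so each collapsed tweet ends up on a single line.
        rec_type, content = line.rstrip('\n').split('\t')
        if rec_type in tweet:
            t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
            result=  "{0}\t{1}\t{2}\n".format( t, u, w )
            sys.stdout.write( result )
            tweet= {}
        tweet[rec_type]= content
    # Write the last tweet, which no later record has flushed.
    t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
    result=  "{0}\t{1}\t{2}\n".format( t, u, w )
    sys.stdout.write( result )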

filter.py

import sys
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    if 'No Post Title' in t:
        continue
    sys.stdout.write( tweet )

user_id.py

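import sys
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2
for tweet in sys.stdin:
    t, u, w = tweet.rstrip('\n').split('\t')
    # The user name is the path part of the URL in the U field.
    path=urlparse(u).path.strip('/')
    result= "{0}\t{1}\t{2}\t{3}\n".format( t, u, w, path )
    sys.stdout.write( result )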

user_split.py

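import os
import sys

# Output directory, taken from "-d some_directory" on the command line.
some_directory = sys.argv[sys.argv.index('-d') + 1]

users = {}
for tweet in sys.stdin:
    t, u, w, user = tweet.rstrip('\n').split('\t')
    if user not in users:
        # May run afoul of open file limits...
        users[user]= open(os.path.join(some_directory, user),"w")
    users[user].write( tweet )
    users[user].flush()  # flush() takes no arguments
for u in users:
    users[u].close()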

Wow, you say. What a lot of code.

Yes. But. It spreads out among ALL the processing cores you own and it all runs concurrently. Also, when you connect stdout to stdin through a pipe, it's really only a shared buffer: there's no physical I/O occurring.

It's amazingly fast to do things this way. That's why the *Nix operating systems work that way. This is what you need to do for real speed.


The LRU algorithm, FWIW.

    # tolu ("time of last use") must be set to 0 before the loop starts.
    if user not in users:
        # Only keep a limited number of files open
        if len(users) > 64: # or whatever your OS limit is.
            lru, aFile, u = min( users.values() )
            aFile.close()
            users.pop(u)
        # Append mode, so reopening an evicted user's file does not truncate it.
        users[user]= [ tolu, open(some_directory+user,"a"), user ]
    tolu += 1
    users[user][1].write( tweet )
    users[user][1].flush() # may not be necessary
    users[user][0]= tolu
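
For reference, here is a self-contained sketch of the same LRU idea. It assumes Python 3 and tracks recency with `collections.OrderedDict` instead of a `tolu` counter. The class name `LRUFileCache` and the `max_open` parameter are made up for illustration and are not part of the answer above. Files are opened in append mode so that a file closed by eviction can be reopened without losing what was already written.

```python
import os
from collections import OrderedDict


class LRUFileCache:
    """Keep at most max_open files open, closing the least recently used."""

    def __init__(self, directory, max_open=64):
        self.directory = directory
        self.max_open = max_open          # assumed limit; check your OS
        self.handles = OrderedDict()      # user -> open file, oldest first

    def write(self, user, text):
        handle = self.handles.get(user)
        if handle is None:
            if len(self.handles) >= self.max_open:
                # Evict the least recently used handle.
                _, oldest = self.handles.popitem(last=False)
                oldest.close()
            # Append mode: an evicted file may be reopened later.
            handle = open(os.path.join(self.directory, user), "a")
            self.handles[user] = handle
        else:
            # Mark this user as most recently used.
            self.handles.move_to_end(user)
        handle.write(text)

    def close(self):
        for handle in self.handles.values():
            handle.close()
        self.handles.clear()
```

In `user_split.py` the loop body would then reduce to something like `cache.write(user, tweet)`, followed by a single `cache.close()` after the loop.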
S.Lott
  • Thanks a lot for your solution . Getting a sys.excepthook problem, i guess it has something to do with the synchronization. Trying to google and find out solution to that. but your solution is awesome, never thought on those lines at all :( – crazyaboutliv Jan 12 '11 at 14:02
  • "sys.excepthook problem"? "synchronization"? Can you be more specific? – S.Lott Jan 12 '11 at 14:18
  • Yeah sure. I run the commands as you have mentioned in the reply. It shows the following : bash: ./user_id.py: Permission denied bash: ./filter.py: Permission denied bash: ./user_split.py: Permission denied close failed in file object destructor: Error in sys.excepthook: Original exception was: Thats all. This is what i get when executed. – crazyaboutliv Jan 12 '11 at 14:49
  • Are you aware of the Unix `chmod` command to make a file executable? If not, you could always try doing `python collapse.py`, `python filter.py`, etc. You should probably read up on how the GNU/Linux shell works before typing code you don't understand. – S.Lott Jan 12 '11 at 14:53
  • Did you type `with open("source","r") as theFile:` in collapse.py without really thinking about what the line of code *means*? – S.Lott Jan 12 '11 at 14:57
  • With all due respect to you, yes i do know what chmod means and no, i did not write that open() line without knowing what it means. I always run code just after creating the file using gedit/vim and did not know that not making it an executable will create problems. When run alone, they work fine . That was also checked . And yes, i have never written such good code but i do understand it . Thanks. – crazyaboutliv Jan 12 '11 at 15:09
  • @crazyaboutliv. Good. The initial question about bash and permissions seemed to indicate a complete lack of background in Unix, shell or Python. It's hard to guess what other questions may arise. I felt it better to assume the worst -- no background. I'm very happy to be wrong in my assumption. – S.Lott Jan 12 '11 at 15:14
  • @S. Lott: using a variable called `type` is not recommended, as this is a Python keyword. – Eric O. Lebigot Jan 12 '11 at 15:51
  • @crazyaboutliv: Are many of your processors really used concurrently, with the above program? I would have guessed that your program was disk-bound, not processor-bound, because of the numerous read-writes. – Eric O. Lebigot Jan 12 '11 at 15:56
  • @EOL, no, the code that i posted is not doing so. Either way, i have a normal dual-core system with just 2 gb RAM and what i wrote paralysed my system badly :) . And its still running as well :( – crazyaboutliv Jan 12 '11 at 16:04
  • @S.Lott, That error is gone but now i am getting another error, googling for which says that it is some bug with Ubuntu :O import: unable to read X window image `': Resource temporarily unavailable @ xwindow.c/XImportImage/5020. It also says ./filter.py :syntax error near 'tweet=' and same error with the other two files, ./user_id.py and ./user_split.py . I ran them separately writing the output into individual files for a smaller dataset and they work absolutely fine that way. – crazyaboutliv Jan 12 '11 at 16:05
  • @crazyaboutliv: `syntax error near 'tweet='` can only happen in `collapse.py`, since that's the only file with `tweet=` in it. – S.Lott Jan 12 '11 at 16:12
  • @crazyaboutliv: Your OS may not allow 60 Lakh of open files. Odds are good it doesn't allow much over 60 open files, much less 1 lakh. The LRU algorithm needs to be added to the `user_split.py`. You can't just open files without any limitation. You have to close some to open others. – S.Lott Jan 12 '11 at 16:14
  • i modified the code before running it. Time was in the form – crazyaboutliv Jan 12 '11 at 16:20
  • @Lott, point taken sir. let me do that then . Also, i have used your book a little. Fact is that after starting the book, i got busy and never got around to solving even the first chapter exercises :( . (I hope that in stackoverflow,comments,unlike questions, can go out of the technical world . Otherwise, this can be deleted) – crazyaboutliv Jan 12 '11 at 16:22
  • @crazyaboutliv: `s.split('\n')`? I don't see that anywhere on this page. It's not part of my answer, so I'm afraid I can't help. – S.Lott Jan 12 '11 at 16:23
  • I am actually wondering if i can edit my original question to indicate the small changes i made to the code you posted and also use it to finally wrap up the answer ( whenever the errors go away ) . There is an edit option but wondering even then . – crazyaboutliv Jan 12 '11 at 16:26
  • @crazyaboutliv: I don't think it's a good idea to make dramatic changes to this question. I think it's better to **focus** very, very carefully on very specific technical issues with just one of the new programs you've written and ask that as a new question. – S.Lott Jan 12 '11 at 16:49
3

You spend most of the time in I/O. Solutions:

  • make larger I/O operations, i.e. read into a buffer of, say, 512K and do not write anything until you have at least 256K buffered
  • avoid opening and closing files as much as possible
  • use several threads to read from the file, i.e. split the file into chunks and give each thread its own chunk to work on
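
A minimal sketch of the first point, assuming Python 3: read the file in large chunks and turn each chunk back into complete lines, carrying any partial last line over to the next chunk. The function name `read_lines_in_chunks` and the default chunk size are illustrative only. Note that Python's file objects already buffer reads internally, so the gain from this alone may be small.

```python
def read_lines_in_chunks(path, chunk_size=512 * 1024):
    """Yield complete lines, reading chunk_size characters at a time."""
    leftover = ""
    with open(path, "r") as source:
        while True:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            lines = (leftover + chunk).split("\n")
            # The last piece may be an incomplete line; keep it for later.
            leftover = lines.pop()
            for line in lines:
                yield line
    if leftover:
        # The file did not end with a newline.
        yield leftover
```

Each yielded line has its trailing newline removed, so the T/U/W records can be split on tabs as before.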
Drakosha
  • Using several threads to read from the file isn't going to do anything useful if the bottleneck is getting data off of the disk. – Laurence Gonsalves Jan 12 '11 at 10:10
  • @Laurence Gonsalves: I agree, but it could be striped RAID, and then it'd help. – Drakosha Jan 12 '11 at 10:13
  • Reading into buffer, good idea. Let me try that out, haven't done that yet. I hope that gives me an appreciable speedup. – crazyaboutliv Jan 12 '11 at 10:36
  • reading as buffer is working fine but reading from buffer and doing the processing as outlined above is not working . The buffer is one big string . :( – crazyaboutliv Jan 12 '11 at 11:35
  • -1: Threads. Multiple threads reading from a single file will still be I/O bound, since the threads all share a single FD and buffer. – S.Lott Jan 12 '11 at 12:20
1

For such a mass of information, I would use a database (MySQL, PostgreSQL, SQLite, etc.). They are optimized for the kind of things you are doing.

Thus, instead of appending to a file, you could simply add a line to a table (either the junk or the "good" table), with the URL and the associated data (the same URL can be on multiple lines). This would definitely speed up the writing part.

With the current approach, time is lost because the input file is read from one location on your hard drive while you write on many different locations: the head of the hard drive moves back and forth physically, which is slow. Also, creating new files takes time. If you could mostly read from the input file and let a database handle the data caching and disk writing optimization, the processing would undoubtedly be faster.
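
A rough sketch of what this could look like with SQLite, using the `sqlite3` module from the standard library. The table name `tweets`, its two columns and both function names are assumptions made for the example. The idea is to insert everything in one transaction, build an index on the user column afterwards, and then read the tweets back grouped by user so that each user's file can be written in one go.

```python
import sqlite3


def load_tweets(conn, records):
    """Insert (user, tweet) pairs in one transaction, then index by user."""
    conn.execute("CREATE TABLE IF NOT EXISTS tweets (user TEXT, tweet TEXT)")
    with conn:  # commits once at the end of the block
        conn.executemany("INSERT INTO tweets VALUES (?, ?)", records)
    conn.execute("CREATE INDEX IF NOT EXISTS tweets_by_user ON tweets (user)")


def tweets_by_user(conn):
    """Yield (user, [tweets]) groups, one user at a time."""
    current, batch = None, []
    query = "SELECT user, tweet FROM tweets ORDER BY user, rowid"
    for user, tweet in conn.execute(query):
        if user != current and batch:
            yield current, batch
            batch = []
        current = user
        batch.append(tweet)
    if batch:
        yield current, batch
```

`records` can be any iterable, including a generator that parses the 9GB file lazily, so the input never has to fit in memory.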

Eric O. Lebigot
  • I may be wrong about this. I need to give the data to Mallet and i think mallet only takes plain text-files. So does Vowpal Wabbit. Please let me know if i am wrong. – crazyaboutliv Jan 12 '11 at 10:35
  • @crazyaboutliv: But first you're trying to break it down by user. Your current script repeatedly opens and closes a lot of files, making it very slow. If you put it all in a database, then pull each user out of the database back into a text file, it will be much quicker (although it will still take a while, of course). – Thomas K Jan 12 '11 at 11:55
1

Not sure if this is going to be faster, just an idea. Your file looks like a CSV with tabs as delimiters. Have you tried creating a CSV reader?

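import csv
# QUOTE_NONE keeps quote characters inside tweets from being read as field quoting.
reader = csv.reader(open('bigfile'), 'excel-tab', quoting=csv.QUOTE_NONE)
for line in reader:
    process_line(line)  # process_line is your own function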

EDIT: Calling csv.field_size_limit(new_limit) is pointless here.

Attila O.
  • yes, well you can definitely say that its a csv-file with tab separator, kinda tsv(tab separated values). Let me try this as well. Thanks. – crazyaboutliv Jan 12 '11 at 10:37
0

I think writing line by line is a run-time killer when working with such huge data. You can accelerate it significantly with vectorized operations, i.e. read/write a bunch of lines at once as in this answer here

computerist
0

You could try creating a dict with the format {url: [lines...]}, and only writing each file at the end. I suspect that repeatedly opening and closing files is a lot of overhead. How many lines are you writing per file on average? If basically each line is getting its own file then there's not much you can do, except change that requirement :)
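
Something along these lines, as a sketch only. Since 60,000,000 tweets may not fit in memory at once, this version flushes the dict to disk whenever the buffered text passes a threshold rather than only at the very end. The class name `BufferedSplitter` and the `max_chars` default are invented for the example, and the size is counted in characters rather than bytes.

```python
import os
from collections import defaultdict


class BufferedSplitter:
    """Collect lines per user in a dict and write them out in batches."""

    def __init__(self, directory, max_chars=64 * 1024 * 1024):
        self.directory = directory
        self.max_chars = max_chars        # flush threshold (assumed value)
        self.pending = defaultdict(list)  # user -> list of buffered lines
        self.size = 0                     # characters currently buffered

    def add(self, user, line):
        self.pending[user].append(line)
        self.size += len(line)
        if self.size >= self.max_chars:
            self.flush()

    def flush(self):
        # One open/close per user per batch instead of one per tweet.
        for user, lines in self.pending.items():
            with open(os.path.join(self.directory, user), "a") as out:
                out.writelines(lines)
        self.pending.clear()
        self.size = 0
```

Call `flush()` once more after the last tweet so that whatever is still buffered gets written.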

Karl Knechtel
  • Yes , out of 60 lakh users, a little less than lakh have more than 50 tweets .But, i thought that this is a one-time operation and that is why waiting for it to finish. Its very slow. It has written only about 1 GB of data in 7 hours today. – crazyaboutliv Jan 12 '11 at 11:25
  • Creating dicts would work if the tweets were in user order, so you could flush to disk at the end of each user's section. Otherwise, you'll most likely run out of memory. – Thomas K Jan 12 '11 at 11:33
0

On my system at least, almost all of the running time would be spent closing files. Sequential reading and writing is fast, so you can very well make several passes over the data. Here's what I'd do:

  1. Split the file into as many files as you can open at once, in such a way that
    • a user's tweets go to the same file
    • you keep track of which files contain more than one user
  2. Go on splitting the output files, until every file contains only one user

If you can write to 200 files in parallel, after two passes over all the data you'd have 40000 files containing 150 users on average, so after the third pass you'd probably be nearly done.
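
The arithmetic behind that estimate can be checked with a few lines of Python. This is only a back-of-the-envelope sketch that assumes users are spread evenly over the files; the function name `passes_needed` is made up for the example.

```python
def passes_needed(n_users, max_files):
    """Number of splitting passes until every file can hold a single user."""
    passes, capacity = 0, 1
    while capacity < n_users:
        capacity *= max_files   # each pass multiplies the number of files
        passes += 1
    return passes


print(passes_needed(6000000, 200))   # 3
print(6000000 // 200 ** 2)           # 150 users per file after two passes
```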

Here's some code that assumes the file has been preprocessed according to S.Lott's answer (collapse, filter, user_id). Note that it will delete the input file along with other intermediate files.

import os

MAX_FILES = 200  # how many output files to keep open at once

todo = ['source']
counter = 0

while todo:
    infilename = todo.pop()
    infile = open(infilename)
    users = {}
    files = []
    filenames = []
    for tweet in infile:
        t, u, w, user = tweet.split('\t')
        if user not in users:
            users[user] = len(users) % MAX_FILES
            if len(files) < MAX_FILES:
                filenames.append(str(counter))
                files.append(open(filenames[-1], 'w'))
                counter += 1
        files[users[user]].write(tweet)
    for f in files:
        f.close()
    if len(users) > MAX_FILES:
        todo += filenames[:len(users)-MAX_FILES]
    infile.close()
    os.remove(infilename)
Janne Karila