
I have 3 huge CSV files containing climate data, each about 5 GB. The first cell in each line is the meteorological station's number (from 0 to about 100,000), and each station has from 1 to 800 lines in each file, not necessarily the same number in all files. For example, Station 11 has 600, 500, and 200 lines in file1, file2, and file3 respectively. I want to read all the lines of each station, do some operations on them, write the results to another file, and then move on to the next station, and so on. The files are too large to load into memory at once, so I tried some solutions to read them with minimal memory load, like this post and this post, which include this method:

with open(...) as f:
    for line in f:
        <do something with line> 

The problem with this method is that it reads the file from the beginning every time, while I want to read the files as follows:

for station in range(100798):
    with open(file1) as f1, open(file2) as f2, open(file3) as f3:
        for line in f1:
            st = line.split(",")[0]
            if st == station:
                <store this line for some analysis>
            else:
                break   # break the for loop and go to read the next file
        for line in f2:
            ...
            <similar code to f1>
            ...
        for line in f3:
            ...
            <similar code to f1>
            ...
    <do the analysis for this station, then go to the next station>

The problem is that each time I start over with the next station, the for loop starts from the beginning of the file, while I want it to resume from the line where the `break` occurred, i.e. to continue reading the file from there.

How can I do it?

Thanks in advance

Notes about the solutions below: As I mentioned at the time I posted my answer, I implemented the answer of @DerFaizio but found it very slow in processing.

After trying the generator-based answer submitted by @PM_2Ring, I found it very, very fast, probably because it relies on generators.

The difference between the two solutions can be seen in the number of stations processed per minute: about 2500 st/min for the generator-based solution versus 45 st/min for the pandas-based solution, so the generator-based solution is more than 55 times faster.

I will keep both implementations below for reference. Many thanks to all contributors, especially @PM_2Ring.

Mohammad ElNesr
  • you could store file position using `f1.tell()` and seek back to it the next time. – Jean-François Fabre Feb 06 '17 at 08:34
  • Thank you @Jean-FrançoisFabre, but this takes a long time, as each file contains more than 500 million lines. And without storing the `file.tell()` position, I can search for the station number again, as they are sorted. Thanks again for your suggestion, but I think there is a better solution. – Mohammad ElNesr Feb 06 '17 at 08:39
  • the problem is that the lines are of variable size, so to reach line N you have to go through all previous lines at least once (and cache the result afterwards). Good luck with that. – Jean-François Fabre Feb 06 '17 at 08:41
  • Put your `with` block *outside* the outer for-loop. Each time you exit and re-enter the `with` block, the files are closed and reopened, so they start from the beginning again. – juanpa.arrivillaga Feb 06 '17 at 08:43
  • Each of these 3 files contains data for every station number in `range(100798)`, and the data lines in each file are sorted by station number. Is that correct? – PM 2Ring Feb 06 '17 at 09:22
  • @PM2Ring Yes, correct, but the number of lines for each station in each file varies. – Mohammad ElNesr Feb 06 '17 at 09:31
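For reference, here is a minimal sketch of the `f1.tell()` / `seek()` idea suggested in the comments above. The file name `stations.csv` and the helper function are purely illustrative (they are not from the question), and the sketch assumes the lines are sorted by station id, as described:

# A sketch of the tell()/seek() approach: remember the byte offset where
# the next station's lines begin and resume from it on the next call.
def station_lines(path, station_id, start_offset=0):
    ''' Return the lines belonging to station_id, plus the offset at
        which the following station starts. '''
    lines = []
    with open(path) as f:
        f.seek(start_offset)            # resume where we stopped last time
        while True:
            offset = f.tell()           # position before reading this line
            line = f.readline()
            if not line or line.split(",")[0] != str(station_id):
                return lines, offset    # EOF, or the next station starts here
            lines.append(line)

# Carry the offset forward from one station to the next.
offset = 0
for station in range(5):                # small range just for illustration
    lines, offset = station_lines("stations.csv", station, offset)
    # <analyse the lines for this station>

Because the files are sorted by station, the offset only ever moves forward, so each line is read at most twice: once when it is recognised as the start of the next station, and once more when that station is actually processed.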

4 Answers


The code below iterates over the files line by line, grabbing the lines for each station from each file in turn and appending them to a list for further processing.

The heart of this code is a generator file_buff that yields the lines of a file but which allows us to push a line back for later reading. When we read a line for the next station we can send it back to file_buff so that we can re-read it when it's time to process the lines for that station.

To test this code, I created some simple fake station data using create_data.

from random import seed, randrange

seed(123)

station_hi = 5
def create_data():
    ''' Fill 3 files with fake station data '''
    fbase = 'datafile_'
    for fnum in range(1, 4):
        with open(fbase + str(fnum), 'w') as f:
            for snum in range(station_hi):
                for i in range(randrange(1, 4)):
                    s = '{1} data{0}{1}{2}'.format(fnum, snum, i)
                    print(s)
                    f.write(s + '\n')
        print()

create_data()

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

# An infinite counter that yields numbers converted to strings
def str_count(start=0):
    n = start
    while True: 
        yield str(n)
        n += 1

# Extract station data from all 3 files
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3:
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

    for snum_str in str_count():
        station_lines = []
        for fb in (fb1, fb2, fb3):
            for line in fb:
                #Extract station number string & station data
                sid, sdata = line.split()
                if sid != snum_str:
                    # This line contains data for the next station,
                    # so push it back to the buffer
                    rc = fb.send(line)
                    # and go to the next file
                    break
                # Otherwise, append this data
                station_lines.append(sdata)

        #Process all the data lines for this station
        if not station_lines:
            #There's no more data to process
            break
        print('Station', snum_str)
        print(station_lines)

output

0 data100
1 data110
1 data111
2 data120
3 data130
3 data131
4 data140
4 data141

0 data200
1 data210
2 data220
2 data221
3 data230
3 data231
3 data232
4 data240
4 data241
4 data242

0 data300
0 data301
1 data310
1 data311
2 data320
3 data330
4 data340

Station 0
['data100', 'data200', 'data300', 'data301']
Station 1
['data110', 'data111', 'data210', 'data310', 'data311']
Station 2
['data120', 'data220', 'data221', 'data320']
Station 3
['data130', 'data131', 'data230', 'data231', 'data232', 'data330']
Station 4
['data140', 'data141', 'data240', 'data241', 'data242', 'data340']

This code can cope if station data is missing for a particular station from one or two of the files, but not if it's missing from all three files, since it breaks the main processing loop when the station_lines list is empty. That shouldn't be a problem for your data, though.


For details on generators and the generator.send method, please see 6.2.9. Yield expressions in the docs.

This code was developed using Python 3, but it will also run on Python 2.6+ (you just need to include from __future__ import print_function at the top of the script).
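As a quick illustration of the push-back behaviour (this snippet is not part of the original answer), here is a toy session with file_buff, using io.StringIO as a stand-in for a real file:

import io

fake_file = io.StringIO("0 a\n0 b\n1 c\n")   # stand-in for a real data file
fb = file_buff(fake_file)

print(next(fb))     # '0 a\n'
print(next(fb))     # '0 b\n'
line = next(fb)     # '1 c\n', which belongs to the next station
fb.send(line)       # push it back; send() returns the pushed-back line
print(next(fb))     # '1 c\n' again, as if it had never been read

One caveat for newer interpreters: since Python 3.7 (PEP 479), a StopIteration raised by next(fh) inside a generator is converted into a RuntimeError, so on those versions file_buff would need to wrap next(fh) in a try/except StopIteration: return for the end-of-file handling described above to keep working.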


If there may be station ids missing from all 3 files we can easily handle that. Just use a simple range loop instead of the infinite str_count generator.

from random import seed, randrange

seed(123)

station_hi = 7
def create_data():
    ''' Fill 3 files with fake station data '''
    fbase = 'datafile_'
    for fnum in range(1, 4):
        with open(fbase + str(fnum), 'w') as f:
            for snum in range(station_hi):
                for i in range(randrange(0, 2)):
                    s = '{1} data{0}{1}{2}'.format(fnum, snum, i)
                    print(s)
                    f.write(s + '\n')
        print()

create_data()

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

station_start = 0
station_stop = station_hi

# Extract station data from all 3 files
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3:
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

    for i in range(station_start, station_stop):
        snum_str = str(i)
        station_lines = []
        for fb in (fb1, fb2, fb3):
            for line in fb:
                #Extract station number string & station data
                sid, sdata = line.split()
                if sid != snum_str:
                    # This line contains data for the next station,
                    # so push it back to the buffer
                    rc = fb.send(line)
                    # and go to the next file
                    break
                # Otherwise, append this data
                station_lines.append(sdata)

        if not station_lines:
            continue
        print('Station', snum_str)
        print(station_lines)

output

1 data110
3 data130
4 data140

0 data200
1 data210
2 data220
6 data260

0 data300
4 data340
6 data360

Station 0
['data200', 'data300']
Station 1
['data110', 'data210']
Station 2
['data220']
Station 3
['data130']
Station 4
['data140', 'data340']
Station 6
['data260', 'data360']
PM 2Ring
  • Thanks a lot @PM_2Ring. This code looks excellent and smart, but I wonder why you converted the station number to a string in the `str_count` generator? And what if I want to iterate over the original number of stations (100797), as there are already some station numbers missing from all three files? (There are more files that include other data for the missing stations, but I want to process only these three temperature files.) – Mohammad ElNesr Feb 06 '17 at 13:25
  • @MohammadElNesr I converted the station number to a string in the `str_count` generator because we need to test the station number string for every line we read, and it's more efficient to compare those number strings to a string than to convert each one to an integer to do the comparison. And I thought it was better to do that conversion in the generator than to clutter the main loop with a station number integer and a station number string. – PM 2Ring Feb 06 '17 at 13:47
  • @MohammadElNesr Before I started to write this code I asked if "Each of these 3 files contains data for every station number in `range(100798)`" and you replied that that was correct. I need to change the logic a little if that's not correct. But it's getting late in my time zone, and I probably won't have time to make that change until tomorrow. – PM 2Ring Feb 06 '17 at 13:50
  • No need, @PM_2Ring, to do anything further, as I have changed what was needed and the code is running like a charm now. Many, many thanks for your great effort. – Mohammad ElNesr Feb 06 '17 at 13:52
  • @MohammadElNesr No worries! – PM 2Ring Feb 06 '17 at 13:55
  • @MohammadElNesr I've added a new version that copes with missing stations, you just need to specify the station number range. – PM 2Ring Feb 06 '17 at 14:12

I would suggest using `pandas.read_csv`. You can specify the rows to skip using `skiprows` and load a reasonable number of rows at a time with `nrows`, depending on your file size. Here is a link to the documentation: http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
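A minimal sketch of that idea (the file name and chunk size are illustrative, not from the answer): read a window of rows, process it, then advance skiprows past the rows already consumed on the next call.

import pandas as pd

path = "file1.csv"      # illustrative path to one of the climate files
CHUNK = 100000          # illustrative number of rows to load per call

skip = 0
while True:
    try:
        # Read one window of rows; header=None because the station id is
        # simply the first field of each data line.
        chunk = pd.read_csv(path, skiprows=skip, nrows=CHUNK, header=None)
    except pd.errors.EmptyDataError:
        break                           # nothing left to read
    # <group `chunk` by the station id in column 0 and process it>
    skip += len(chunk)
    if len(chunk) < CHUNK:
        break                           # last, partial window

Note that skiprows still has to scan past all of the skipped lines on every call, which is part of why the pandas-based implementation further down turned out so much slower than the generator-based one.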

DerFaizio
  • Firstly, you can implement this easily enough using the `csv` module, no need for `pandas`, and the problem is that the OP does not know the chunk-size ahead of time... at least that is how I interpreted it. – juanpa.arrivillaga Feb 06 '17 at 08:48

I posted the code below before @PM-2Ring posted his solution. I would like to leave both solutions active:

The first solution, which depends on the pandas library (by @DerFaizio):

This solution finished 5450 stations in 120 minutes (about 45 stations/minute)

import pandas as pd

# full_paths (the three input CSV paths) and data_out (the csv writer)
# are set up as in the generator-based solution further below.
skips = [1, 1, 1]  # rows already consumed per file (the initial 1 skips the header row)
for station_number in range(100798):
    storage = {}
    tmax = pd.read_csv(full_paths[0], skiprows=skips[0], header=None, nrows=126000, usecols=[0, 1, 3])
    tmin = pd.read_csv(full_paths[1], skiprows=skips[1], header=None, nrows=126000, usecols=[0, 1, 3])
    tavg = pd.read_csv(full_paths[2], skiprows=skips[2], header=None, nrows=126000, usecols=[0, 1, 3])

    # tmax is at position 0
    for idx, station in enumerate(tmax[0]):
        if station == station_number:
            date_val = tmax[1][idx]
            t_val = float(tmax[3][idx])/10.
            storage[date_val] = [t_val, None, None]
            skips[0] += 1
        else:
            break
    # tmin is at position 1
    for idx, station in enumerate(tmin[0]):
        # station, date_val, _, val = lne.split(",")
        if station == station_number:
            date_val = tmin[1][idx]
            t_val = float(tmin[3][idx]) / 10.
            if date_val in storage:
                storage[date_val][1] = t_val
            else:
                storage[date_val] = [None, t_val, None]
            skips[1] += 1
        else:
            break
    # tavg is at position 2
    for idx, station in enumerate(tavg[0]):
        ...
        # similar to Tmin
        ...
        pass

    station_info = []
    for key in storage.keys():
        # do some analysis
        # Fill the list station_info 
        pass
    data_out.writerows(station_info)

The following is the generator-based solution (by @PM_2Ring):

This solution finished 30000 stations in 12 minutes (about 2500 stations/minute)

import csv
import os
import time

# Note: source (the data directory), get__proper_data_type() and
# from_excel_ordinal() are defined elsewhere in my project.
files = ['Tmax', 'Tmin', 'Tavg']
headers = ['Nesr_Id', 'r_Year', 'r_Month', 'r_Day', 'Tmax', 'Tmin', 'Tavg']

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

# An infinite counter that yields numbers converted to strings
def str_count(start=0):
    n = start
    while True:
        yield str(n)
        n += 1

# NULL = -999.99
print "Time started: {}".format(time.strftime('%Y-%m-%d %H:%M:%S'))
with open('Results\\GHCN_Daily\\Important\\Temp_All_out_gen.csv', 'wb+') as out_file:
    data_out = csv.writer(out_file, quoting=csv.QUOTE_NONE, quotechar='', delimiter=',', escapechar='\\',
                          lineterminator='\n')
    data_out.writerow(headers)
    full_paths = [os.path.join(source, '{}.csv'.format(file_name)) for file_name in files]
    # Extract station data from all 3 files
    with open(full_paths[0]) as f1, open(full_paths[1]) as f2, open(full_paths[2]) as f3:
        fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

        for snum_str in str_count():
            # station_lines = []
            storage ={}
            count = [0, 0, 0]
            for file_id, fb in enumerate((fb1, fb2, fb3)):
                for line in fb:
                    if not isinstance(get__proper_data_type(line.split(",")[0]), str):
                        # Extract station number string & station data
                        sid, date_val, _dummy, sdata = line.split(",")
                        if sid != snum_str:
                            # This line contains data for the next station,
                            # so push it back to the buffer
                            rc = fb.send(line)
                            # and go to the next file
                            break
                        # Otherwise, append this data
                        sdata = float(sdata) / 10.
                        count[file_id] += 1
                        if date_val in storage:
                            storage[date_val][file_id] = sdata
                        else:
                            storage[date_val]= [sdata, None, None]
                        # station_lines.append(sdata)

            # # Process all the data lines for this station
            # if not station_lines:
            #     # There's no more data to process
            #     break
            print "St# {:6d}/100797. Time: {}. Tx({}), Tn({}), Ta({}) ".\
                format(int(snum_str), time.strftime('%H:%M:%S'), count[0], count[1], count[2])
            # print(station_lines)

            station_info = []
            for key in storage.keys():
                # key_val = storage[key]
                tx, tn, ta = storage[key]
                if ta is None:
                    if tx != None and tn != None:
                        ta = round((tx + tn) / 2., 1)
                if tx is None:
                    if tn != None and ta != None:
                        tx = round(2. * ta - tn, 1)
                if tn is None:
                    if tx != None and ta != None:
                        tn = round(2. * ta - tx, 1)
                # print key,
                py_date = from_excel_ordinal(int(key))
                # print py_date
                station_info.append([snum_str, py_date.year, py_date.month, py_date.day, tx, tn, ta])

            data_out.writerows(station_info)
            del station_info

Thanks to all.

Mohammad ElNesr
  • `for key in storage.keys():` is inefficient in Python 2. It has to build a list of the dictionary keys before it can start iterating over them. You can iterate directly over the keys using `for key in storage:`. In Python 3 it's ok because `dict.keys()` returns a dynamic View object (which is set-like and which adapts to changes in the underlying dict), not a list, but it's still cleaner to write `for key in storage:`. – PM 2Ring Feb 06 '17 at 14:19

Going with the built-in csv module, you could do something like:

import csv

with open(csvfile, 'r') as f:
    reader = csv.reader(f, delimiter=',')
    for i in range(n):
        next(reader)          # skip the first n rows
    for row in reader:
        print(row)            # or whatever you want to do here

Where n is the number of lines you want to skip.
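For reference, the same skip can also be written with itertools.islice (a sketch; the file name is illustrative):

import csv
from itertools import islice

n = 1000                                 # illustrative number of rows to skip
with open("file1.csv", newline="") as f:
    reader = csv.reader(f, delimiter=',')
    for _ in islice(reader, n):          # consume and discard the first n rows
        pass
    for row in reader:
        print(row)                       # or whatever you want to do here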

Nick H