
I have one process constantly writing and appending to a CSV file.

I want to have a Python script tail that CSV using pandas to aggregate the data. I will probably aggregate every 100 lines, and every 100 lines take that aggregated data and send it somewhere.

Is there a pandas feature for this? Is there a way I can track the line number the Python script is processing, so that if I stop it or it crashes I can start it up again and it will pick up where it left off?

red888
  • I don't know of a native feature that does this, but you could just have a log table that records the row number/index in a SQL DB or CSV file and updates every time you do your write, similar to a log table in SQL when you are running a stored procedure (a minimal sketch of this idea follows these comments). – Umar.H Jan 16 '20 at 18:09
  • You just need to keep the number of the currently processed line (or batch) in some file on disk. Try writing the number of that batch to a file. – nima Jan 16 '20 at 18:13
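
A minimal sketch of the log-table idea from the first comment, assuming SQLite via the standard-library sqlite3 module and hypothetical names (progress.db, init_checkpoint, save_checkpoint, load_checkpoint); the answer below uses a plain text file instead, but the principle is the same:

import sqlite3

def init_checkpoint(db_path="progress.db"):
    # create a one-row table that plays the role of the SQL log table
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS checkpoint "
                 "(id INTEGER PRIMARY KEY CHECK (id = 1), last_line INTEGER)")
    conn.commit()
    return conn

def save_checkpoint(conn, last_line):
    # overwrite the single checkpoint row every time a batch is written out
    conn.execute("INSERT OR REPLACE INTO checkpoint (id, last_line) VALUES (1, ?)",
                 (last_line,))
    conn.commit()

def load_checkpoint(conn, default=0):
    # return the saved line number, or the default if nothing has been saved yet
    row = conn.execute("SELECT last_line FROM checkpoint WHERE id = 1").fetchone()
    return row[0] if row else default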

1 Answer

As mentioned in the comments, there's no easy built-in way to do this. You can, however, combine a simple follow function (see How can I tail a log file in Python?) with pandas to aggregate the dataframe.

We use the follow function to tail the file, appending each new line to a list, which is converted into a pandas dataframe once we reach a specified number of lines. The list is then reset and we continue following the file. As another commenter mentioned, you can write the current line number to disk, then read that same file on startup to pick back up where you left off. An example is below.

import pandas as pd
from io import StringIO
import time
import os

def follow(thefile):
    with open(thefile, 'r') as mfile:
        while True:
            # remember where we are so a partially written line is not consumed
            pos = mfile.tell()
            line = mfile.readline()
            # wait until a complete line (ending with a newline) is available
            if not line or not line.endswith('\n'):
                mfile.seek(pos)
                time.sleep(0.1)
                continue
            yield line

if __name__ == "__main__":
    # set the file we want to log the current line to
    log_file = "./current_line"

    # check if a resume point has been saved from a previous run
    if os.path.exists(log_file):
        with open(log_file, 'r') as ifile:
            # the log file stores the next line number to process
            start_line = int(ifile.read())
    else:
        # start at the first data row (not the header).  If there is no header then set to 0
        start_line = 1

    # set the file we are reading
    myfile = 'test.csv'

    # remove this line if you don't need the header
    header = pd.read_csv(myfile, nrows=0)

    # initialize the list to store the lines in
    lines = []

    # loop through each line in the file
    for nline, line in enumerate(follow(myfile)):
        # skip lines that were already processed in a previous run
        if nline < start_line:
            continue
        # append to the lines list
        lines.append(line)

        # check if we have hit the number of lines we want to handle
        if len(lines) == 100:
            # read the csv from the lines we have processed
            df = pd.read_csv(StringIO(''.join(lines)), header=None)
            # update the header.  Delete this row if there is no header
            df.columns = header.columns

            # do something with df
            print(df)

            # reset the lines list
            lines = []  

            # open the log file and record where the next run should resume
            with open(log_file, 'w') as lfile:
                # nline is the last line processed, so the next run starts at nline + 1;
                # only write the checkpoint after the batch has actually been handled
                lfile.write(str(nline + 1))
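
The print(df) placeholder is where the per-batch aggregation and upload would go. A minimal sketch of that step, assuming hypothetical names (aggregate_and_send, the endpoint URL) and that "send it somewhere" means an HTTP POST via the requests library:

import requests

def aggregate_and_send(df, url="https://example.com/ingest"):
    # hypothetical aggregation: summarise the numeric columns of this 100-line batch
    summary = df.describe().to_json()
    # hypothetical delivery: POST the aggregate; swap this for your real destination
    requests.post(url, json={"batch_summary": summary})
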
Andrew