2

I have a number of huge csv files (20GB ish) that I need to read, process then write the processed file back to a new csv.

The head of the csv file looks like this: enter image description here

The object of the task is to read the csv file and to compare the time from each line to see if it's within start and end times contained within a dictionary. If it isn't then the line is skipped, if it is then it is written to the new file.

Sounds very easy but due to the size being efficient is crucial and I need some advice.

I've tried a number of methods including trying to read the whole file in pandas which was taking a long time or crashing due to memory issues. I also tried opening and reading the file line by line then processing it but this also seemed to take a long time. My now line of attack is using dask but before I carry on I wanted to see if anyone can give me any hints as to improving speed on:

  1. Reading
  2. processing - this seems to take a long time as I'm using the apply function in dask to apply the processing function to every row. When I tried this it was taking something like 3 hours to process one file.
  3. Writing - Seemed to take a long time to write a single csv file of say 20GB. I tried using dask to output each partition then combining the partitions into one file which did seem to improve speed a bit.

What would be the plan of attack each stage above to produce the fastest results bearing in mind I have 16GB Ram?

This is the code that I came up with after reading your responses. I incorporated a bit of code to display the percentage complete. It appears that the % complete seems to slow down a lot when the file progresses, as if there is a memory issue or another problem. For a 20GB it was taking over 4 hours to process, which seems a very long time. I looked at the system resources during processing and CPU was sitting at about 16% and memory about 10GB of 16GB. Can someone tell me if this is the sort of speed I can expect or is there something wrong with the code?


from datetime import datetime, timedelta, timezone
from symbol_specification import darwinex_quote_hours, darwinex_symbol_specs
import os
import csv
import pandas as pd
import io
import tailer

# Software Variables and Information
__author__ = 'TBC'
__copyright__ = 'TBC'
__credits__ = ['TBC']
__license__ = 'GPL'
__version__ = '1.0.4dev'
__maintainer__ = 'TBC'
__email__ = 'TBC'
__status__ = 'Production'
software_name = 'TBC'


def get_digits(str_val):
    return len(str_val.split('.')[-1])


def get_in_filepath(dir_name, filename):
    return dir_name + r"\\" + filename


def get_out_filepath(dir_name, filename):
    return dir_name + r"\\" + filename.split('.csv')[0] + '_OOH_removed.csv'


def get_input_variables():

    dir_name = r"X:"

    # dictionary containing darwinex symbol id and csv filename
    run_details = {'CADCHF': 'CADCHF_mt5_ticks.csv',
                   'CADJPY': 'CADJPY_mt5_ticks.csv',
                   'CHFJPY': 'CHFJPY_mt5_ticks.csv',
                   'EURAUD': 'EURAUD_mt5_ticks.csv',
                   'EURCAD': 'EURCAD_mt5_ticks.csv',
                   'EURCHF': 'EURCHF_mt5_ticks.csv',
                   'EURGBP': 'EURGBP_mt5_ticks.csv',
                   'EURJPY': 'EURJPY_mt5_ticks.csv',
                   'EURNZD': 'EURNZD_mt5_ticks.csv'}

    # remove out of hours ticks?
    remove_out_of_hours_ticks = True

    # round OHLC values?
    round_ohlc_required = False

    # remove trailing zeros - ** Saves sapce in the output file
    remove_trailing_zeros = True

    # get quote hours as shown in MT5 symbol specification
    quote_hours = darwinex_quote_hours

    # specify the broker timezone in hours
    # ****************************IMPORTANT NOTE******************************
    # The Quote hours specified above relate to the broker time zone which maybe
    # different to the timezone tha the tickstory data was downloaded in.
    # Following my data download guidelines the tickstory data should be downloaded in
    # UTC+0):00 Dublin, Edinburgh, Lisbon, London timezone WITH DST. The Quote
    # hours specified in the MT5 specification are in the broker server time
    # which could be different. For example Darwinex is UTC +2.
    # Therefore the broker time offset is +2. The code will then subtract 2
    # hours to any of the quote times before using them.
    # ************************************************************************
    broker_time_offset = 2

    # create input dictionary
    input_vars = {
        'dir_name': dir_name,
        'run_details': run_details,
        'remove_out_of_hours_ticks': remove_out_of_hours_ticks,
        'remove_trailing_zeros': remove_trailing_zeros,
        'quote_hours': quote_hours,
        'quote_broker_time_offset': broker_time_offset,
        'round_ohlc_required': round_ohlc_required,
    }
    return input_vars


def round_ohlc(line, digits, input_vars):
    # assign vals
    date = line[0]
    time = line[1]
    bid = line[2]
    ask = line[3]
    last = line[4]
    vol = line[5]

    if digits != 0:
        bid = round(float(bid), digits)
        ask = round(float(ask), digits)
        last = round(float(last), digits)
    else:
        bid = int(round(float(bid), digits))
        ask = int(round(float(ask), digits))
        last = int(round(float(last), digits))

    # assemble line
    if input_vars['remove_trailing_zeros']:
        line = [date, time, f'{bid:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{ask:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{last:.{digits}f}'.rstrip('0').rstrip('.'), vol]
    else:
        line = [date, time, f'{bid:.{digits}f}', f'{ask:.{digits}f}',
                f'{last:.{digits}f}', vol]

    return line


def get_weekday_string(day_digit):
    weekdays = {0: 'Mon',
                1: 'Tues',
                2: 'Wed',
                3: 'Thurs',
                4: 'Fri',
                5: 'Sat',
                6: 'Sun'}

    return weekdays[day_digit]


def get_weekday_string(day_digit):
    weekdays = {0: 'Mon',
                1: 'Tues',
                2: 'Wed',
                3: 'Thurs',
                4: 'Fri',
                5: 'Sat',
                6: 'Sun'}

    return weekdays[day_digit]


def remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
    # get quote offset
    quote_offset = input_vars['quote_broker_time_offset']

    # adjust tick_datetime for offset
    tick_datetime_adj = tick_datetime + timedelta(hours=quote_offset)

    # get quote hours
    day_string = get_weekday_string(tick_datetime_adj.weekday())
    quote_hours = symbol_quote_hours[day_string]

    # initialse keep tick to False (remove)
    remove_tick = True

    # iterate through all quote start/end pairs and check to see if tick is in hours
    for idx in range(len(quote_hours['start'])):
        tick_time = tick_datetime_adj

        # get date of tick
        tick_date = tick_time.date()

        # form quote hours start time
        start_time = datetime.strptime(quote_hours['start'][idx], '%H:%M').time()

        # combine the date and quote start time to form datetime
        start = datetime.combine(tick_date, start_time)

        if quote_hours['end'][idx] == '24:00':
            # special case. 24:00 means to the end of the day but it's not
            # recognised by python as a valid datetime. To get around this the
            # day is incremented by 1 and a time of 00:00 used which is equivalent.

            # form quote hours end time
            end_time = datetime.strptime('00:00', '%H:%M').time()

            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date + timedelta(days=1), end_time)
        else:
            # form quote hours end time
            end_time = datetime.strptime(quote_hours['end'][idx], '%H:%M').time()

            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date, end_time)

        # check to see if tick is within quote hours
        if start <= tick_time <= end:
            remove_tick = False

    return remove_tick


def write_conversion_log(input_filename, output_filename,
                         mod_string, conversion_time, input_vars):
    # conversion log file name
    working_dir = input_vars['dir_name']
    filename = working_dir + '//tickstory_tick_pre_processor_conversions.log'

    # determine if file exists or not and assign appropriate opening flag
    if os.path.exists(filename):
        append_write = 'a'  # append if already exists
    else:
        append_write = 'w'  # make a new file if not

    with open(filename, append_write) as outfile:
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')
        outfile.write('Conversion Details\n')
        outfile.write(f'Software Name: {software_name}\n')
        outfile.write(f'Software Version: {__version__}\n')
        outfile.write(f'Date/Time: {datetime.now(timezone.utc)}\n')
        outfile.write(f'Input file = {input_filename}\n')
        outfile.write(f'{mod_string}\n')
        outfile.write(f'Output file = {output_filename}\n')
        outfile.write(f'Conversion Duration: {conversion_time}')
        outfile.write('--------------------------------------------------------'
                      '-------------------------')


def get_start_end_date(filename):
    # Get the first 3 rows of the file
    df_start = pd.read_csv(filepath_or_buffer=filename,
                           encoding='utf8',
                           nrows=3)

    # Add column names
    df_start.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

    # create Datetime column
    df_start['Datetime'] = df_start['Date'].astype(str) + ' ' + df_start['Time'].astype(str)
    df_start['Datetime'] = pd.to_datetime(df_start['Datetime'], format='%Y%m%d %H:%M:%S')

    # Get the last 3 rows of the file
    with open(filename, 'r', encoding='utf8') as file:
        last_lines = tailer.tail(file, 3)

    # clean up last line for line feed carriage returns etc by checking if line has a string
    while last_lines[-1] == '':
        last_lines = last_lines[:-1]

    df_end = pd.read_csv(io.StringIO('\n'.join(last_lines[1:])), header=None)

    # Add column names
    df_end.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

    # create Datetime column
    df_end['Datetime'] = df_end['Date'].astype(str) + ' ' + df_end['Time'].astype(str)
    df_end['Datetime'] = pd.to_datetime(df_end['Datetime'], format='%Y%m%d %H:%M:%S')

    # Add Start and End time to Symbol Info dictionary
    start_date = df_start['Datetime'][0]
    end_date = df_end['Datetime'][0]

    return start_date, end_date


def get_percentage_complete(start, end, tick_datetime):
    total_period = end - start
    period_complete = tick_datetime - start
    pct_complete = (period_complete / total_period) * 100

    return pct_complete


def main():
    # get input variables
    input_vars = get_input_variables()

    for darwinex_id, filename in input_vars['run_details'].items():

        # get filenames
        input_filename = get_in_filepath(input_vars['dir_name'], filename)
        output_filename = get_out_filepath(input_vars['dir_name'], filename)

        # get the start and end dates of the data so % copmlete can be determined
        start_date, end_date = get_start_end_date(input_filename)

        # get symbol quote hours
        symbol_quote_hours = darwinex_quote_hours[darwinex_id]

        # initialse list
        temp_list = []

        # read csv
        before_process = datetime.now()

        # initialise counters and mod string
        ticks_removed_count = 0
        mod_string = ''
        file_converted = False
        percentage_complete = 0

        # if processing requried open input and ouput files and process as requried
        if input_vars['remove_out_of_hours_ticks'] or input_vars['round_ohlc_required']:
            file_converted = True
            with open(input_filename, 'r', newline='') as f_in:
                with open(output_filename, 'w', newline='') as f_out:
                    # set up reader ad writer buffers
                    reader = csv.reader(f_in)
                    writer = csv.writer(f_out)
                    # for each line check whether datetime is within hours. If it is keep line, if not skip
                    for idx, line in enumerate(reader):
                        tick_datetime = datetime.strptime(line[0] + ' ' + line[1], '%Y%m%d %H:%M:%S')
                        if not remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
                            # keep line
                            # convert OHLC values if required
                            if input_vars['round_ohlc_required']:
                                digits_round = darwinex_symbol_specs[darwinex_id]['digits']
                                line = round_ohlc(line, digits_round, input_vars)
                            # write line to new file
                            writer.writerow(line)
                        else:
                            ticks_removed_count += 1
                        # determine and output the % complete after every 1000 lines
                        if idx % 1000 == 0:
                            percentage_complete_new = get_percentage_complete(start_date, end_date, tick_datetime)
                            if int(percentage_complete_new) - int(percentage_complete):
                                percentage_complete = int(percentage_complete_new)
                                print(f'{input_filename} % Complete: {percentage_complete:.0f}%', end='\r')

        # calculate conversion time
        after_process = datetime.now()
        conversion_time = after_process - before_process

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update tick removal modification string
        if input_vars['remove_out_of_hours_ticks']:
            mod_string = mod_string + (f'{newline}***Tick Removal***\n'
                                       f'{ticks_removed_count} ticks have been removed')
        else:
            mod_string = mod_string + f'{newline}***Tick removal NOT requested***'

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update rounding modification string
        if input_vars['round_ohlc_required']:
            mod_string = mod_string + (f'{newline}***OHLC values converted***\n'
                                       f'OHLC values rounded to {digits_round} digits')
            if input_vars['remove_trailing_zeros']:
                mod_string = mod_string + (f'\nOHLC values trailing zeros removed')
        else:
            mod_string = mod_string + (f'{newline}***OHLC value conversion NOT requested***')

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update case when no conversion is specified
        if not input_vars['remove_out_of_hours_ticks'] and not input_vars['round_ohlc_required']:
            mod_string = f'No Conversion performed as not requested in input parameters'

        # write conversion details to log file
        if file_converted:
            write_conversion_log(input_filename, output_filename,
                                 mod_string, conversion_time, input_vars)

            # print conversion details to the terminal
            print('----------------------------------------------------------------')
            print(f'Software Name: {software_name}')
            print(f'Software Version: {__version__}')
            print(f'Date/Time: {datetime.now(timezone.utc)}')
            print('Conversion Details')
            print(f'input file = {input_filename}')
            print(mod_string)
            print(f'output file = {output_filename}')
            print(f'Conversion Duration: {conversion_time}')
            print('----------------------------------------------------------------')
        else:
            print(f'input file = {input_filename} - {mod_string}')


if __name__ == '__main__':
    main()

machTucker
  • 55
  • 4
  • 3
    Don't use Pandas or Dask for a streaming task like this, the built-in `csv` module will do absolutely fine for both reading and writing. – AKX Feb 10 '23 at 15:49
  • 1
    Also, please (a) post your (example) data as text (b) and post some code too... – AKX Feb 10 '23 at 15:50

2 Answers2

3

A basic pattern for filtering a CSV looks like:

import csv

f_in = open("input.csv", newline="")
f_out = open("output.csv", "w", newline="")

reader = csv.reader(f_in)
writer = csv.writer(f_out)

writer.writerow(next(reader))  # transfer header, if you have one

for row in reader:
    if meets_my_condition(row):
        writer.writerow(row)

f_in.close()
f_out.close()

For simple row filtering, this is as fast and memory efficient as you can do it in Python: the reader is iterating a row at a time, so no more than one row in memory at a time; the file reads/writes are buffered, so the IO bottleneck is as low as your system and Python will allow. Any other framework/lib—Dask, etc...—will probably impose some performance overhead compared to this; and never use Pandas to iterate rows.

Based on my understanding of your screenshot and description, meets_my_condition needs to check if the value in column 2 is within a range of times; that could look like:

def meets_my_condition(row: list[str]) -> bool:
    return row[1] >= "00:00:00" and row[1] <= "12:00:00"

Given this input CSV:

Col1,Col2
a,01:00
b,10:00
c,13:00
d,07:00
e,23:00
f,00:00

I get this output CSV:

Col1,Col2
a,01:00
b,10:00
d,07:00
f,00:00

I've spec'd operations like this before, and memory barely exceeds Python's minimum footprint of 7-8MB on my laptop, so with 16GB you'll have more than enough headroom.

Zach Young
  • 10,137
  • 4
  • 32
  • 53
  • Nice answer. The csv module is very useful! – ktm5124 Feb 10 '23 at 16:46
  • Thanks very much, I will look into implementing something like this. I wasn't aware of the csv module. – machTucker Feb 13 '23 at 16:32
  • @machTucker, did this end up working for you? – Zach Young Feb 16 '23 at 18:13
  • @ZachYoung I've edited my question to include the code I've implemented. It was taking over 4 hours to convert a 20GB file. Not sure if this is typical? Seems like a long time to me. – machTucker Feb 19 '23 at 16:20
  • Hi @machTucker, that's a really big program :) I'm trying to find a way to help you focus on the issue of this post, how to read/write a 20GB file... I'm guessing you're trying to mix-in my solution with what you already had, and I don't think that's going to work for you. In principal, I suggest you try to step back and focus on the solution as I presented it. Practically, remove any reference to Pandas, I see you're reading the entire file with Pandas in `get_start_end_date(fname)`... that's going to be incredibly SLOW just from a processing perspective, and will break your RAM budget. – Zach Young Feb 19 '23 at 19:19
  • Here is some more practical advice: 1) take my ~20 lines and run it on one file and see how long it takes... ignore the bigger picture of _the entire problem_ you need to solve, and make sure you're answering the question, "what's the quickest I can and read and write a 20GB CSV?", then, start layering in all the other things 2) mock up some files of varying sizes, take a single a row and write a small helper program that copies that row N-number times so you can get smaller files to try on before potentially committing (potentially) hours to reading the 20GB file. – Zach Young Feb 19 '23 at 19:25
  • **I believe** if you're reading and writing the CSV correctly, as I've laid, it should only be a matter of minutes, assuming your hardware is newer than 2010. – Zach Young Feb 19 '23 at 19:26
  • 1
    @ZachYoung. Thanks, I will break the problem down tomorrow and report back, using your code as the basis so I can use it as a benchmark. Regarding the use of pandas I'm only using that to load the first 3 lines of the file so that I can get the start time. It get's past that operation fairly quickly I think. Anyway I'll report back tomorrow. – machTucker Feb 19 '23 at 22:18
  • @machTucker, ah, I missed the nrows=3 param. – Zach Young Feb 20 '23 at 01:59
  • If I want to post a follow up to my tests, where do I post. I'd like to post the code I've used for the tests? – machTucker Feb 20 '23 at 15:25
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/252004/discussion-between-zach-young-and-machtucker). – Zach Young Feb 20 '23 at 15:34
1

I am not sure for the specific details of your task, but it seems fairly simple to me. Since you haven't provided an MRE I will try with a pseudocode-like:

# Let input_filename be your 20gb .csv file
# Let output_filename be your filtered output file
with open(input_filename, 'r') as f_in, open(output_filename, 'w') as f_out:
    for line in f_in:
        # Abstract: with extract_time() you get the time information with any way
        line_time = extract_time(line)

        # Abstract: with satisfies_constraint you check the aforementioned dictionary
        if satisfies_constraint(dct, line_time):
            f_out.write(line)

This way you read the input file line-by-line in O(N), checking the dictionary in O(1) and you don't load the entire file in memory.

What is the issue with this simple way?

Update:

I updated my initial answer to be algorithmic-enough and not focusing on the exact way you will implement the individual aspects of whether you will split the line or read it with csv, etc. IMO the question concerns the basic procedure of completing the task.

lezaf
  • 482
  • 2
  • 10
  • @SergeBallesta I'm not sure that this kind of optimizations are the issue here. machTucker talked about trying to read *whole* file in pandas so I suppose the question is in a far more fundamental spectrum. However, I will edit my code to be even more pseudocode-like. – lezaf Feb 10 '23 at 16:26
  • 1
    Your edited post is IMHO much better because it really focuses on the major point. Deserves a +1... – Serge Ballesta Feb 10 '23 at 16:30
  • Manually splitting is only slightly more performant than using the csv module, and the csv module will be 100% correct... CSV files can look deceptively simple, but the standard is quite complex with lots of rules: the csv module gets that all right, and is so fast/performant to try and do otherwise is setting yourself up for trouble, at least down the road. – Zach Young Feb 10 '23 at 16:33
  • @ZachYoung as I already said I think that the question is not implementation-specific but more for the algorithmic nature of what is efficient to do and to do not (like trying to load a 20gb file in memory). – lezaf Feb 10 '23 at 16:42
  • I almost exclusively answer Python-CSV questions here on SO and this kind of question gets asked a lot. Since I've yet to see any viable competitor to the csv module for performance and correctness, there's no "implementation agnostic" answer for me: the answer always starts with the csv module, then follows on from that. And I've also seen enough people trying to figure how to parse a CSV file with regex or some other way besides using the 100%-correct csv module, that anything other than showing the csv module is a non-starter for me. – Zach Young Feb 10 '23 at 16:49
  • 1
    I (think) I have tried implementing something like this and it was taking in the order of 3 hours. I will compare what you have to what I wrote. Many thanks for your thoughts. – machTucker Feb 13 '23 at 16:33