0

I have a 20 GB trades.csv file. It has two columns (trade_time and price). And the csv file contains 650 million rows.

Sample Data

https://gist.github.com/dsstex/bc885ed04a6de98afc7102ed08b78608

Pandas DataFrame

df = pd.read_csv("trades.csv", index_col=0, parse_dates=True)

I would like to check whether price is up or down based on percentage. If the price hits up_value (e.g. +1%) first, the result is 1. If the price hits down_value (e.g. -0.5%) first, then the result is 0. I need to do this for all 650 million rows.

At the moment, the dataframe has only two columns. trade_time(index), price. I would like to have a new column named "result".

import pandas as pd

df = pd.read_csv("trades.csv", index_col=0, parse_dates=True)
df["result"] = None

print(df)

up_percentage = 0.2
down_percentage = 0.1


def calc_value_from_percentage(percentage, whole):
    return (percentage / 100) * whole


def set_result(index):

    up_value = 0
    down_value = 0

    for _, current_row_price, _ in df.loc[index:].itertuples():
        if up_value == 0 or down_value == 0:

            up_delta = calc_value_from_percentage(up_percentage, current_row_price)
            down_delta = calc_value_from_percentage(down_percentage, current_row_price)

            up_value = current_row_price + up_delta
            down_value = current_row_price - down_delta

        if current_row_price > up_value:
            df.loc[index, "result"] = 1
            return

        if current_row_price < down_value:
            df.loc[index, "result"] = 0
            return


for ind, _, _ in df.itertuples():
    set_result(ind)

df.to_csv("results.csv", index=True, header=True)
print(df)

Results

https://gist.github.com/dsstex/fe3759beedbf9c46ace382a7eef3d12c

Note: Due to insufficient data, most of the bottom rows in the above file has the value "None" for "result". So the value is blank.


At the moment, I'm using pandas itertuples() to process the file. I would like to have a vectorized solution since I have a huge file.

Note: Last month I asked this question. This is a follow-up question. And it is related to this answer. In that answer, the author is using a fixed size up_value/down_value of 200. But I'm after a percentage based vectorized solution.

Any help is greatly appreciated.

Thanks

John
  • 129
  • 12
  • Just a question: `up_value` is equal to `current_row_price + up_delta`, but then you check `if current_row_price > up_value`, thus `if current_row_price > current_row_price + up_delta`, or `if up_delta < 0`, that however should result always in positive values. What's your goal? – crissal Jun 17 '22 at 17:51
  • @crissal Apologies for not providing enough info. I'm setting `up_value` only when `up_value=0`. Then I'm doing a for loop until the loop hit break statement. – John Jun 17 '22 at 17:54
  • @crissal Updated the `Result Checking` part in my question. It now contains the loop. – John Jun 17 '22 at 17:59
  • Can you please provide a [minimal and reproducible example](https://stackoverflow.com/help/minimal-reproducible-example)? Where are you doing your `value calculation` referring to your loop in `result checking`? – crissal Jun 17 '22 at 18:52
  • @crissal Updated my question with fully working code, sample data and sample result. Thanks. – John Jun 18 '22 at 04:11

2 Answers2

1

Reading your full code I finally understood your algorithm.

For every index of the dataframe, you have to compute if the "result" is 1 or 0, so that:

  • 1 means that I find in the dataframe another price, current_row_price in the loop, that is greater than my original price - for current index, computed in the if block - by a up_delta value;
  • 0 means that I find another price in the df that is lower than my original price by a down_delta value.

I came up with this code. Maybe the loop is avoidable, but this should be faster.

from enum import Enum

import pandas as pd


class Result(int, Enum):
    DOWN = 0
    UP = 1
    MISSING = 2


df = pd.read_csv("trades.csv", index_col=0, parse_dates=True)
df["result"] = Result.MISSING

# constants 
up_percentage = 0.2
down_percentage = 0.1

# compute upper and lower bound for every row
df["upper_bound"] = df["price"] * (1 + up_percentage / 100)
df["lower_bound"] = df["price"] * (1 - down_percentage / 100)

# for each row get current upper and lower bounds, and check 
# in all dataframe if any row is greater/lower than these values
for i, row in df.iterrows():
    series_up: pd.Series = pd.Series(df["price"].loc[i:] > row["upper_bound"])
    series_up_index = series_up[series_up].index
    series_up_min = series_up_index.min()

    series_down: pd.Series = pd.Series(df["price"].loc[i:] < row["lower_bound"])
    series_down_index = series_down[series_down].index
    series_down_min = series_down_index.min()

    is_up_hit = bool(series_up_min) and not pd.isna(series_up_min)
    is_down_hit = bool(series_down_min) and not pd.isna(series_down_min)

    if is_up_hit and is_down_hit:
        if series_up_min < series_down_min:
            result = Result.UP
        else:
            result = Result.DOWN
    elif is_up_hit:
        result = Result.UP
    elif is_down_hit:
        result = Result.DOWN
    else:
        result = Result.MISSING

    df.loc[i, "result"] = result


# remove utility columns
df.drop(columns=["upper_bound", "lower_bound"], inplace=True)

# store result
df.to_csv("results.csv", index=True, header=True)
crissal
  • 2,547
  • 7
  • 25
  • Hi, thanks for the code. I'm gonna test it. I'll get back to you with results. Thanks. – John Jun 18 '22 at 12:58
  • Just tested your code. It doesn't produce the exact results as this one. https://gist.github.com/dsstex/fe3759beedbf9c46ace382a7eef3d12c – John Jun 18 '22 at 13:07
  • See lines 5562 and 5563. https://gist.github.com/dsstex/fe3759beedbf9c46ace382a7eef3d12c#file-results-csv-L5562 – John Jun 18 '22 at 13:07
  • Your code supposed to print `result 2` for 5562 and `result 0` for 5563. But it prints `result 1` for both. – John Jun 18 '22 at 13:09
  • `df.result.value_counts()` for my original algorithm results. `0=>4433, 2=>1137, 1=>516`. `df.result.value_counts()` for the results produced by your algorithm. `1=>4167, 0=>1919`. Hope that helps. – John Jun 18 '22 at 13:18
  • @John you're right, because I was checking all over `df`. Adding `.loc[i:]` the behaviour is similiar to yours, i.e. only rows greater than current - in terms of index - are considered. This should fix the issue, and also `df.result.value_counts()` yield the same results for both dataframes. – crissal Jun 18 '22 at 14:33
  • Brilliant!!! I just did `from pandas.util.testing import assert_frame_equal` and then `assert_frame_equal(df, df2)`. I can confirm both data is equal. However I have few questions though. In your comment, you have used like this. `# priority over up_result, as in your original code`. I do not care about the priority. I care about which way price hits first. I mean if the price hits `down_percentage` first, then I want the result to be `0`. So is priority really matters here? – John Jun 18 '22 at 14:45
  • Yes it does. Imagine that, for the index `i`, you got an up value equals to `x` and a down value equals to `y`. Now, your original algorithm was saying: "for each row after `i`, look at its price: if it's greater than `x`, then set `result` to `1`; if it's lesser than `y`, set the `result` to `0`." The problem with my "parallel" code, however, is that both `up_result` and `down_result` can be `True`, so a priority must be set between them, because we must set `1` or `0`; in this case, following your code, the `up` priority is set. – crissal Jun 18 '22 at 20:36
  • Then I'm gonna have serious issues with your solution. In my original code, the `current_row_price` is either going to hit `up_value` or `down_value` since it is inside a loop. But you are saying both up_result and down_result can be True in your solution. Is there a way, we can use `index` as a priority? – John Jun 18 '22 at 20:40
  • What I mean is, if up_result index is 553, but down_result index is 552, that means 552 is the correct result. – John Jun 18 '22 at 20:43
  • 1
    @John yeah I understand, and you're right, in your example the result would be wrong. I modified the answer, probably *now* is the correct answer you were looking for – crissal Jun 18 '22 at 21:07
  • 1
    Thanks very much crissal. Your code is really helpful. Just tested your code with 200,000 rows. It took 6 minutes to process it. That's 30 minutes for each 1 million records. For 650 million rows, I need 2 weeks to process it. I think I have to use your solution until I find a more faster solution. Thanks. – John Jun 18 '22 at 21:23
  • 1
    Note: I modified your code slightly like this before performing my tests. I use itertuples instead of iterrows since iterrows is slow. `for i, _, _, upper_bound, lower_bound in df.itertuples():` See here: https://stackoverflow.com/a/24871316/18201044 – John Jun 18 '22 at 21:29
  • 1
    I didn't know about this, nice to learn something new every day :) btw the code in the loop can be easily assigned to a threadpool – crissal Jun 18 '22 at 21:32
  • 1
    I'm glad I can be of some help. You would be very surprised to know how much time it takes to process 100 million records via a vectorized solution. See the summary at the bottom of this page. https://python.plainenglish.io/pandas-how-you-can-speed-up-50x-using-vectorized-operations-d6e829317f30 – John Jun 18 '22 at 21:37
  • Good luck. Just in case you need. https://pythonspeed.com/articles/vectorization-python/ – John Jun 18 '22 at 21:42
1

The original algorithm is super slow because it is doing a nested loop with iterrows/tuples.

If I understood good, for each row, you check if any of the posterior rows reach to the "fixed" percentage. If it reaches up, you tag as 1, if it reaches down you tag 0, otherwise it is not tagged (None)

I reached to this code. It is not vectorized, but it runs in my machine much faster than the initial question and the accepted solution.

It could be that with 650M rows it becomes slower though.

import pandas as pd
import numpy as np

from time import time

df = pd.read_csv("trades.csv", index_col=0, parse_dates=True)
t0=time()

up_percentage = 0.2
down_percentage = 0.1

# Precalculate the percentages
df['upper'] = df['price']*(1+up_percentage/100)
df['lower'] = df['price']*(1-down_percentage/100)

pupper = np.array([np.argmax(df.price.values[n:] > up_value)   for n,up_value   in enumerate(df.upper)])-1
plower = np.array([np.argmax(df.price.values[n:] < down_value) for n,down_value in enumerate(df.lower)])-1

df["result"] = None
# These two cases occur when the index is not found, but no need to re-set to None. 
# df.loc[pupper<0,'result']=None
# df.loc[plower<0,'result']=None
# If the upper value is found and it occurs before the lower, set it to 1
df.loc[(pupper>0)&((plower<0)|(pupper<plower)),'result']=1
# If the upper value is found and it occurs before the lower, set it to 1
df.loc[(pupper<0)&(plower>0),'result']=0

print(f"{1000*(time()-t0):0.2f}ms")

Benchmark: counting only the time to perform the operations, not loading/saving the CSV.

  • Original: 19s
  • Crissal's: 6537ms
  • This code: 135ms

Checked for equality running original code + proposed code as df2 and comparing:

df3 = df.merge(df2, left_index=True, right_index=True).fillna('empty')
print(f"The result columns are equal: {(df3.result_x==df3.result_y).all()}")
Zaero Divide
  • 699
  • 2
  • 10