
I am trying to compute the rolling slope of a time series using pandas with swifter.

My code:

import os.path
from os import listdir
from os.path import isfile, join

import numpy as np
import pandas_ta
from scipy.stats import linregress

import pandas as pd
import swifter

FILE_PATH = "Data"


def get_files_ohlc(path: str):
    return [f for f in listdir(path) if isfile(join(path, f))]


def get_slope(array):
    y = np.array(array)
    x = np.arange(len(y))
    slope, intercept, r_value, p_value, std_err = linregress(x, y)
    return slope


def add_slop_indicator(ohlc: pd.DataFrame, ind: str, candles_back: int):
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
        get_slope, raw=True)


if __name__ == '__main__':
    files = get_files_ohlc(FILE_PATH)
    dicts = {}

    for file in files:
        name_pair = file.split("_")[0]
        dicts[name_pair] = pd.read_json(os.path.join(FILE_PATH, file))
        dicts[name_pair].rename({0: 'date',
                                 1: 'open',
                                 2: 'high',
                                 3: 'low',
                                 4: 'close',
                                 5: 'volume'}, axis=1, inplace=True)
        dicts[name_pair]['date'] = dicts[name_pair]['date'].values.astype(dtype='datetime64[ms]')
        for val in range(20, 100):
            dicts[name_pair].ta.ema(length=val, append=True)

        print(f"END_{name_pair}")
        for val in range(20, 100):
            for days in range(5, 100):
                add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
        print(f"DONE {name_pair}")
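
For reference, here is a minimal sketch of what the rolling slope is meant to compute, written in plain Python on synthetic data so it runs without the OHLC files, swifter, or SciPy. The helper names window_slope and rolling_slope are hypothetical and not part of the code above; None stands in for the NaN that pandas produces until a full window is available.

```python
def window_slope(values):
    """Least-squares slope of values against x = 0, 1, ..., n - 1 (needs n >= 2)."""
    n = len(values)
    x_mean = (n - 1) / 2
    y_mean = sum(values) / n
    # Sums of cross-deviations and squared x-deviations; x is the position in the window.
    sxy = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
    sxx = sum((i - x_mean) ** 2 for i in range(n))
    return sxy / sxx


def rolling_slope(series, window):
    """Slope over each trailing window; None until a full window is available."""
    out = [None] * len(series)
    for end in range(window, len(series) + 1):
        out[end - 1] = window_slope(series[end - window:end])
    return out


# Synthetic, perfectly linear data (y = 2x + 1), so every full window has slope 2.
prices = [2 * i + 1 for i in range(10)]
slopes = rolling_slope(prices, window=5)
print(slopes)
```

This should agree with linregress on the same window, since both fit an ordinary least-squares line against the positions 0 to n - 1.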

After running for a while, the program stops with this error message.

Error:

dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.
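
One reading of the message is that each partition has to hold at least as many rows as the rolling window needs to borrow from its neighbour. The sketch below only illustrates that arithmetic in plain Python. It assumes an even split of rows and an overlap of window - 1 rows, which are assumptions rather than anything taken from the dask or swifter source, and the row and partition counts are made up. The function name overlap_fits is hypothetical.

```python
def overlap_fits(n_rows: int, npartitions: int, window: int) -> bool:
    """Rough check: can each partition lend window - 1 rows to the next one?"""
    rows_per_partition = n_rows // npartitions  # assumption: rows are split evenly
    rows_needed = window - 1                    # assumption: overlap for a trailing window
    return rows_per_partition >= rows_needed


# Made-up numbers for illustration: 1000 rows split into 32 partitions.
n_rows, npartitions = 1000, 32
for window in (5, 32, 33, 99):
    print(window, overlap_fits(n_rows, npartitions, window))
```

Under these assumptions, small windows fit and larger ones do not, which would be consistent with the loop failing only after days has grown for a while.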

UPDATE:

I've updated my code to make it a minimal working example.

pandas_ta is the library that provides the dicts[name_pair].ta accessor. I use it to compute the EMA (exponential moving average), which is just an indicator.
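
To make clear what the indicator is, here is a minimal sketch of an exponential moving average in plain Python. It assumes the common smoothing factor 2 / (length + 1) and seeds the average with the first value. pandas_ta may seed or pad the series differently, so the numbers need not match its EMA_{length} columns. The function name ema and the sample prices are hypothetical.

```python
def ema(values, length):
    """Exponential moving average with smoothing factor 2 / (length + 1)."""
    alpha = 2 / (length + 1)
    out = [values[0]]  # assumption: seed the average with the first value
    for v in values[1:]:
        # Blend the new value with the previous average.
        out.append(alpha * v + (1 - alpha) * out[-1])
    return out


closes = [10.0, 11.0, 12.0, 11.0, 10.0]  # made-up closing prices
print(ema(closes, length=3))
```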

All the code can be found in this repository that I've created for this question. If you run test.py, it will output the error message that I am asking about.

I hope this helps; otherwise, please let me know what needs to be clarified.

My traceback:

Traceback (most recent call last):
  File "/home/vlad/Crypto_15m_data/test.py", line 51, in <module>
    add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
  File "/home/vlad/Crypto_15m_data/test.py", line 27, in add_slop_indicator
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 521, in apply
    return self._dask_apply(func, *args, **kwds)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 562, in _dask_apply
    dd.from_pandas(self._comparison_pd, npartitions=self._npartitions)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 292, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 220, in get
    result = get_async(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 110, in reraise
    raise exc
dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.

Traceback
---------
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/dataframe/rolling.py", line 29, in overlap_chunk
    raise NotImplementedError(msg)


Process finished with exit code 1
Mircea
  • Please include a full [mre] including the definition of `dataframe`. You don’t even specify what dataframe is, but it seems it’s a dask.dataframe? – Michael Delgado Apr 17 '22 at 05:47
  • And what is dataframe.ta? – Michael Delgado Apr 17 '22 at 05:49
  • @MichaelDelgado I did update my question with all the needed details and I've also created a small repository with a workable example. Hope that it would help to make my problem clear. Let me know if I need to add anything else. – Mircea Apr 17 '22 at 17:19
  • got it thanks for the edits! one more nitpick - is that a warning message and then your program crashes elsewhere or is that just the last line of a [traceback](//realpython.com/python-traceback)? Either way, please always post the full traceback when asking about errors. But since you're not initializing a dask.dataframe anywhere I assume this is either a bug in pandas_ta or swifter, or you're calling one of them incorrectly. I don't have any experience with those packages - sorry. This is probably a q for the pandas_ta or swifter user groups. – Michael Delgado Apr 17 '22 at 17:29
  • I see... ok, I will post over there then, thanks for your help :) – Mircea Apr 17 '22 at 17:35
  • for sure - and FWIW I think this is a totally fair SO question and with the added info it's enough for us to go on, but I just think you might have more luck posting directly to swifter because it seems like it's an issue with the way it is using dask.dataframe under the hood. good luck! – Michael Delgado Apr 17 '22 at 20:41

0 Answers