
I have a polars dataframe as follows:

import polars as pl

df = pl.DataFrame(
    dict(
        day=[1, 1, 1, 3, 3, 3, 5, 5, 8, 8, 9, 9, 9],
        value=[1, 2, 2, 3, 5, 2, 1, 2, 7, 3, 5, 3, 4],
    )
)

I want to incrementally rotate the values in column 'day'. By incremental rotation I mean: for each value, change it to the next larger value that exists in the column, and if the value is already the largest, change it to null/None.

Basically, the result I expect should be the following:

pl.DataFrame(
    dict(
        day=[3, 3, 3, 5, 5, 5, 8, 8, 9, 9, None, None, None],
        value=[1, 2, 2, 3, 5, 2, 1, 2, 7, 3, 5, 3, 4],
    )
)

Is there some particular polars-python idiomatic way to achieve this?

lebesgue

2 Answers


If day is sorted - you could group the rows together, shift, then explode back:

(df.groupby("day", maintain_order=True)
   .agg_list()  
   .with_columns(pl.col("day").shift(-1))
   .explode(pl.exclude("day")))
shape: (13, 2)
┌──────┬───────┐
│ day  ┆ value │
│ ---  ┆ ---   │
│ i64  ┆ i64   │
╞══════╪═══════╡
│ 3    ┆ 1     │
│ 3    ┆ 2     │
│ 3    ┆ 2     │
│ 5    ┆ 3     │
│ 5    ┆ 5     │
│ 5    ┆ 2     │
│ 8    ┆ 1     │
│ 8    ┆ 2     │
│ 9    ┆ 7     │
│ 9    ┆ 3     │
│ null ┆ 5     │
│ null ┆ 3     │
│ null ┆ 4     │
└──────┴───────┘

Perhaps another approach is to .rank() the column.

.search_sorted() for rank + 1 could find the positions of the next "group".

The max values could be nulled out and then passed to .take() to get the new values.

(df.with_columns(
   pl.col("day").rank("dense")
     .cast(pl.Int64)
     .alias("rank"))
.with_columns(   
   pl.col("rank")
     .search_sorted(pl.col("rank") + 1)
     .alias("idx"))
.with_columns(
   pl.when(pl.col("idx") != pl.col("idx").max())
     .then(pl.col("idx"))
     .alias("idx"))
.with_columns(
   pl.col("day").take(pl.col("idx"))
     .alias("new"))
)
shape: (13, 5)
┌─────┬───────┬──────┬──────┬──────┐
│ day ┆ value ┆ rank ┆ idx  ┆ new  │
│ --- ┆ ---   ┆ ---  ┆ ---  ┆ ---  │
│ i64 ┆ i64   ┆ i64  ┆ u32  ┆ i64  │
╞═════╪═══════╪══════╪══════╪══════╡
│ 1   ┆ 1     ┆ 1    ┆ 3    ┆ 3    │
│ 1   ┆ 2     ┆ 1    ┆ 3    ┆ 3    │
│ 1   ┆ 2     ┆ 1    ┆ 3    ┆ 3    │
│ 3   ┆ 3     ┆ 2    ┆ 6    ┆ 5    │
│ 3   ┆ 5     ┆ 2    ┆ 6    ┆ 5    │
│ 3   ┆ 2     ┆ 2    ┆ 6    ┆ 5    │
│ 5   ┆ 1     ┆ 3    ┆ 8    ┆ 8    │
│ 5   ┆ 2     ┆ 3    ┆ 8    ┆ 8    │
│ 8   ┆ 7     ┆ 4    ┆ 10   ┆ 9    │
│ 8   ┆ 3     ┆ 4    ┆ 10   ┆ 9    │
│ 9   ┆ 5     ┆ 5    ┆ null ┆ null │
│ 9   ┆ 3     ┆ 5    ┆ null ┆ null │
│ 9   ┆ 4     ┆ 5    ┆ null ┆ null │
└─────┴───────┴──────┴──────┴──────┘

Feels like I'm missing an obvious, simpler approach here...

jqurious
  • I posted an alternative with some benchmarking, but I've since deleted the post because it's not clear if we can assume that the input dataset is already sorted by `day`. (It makes a huge difference if we cannot assume that the dataset is sorted). Sorry about that... – ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ Jan 28 '23 at 16:57
  • @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ Perhaps you could make it visible again for learning purposes? Your answers have been extremely educational. I'm not sure if you saw the previous timeseries question: https://stackoverflow.com/questions/75221602/nested-time-based-groupby-operations-sub-groups-without-apply - I feel like I'm missing something in that approach also. – jqurious Jan 28 '23 at 17:20

@jqurious, what I'd recommend for remapping values is a join. Joins are heavily optimized and scale very well, especially on machines with a good number of cores.
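As a quick sketch of the idea on the small DataFrame from the question (the name day_map is just illustrative here; the full, benchmarked version of this join follows below):

# Build a lookup of each day and the next larger day
# (the largest day maps to null), then join it back on.
day_map = (
    df.select(pl.col('day').unique().sort())
      .with_columns(pl.col('day').shift(-1).alias('new_day'))
)

(
    df.join(day_map, how='inner', on='day')
      .select([pl.col('new_day').alias('day'), 'value'])
)

This yields the remapped day values from the question, with the largest day (9) becoming null.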

As an example, let's benchmark some solutions.

First, some data

Let's use enough data to avoid spurious results from "microbenchmarking" using tiny datasets. (I see this all too often - tiny datasets with benchmark results down to a few microseconds or milliseconds.)

On my 32-core system with 512 GB of RAM, that means expanding the dataset to one billion records. (Choose a different value below as appropriate for your computing platform.)

import polars as pl
import numpy as np
import time

rng = np.random.default_rng(1)

nbr_rows = 1_000_000_000
df = pl.DataFrame(
    dict(
        day=rng.integers(1, 1_000_000, nbr_rows),
        value=rng.integers(1, 1_000_000, nbr_rows),
    )
).with_row_count()
df
shape: (1000000000, 3)
┌───────────┬────────┬────────┐
│ row_nr    ┆ day    ┆ value  │
│ ---       ┆ ---    ┆ ---    │
│ u32       ┆ i64    ┆ i64    │
╞═══════════╪════════╪════════╡
│ 0         ┆ 473189 ┆ 747152 │
│ 1         ┆ 511822 ┆ 298575 │
│ 2         ┆ 755167 ┆ 868027 │
│ 3         ┆ 950463 ┆ 289295 │
│ ...       ┆ ...    ┆ ...    │
│ 999999996 ┆ 828237 ┆ 503917 │
│ 999999997 ┆ 909996 ┆ 447681 │
│ 999999998 ┆ 309104 ┆ 588174 │
│ 999999999 ┆ 485525 ┆ 198567 │
└───────────┴────────┴────────┘

Assumption: Not sorted

Let's suppose that we cannot assume that the data is sorted by day. (We'll have to adapt the solutions somewhat.)

Join

Here are the results using a join. If you watch your CPU usage (for example with top on Linux), you'll see that the algorithm is heavily multi-threaded: it spends the majority of its time spread across all cores of your system.

start = time.perf_counter()
(
    df
    .join(
        df
        .select(pl.col('day').unique().sort())
        .with_columns(
            pl.col('day').shift(-1).alias('new_day')
        ),
        how='inner',
        on='day',
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 4)
┌───────────┬────────┬────────┬─────────┐
│ row_nr    ┆ day    ┆ value  ┆ new_day │
│ ---       ┆ ---    ┆ ---    ┆ ---     │
│ u32       ┆ i64    ┆ i64    ┆ i64     │
╞═══════════╪════════╪════════╪═════════╡
│ 0         ┆ 473189 ┆ 747152 ┆ 473190  │
│ 1         ┆ 511822 ┆ 298575 ┆ 511823  │
│ 2         ┆ 755167 ┆ 868027 ┆ 755168  │
│ 3         ┆ 950463 ┆ 289295 ┆ 950464  │
│ ...       ┆ ...    ┆ ...    ┆ ...     │
│ 999999996 ┆ 828237 ┆ 503917 ┆ 828238  │
│ 999999997 ┆ 909996 ┆ 447681 ┆ 909997  │
│ 999999998 ┆ 309104 ┆ 588174 ┆ 309105  │
│ 999999999 ┆ 485525 ┆ 198567 ┆ 485526  │
└───────────┴────────┴────────┴─────────┘
>>> print(time.perf_counter() - start)
20.85321443199973

groupby-explode

Now let's try the groupby-explode solution. This algorithm will spend a good share of time in single-threaded mode.

I've had to add a sort after the grouping step because the algorithm assumes sorted data in the steps after it.

start = time.perf_counter()
(
    df
    .groupby("day", maintain_order=False)
    .agg_list()
    .sort(['day'])
    .with_columns(pl.col("day").shift(-1))
    .explode(pl.exclude("day"))
)
print(time.perf_counter() - start)
shape: (1000000000, 3)
┌──────┬───────────┬────────┐
│ day  ┆ row_nr    ┆ value  │
│ ---  ┆ ---       ┆ ---    │
│ i64  ┆ u32       ┆ i64    │
╞══════╪═══════════╪════════╡
│ 2    ┆ 197731    ┆ 4093   │
│ 2    ┆ 3154732   ┆ 433246 │
│ 2    ┆ 4825468   ┆ 436316 │
│ 2    ┆ 4927362   ┆ 83493  │
│ ...  ┆ ...       ┆ ...    │
│ null ┆ 993596728 ┆ 25604  │
│ null ┆ 995160321 ┆ 575415 │
│ null ┆ 996690852 ┆ 490825 │
│ null ┆ 999391650 ┆ 92113  │
└──────┴───────────┴────────┘
>>> print(time.perf_counter() - start)
54.04602192300081

rank

Now, the rank method. This algorithm will spend nearly all its time in single-threaded mode.

I've also had to add a sort here, as the ranks are assumed to be sorted in the search_sorted step.

start = time.perf_counter()
(
    df
    .sort(['day'])
    .with_columns(
        pl.col("day").rank("dense").cast(pl.Int64).alias("rank")
    )
    .with_columns(
        pl.col("rank").search_sorted(pl.col("rank") + 1).alias("idx")
    )
    .with_columns(
        pl.when(pl.col("idx") != pl.col("idx").max())
        .then(pl.col("idx"))
        .alias("idx")
    )
    .with_columns(
        pl.col("day").take(pl.col("idx")).alias("new")
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 6)
┌───────────┬────────┬────────┬────────┬──────┬──────┐
│ row_nr    ┆ day    ┆ value  ┆ rank   ┆ idx  ┆ new  │
│ ---       ┆ ---    ┆ ---    ┆ ---    ┆ ---  ┆ ---  │
│ u32       ┆ i64    ┆ i64    ┆ i64    ┆ u32  ┆ i64  │
╞═══════════╪════════╪════════╪════════╪══════╪══════╡
│ 197731    ┆ 1      ┆ 4093   ┆ 1      ┆ 1907 ┆ 2    │
│ 3154732   ┆ 1      ┆ 433246 ┆ 1      ┆ 1907 ┆ 2    │
│ 4825468   ┆ 1      ┆ 436316 ┆ 1      ┆ 1907 ┆ 2    │
│ 4927362   ┆ 1      ┆ 83493  ┆ 1      ┆ 1907 ┆ 2    │
│ ...       ┆ ...    ┆ ...    ┆ ...    ┆ ...  ┆ ...  │
│ 993596728 ┆ 999999 ┆ 25604  ┆ 999999 ┆ null ┆ null │
│ 995160321 ┆ 999999 ┆ 575415 ┆ 999999 ┆ null ┆ null │
│ 996690852 ┆ 999999 ┆ 490825 ┆ 999999 ┆ null ┆ null │
│ 999391650 ┆ 999999 ┆ 92113  ┆ 999999 ┆ null ┆ null │
└───────────┴────────┴────────┴────────┴──────┴──────┘
>>> print(time.perf_counter() - start)
98.63108555600047

Assumption: Sorted by day

If we can assume that our data is already sorted by day, we can cut out unnecessary steps in our algorithms - as well as see some decent increases in speed.

We'll sort the data first and re-run our algorithms. Note that sorting sets the sorted flag on the day column, which allows algorithms to take shortcuts to increase speed. (If not sorting manually, the set_sorted method can be used to tell Polars that the column is pre-sorted.)
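(As an aside, here's a minimal sketch of setting the flag without re-sorting, assuming the data truly is already ordered by day; the exact set_sorted API has shifted a bit between polars versions:)

# Only valid if the data really is sorted by 'day' - otherwise results will be wrong.
df = df.with_columns(pl.col('day').set_sorted())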

df = df.sort(['day'])
df
shape: (1000000000, 3)
┌───────────┬────────┬────────┐
│ row_nr    ┆ day    ┆ value  │
│ ---       ┆ ---    ┆ ---    │
│ u32       ┆ i64    ┆ i64    │
╞═══════════╪════════╪════════╡
│ 197731    ┆ 1      ┆ 4093   │
│ 3154732   ┆ 1      ┆ 433246 │
│ 4825468   ┆ 1      ┆ 436316 │
│ 4927362   ┆ 1      ┆ 83493  │
│ ...       ┆ ...    ┆ ...    │
│ 993596728 ┆ 999999 ┆ 25604  │
│ 995160321 ┆ 999999 ┆ 575415 │
│ 996690852 ┆ 999999 ┆ 490825 │
│ 999391650 ┆ 999999 ┆ 92113  │
└───────────┴────────┴────────┘

Join

The code employing a join needs no changes; however, it does see an incredible speedup.

start = time.perf_counter()
(
    df
    .join(
        df
        .select(pl.col('day').unique().sort())
        .with_columns(
            pl.col('day').shift(-1).alias('new_day')
        ),
        how='inner',
        on='day',
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 4)
┌───────────┬────────┬────────┬─────────┐
│ row_nr    ┆ day    ┆ value  ┆ new_day │
│ ---       ┆ ---    ┆ ---    ┆ ---     │
│ u32       ┆ i64    ┆ i64    ┆ i64     │
╞═══════════╪════════╪════════╪═════════╡
│ 197731    ┆ 1      ┆ 4093   ┆ 2       │
│ 3154732   ┆ 1      ┆ 433246 ┆ 2       │
│ 4825468   ┆ 1      ┆ 436316 ┆ 2       │
│ 4927362   ┆ 1      ┆ 83493  ┆ 2       │
│ ...       ┆ ...    ┆ ...    ┆ ...     │
│ 993596728 ┆ 999999 ┆ 25604  ┆ null    │
│ 995160321 ┆ 999999 ┆ 575415 ┆ null    │
│ 996690852 ┆ 999999 ┆ 490825 ┆ null    │
│ 999391650 ┆ 999999 ┆ 92113  ┆ null    │
└───────────┴────────┴────────┴─────────┘
>>> print(time.perf_counter() - start)
8.71159654099938

Note that the exact same join algorithm now finishes in only 8.7 seconds rather than 20.9 seconds, largely because the data is pre-sorted and the sorted flag is set on day.
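(If you want to verify that the flag is set, the Series flags property reports it - a small sketch:)

df['day'].flags
# e.g. {'SORTED_ASC': True, 'SORTED_DESC': False}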

groupby-explode

We'll eliminate the superfluous sort within the algorithm, and re-run it.

start = time.perf_counter()
(
    df
    .groupby("day", maintain_order=True)
    .agg_list()
    .with_columns(pl.col("day").shift(-1))
    .explode(pl.exclude("day"))
)
print(time.perf_counter() - start)
shape: (1000000000, 3)
┌──────┬───────────┬────────┐
│ day  ┆ row_nr    ┆ value  │
│ ---  ┆ ---       ┆ ---    │
│ i64  ┆ u32       ┆ i64    │
╞══════╪═══════════╪════════╡
│ 2    ┆ 197731    ┆ 4093   │
│ 2    ┆ 3154732   ┆ 433246 │
│ 2    ┆ 4825468   ┆ 436316 │
│ 2    ┆ 4927362   ┆ 83493  │
│ ...  ┆ ...       ┆ ...    │
│ null ┆ 993596728 ┆ 25604  │
│ null ┆ 995160321 ┆ 575415 │
│ null ┆ 996690852 ┆ 490825 │
│ null ┆ 999391650 ┆ 92113  │
└──────┴───────────┴────────┘
>>> print(time.perf_counter() - start)
8.249637401000655

Note how this algorithm takes slightly less time than the join algorithm, all due to the assumption of day being pre-sorted.

rank

Again, we'll eliminate the superfluous sort and re-run the algorithm.

start = time.perf_counter()
(
    df
    .with_columns(
        pl.col("day").rank("dense").cast(pl.Int64).alias("rank")
    )
    .with_columns(
        pl.col("rank").search_sorted(pl.col("rank") + 1).alias("idx")
    )
    .with_columns(
        pl.when(pl.col("idx") != pl.col("idx").max())
        .then(pl.col("idx"))
        .alias("idx")
    )
    .with_columns(
        pl.col("day").take(pl.col("idx")).alias("new")
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 6)
┌───────────┬────────┬────────┬────────┬──────┬──────┐
│ row_nr    ┆ day    ┆ value  ┆ rank   ┆ idx  ┆ new  │
│ ---       ┆ ---    ┆ ---    ┆ ---    ┆ ---  ┆ ---  │
│ u32       ┆ i64    ┆ i64    ┆ i64    ┆ u32  ┆ i64  │
╞═══════════╪════════╪════════╪════════╪══════╪══════╡
│ 197731    ┆ 1      ┆ 4093   ┆ 1      ┆ 1907 ┆ 2    │
│ 3154732   ┆ 1      ┆ 433246 ┆ 1      ┆ 1907 ┆ 2    │
│ 4825468   ┆ 1      ┆ 436316 ┆ 1      ┆ 1907 ┆ 2    │
│ 4927362   ┆ 1      ┆ 83493  ┆ 1      ┆ 1907 ┆ 2    │
│ ...       ┆ ...    ┆ ...    ┆ ...    ┆ ...  ┆ ...  │
│ 993596728 ┆ 999999 ┆ 25604  ┆ 999999 ┆ null ┆ null │
│ 995160321 ┆ 999999 ┆ 575415 ┆ 999999 ┆ null ┆ null │
│ 996690852 ┆ 999999 ┆ 490825 ┆ 999999 ┆ null ┆ null │
│ 999391650 ┆ 999999 ┆ 92113  ┆ 999999 ┆ null ┆ null │
└───────────┴────────┴────────┴────────┴──────┴──────┘
>>> print(time.perf_counter() - start)
48.90440067800046

Although this algorithm now takes roughly half the time, it's not quite as fast as the join or groupby-explode algorithms.

Of course, wall-clock performance is not the be-all and end-all. But when problems scale up, joins are particularly good tools, even when we cannot make assumptions about whether our data is sorted.

  • Appreciate the highly detailed analysis. It seems I need to start thinking in terms of `.join()`, whereas I default to `.groupby()` / `.explode()`. Thank you for all your answers, I've learned a lot from them. – jqurious Jan 28 '23 at 23:02
  • Thanks for your detailed analysis. It would be better to also show how long the sort takes. Another solution for the 'not sorted' case could be to sort first and then do either the join or the groupby. How does that look? – lebesgue Jan 29 '23 at 17:57
  • In other words, if df = df.sort(['day']) takes less than ~12s, then sort + join/groupby is as good as the join for the not-sorted case. If it takes much longer than 12s, then the join is the clear winner in terms of speed.