@jqurious, what I'd recommend for remapping values is a join. Joins are heavily optimized and scale very well, especially on machines with a good number of cores.
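For instance, here's a minimal sketch of the remapping-by-join pattern: build a small mapping frame and join it onto the main frame. (The column names and values here are purely illustrative.)

import polars as pl

values = pl.DataFrame({"day": [1, 2, 3, 2, 1]})
mapping = pl.DataFrame({"day": [1, 2, 3], "new_day": [11, 22, 33]})

# each row's 'day' is looked up in the mapping frame and 'new_day' is attached
remapped = values.join(mapping, on="day", how="left")
print(remapped)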
As an example, let's benchmark some solutions.
First, some data
Let's use enough data to avoid spurious results from "microbenchmarking" using tiny datasets. (I see this all too often - tiny datasets with benchmark results down to a few microseconds or milliseconds.)
On my 32-core system with 512 GB of RAM, that means expanding the dataset to one billion records. (Choose a different value below as appropriate for your computing platform.)
import polars as pl
import numpy as np
import time
rng = np.random.default_rng(1)
nbr_rows = 1_000_000_000
df = pl.DataFrame(
    dict(
        day=rng.integers(1, 1_000_000, nbr_rows),
        value=rng.integers(1, 1_000_000, nbr_rows),
    )
).with_row_count()
df
shape: (1000000000, 3)
┌───────────┬────────┬────────┐
│ row_nr ┆ day ┆ value │
│ --- ┆ --- ┆ --- │
│ u32 ┆ i64 ┆ i64 │
╞═══════════╪════════╪════════╡
│ 0 ┆ 473189 ┆ 747152 │
│ 1 ┆ 511822 ┆ 298575 │
│ 2 ┆ 755167 ┆ 868027 │
│ 3 ┆ 950463 ┆ 289295 │
│ ... ┆ ... ┆ ... │
│ 999999996 ┆ 828237 ┆ 503917 │
│ 999999997 ┆ 909996 ┆ 447681 │
│ 999999998 ┆ 309104 ┆ 588174 │
│ 999999999 ┆ 485525 ┆ 198567 │
└───────────┴────────┴────────┘
Assumption: Not sorted
Let's suppose that we cannot assume that the data is sorted by day. (We'll have to adapt the solutions somewhat.)
Join
Here are the results using a join. If you watch your CPU usage, for example using top in Linux, you'll see that the algorithm is heavily multi-threaded: it spends the majority of its time spread across all cores of your system.
start = time.perf_counter()
(
    df
    .join(
        df
        .select(pl.col('day').unique().sort())
        .with_columns(
            pl.col('day').shift(-1).alias('new_day')
        ),
        how='inner',
        on='day',
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 4)
┌───────────┬────────┬────────┬─────────┐
│ row_nr ┆ day ┆ value ┆ new_day │
│ --- ┆ --- ┆ --- ┆ --- │
│ u32 ┆ i64 ┆ i64 ┆ i64 │
╞═══════════╪════════╪════════╪═════════╡
│ 0 ┆ 473189 ┆ 747152 ┆ 473190 │
│ 1 ┆ 511822 ┆ 298575 ┆ 511823 │
│ 2 ┆ 755167 ┆ 868027 ┆ 755168 │
│ 3 ┆ 950463 ┆ 289295 ┆ 950464 │
│ ... ┆ ... ┆ ... ┆ ... │
│ 999999996 ┆ 828237 ┆ 503917 ┆ 828238 │
│ 999999997 ┆ 909996 ┆ 447681 ┆ 909997 │
│ 999999998 ┆ 309104 ┆ 588174 ┆ 309105 │
│ 999999999 ┆ 485525 ┆ 198567 ┆ 485526 │
└───────────┴────────┴────────┴─────────┘
>>> print(time.perf_counter() - start)
20.85321443199973
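As an aside on the multi-threading observation above: if you want to see how many threads Polars has available, or cap them to compare single- versus multi-threaded behaviour, a sketch like the following works. (POLARS_MAX_THREADS must be set before polars is first imported; the function was named threadpool_size in the Polars version used here, and thread_pool_size in newer releases.)

import os
os.environ["POLARS_MAX_THREADS"] = "4"  # must be set before importing polars

import polars as pl
print(pl.threadpool_size())  # size of Polars' global thread pool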
groupby-explode
Now let's try the groupby-explode solution. This algorithm will spend a good share of its time in single-threaded mode. I've had to add a sort after the grouping step because the algorithm assumes sorted data in the steps that follow it.
start = time.perf_counter()
(
    df
    .groupby("day", maintain_order=False)
    .agg_list()
    .sort(['day'])
    .with_columns(pl.col("day").shift(-1))
    .explode(pl.exclude("day"))
)
print(time.perf_counter() - start)
shape: (1000000000, 3)
┌──────┬───────────┬────────┐
│ day ┆ row_nr ┆ value │
│ --- ┆ --- ┆ --- │
│ i64 ┆ u32 ┆ i64 │
╞══════╪═══════════╪════════╡
│ 2 ┆ 197731 ┆ 4093 │
│ 2 ┆ 3154732 ┆ 433246 │
│ 2 ┆ 4825468 ┆ 436316 │
│ 2 ┆ 4927362 ┆ 83493 │
│ ... ┆ ... ┆ ... │
│ null ┆ 993596728 ┆ 25604 │
│ null ┆ 995160321 ┆ 575415 │
│ null ┆ 996690852 ┆ 490825 │
│ null ┆ 999391650 ┆ 92113 │
└──────┴───────────┴────────┘
>>> print(time.perf_counter() - start)
54.04602192300081
rank
Now, the rank method. This algorithm will spend nearly all its time in single-threaded mode. I've also had to add a sort here, as the ranks are assumed to be sorted in the search_sorted step.
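As a quick, hypothetical illustration of that step: searching a sorted rank column for rank + 1 returns the position where the next rank group begins, which is then used to take the corresponding next day. (The side parameter is assumed to be available in your Polars version.)

import polars as pl

ranks = pl.Series("rank", [1, 1, 1, 2, 2, 3])
# leftmost insertion point of 2 in the sorted ranks -> start of the rank-2 group
print(ranks.search_sorted(2, side="left"))  # 3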
start = time.perf_counter()
(
    df
    .sort(['day'])
    .with_columns(
        pl.col("day").rank("dense").cast(pl.Int64).alias("rank")
    )
    .with_columns(
        pl.col("rank").search_sorted(pl.col("rank") + 1).alias("idx")
    )
    .with_columns(
        pl.when(pl.col("idx") != pl.col("idx").max())
        .then(pl.col("idx"))
        .alias("idx")
    )
    .with_columns(
        pl.col("day").take(pl.col("idx")).alias("new")
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 6)
┌───────────┬────────┬────────┬────────┬──────┬──────┐
│ row_nr ┆ day ┆ value ┆ rank ┆ idx ┆ new │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ u32 ┆ i64 ┆ i64 ┆ i64 ┆ u32 ┆ i64 │
╞═══════════╪════════╪════════╪════════╪══════╪══════╡
│ 197731 ┆ 1 ┆ 4093 ┆ 1 ┆ 1907 ┆ 2 │
│ 3154732 ┆ 1 ┆ 433246 ┆ 1 ┆ 1907 ┆ 2 │
│ 4825468 ┆ 1 ┆ 436316 ┆ 1 ┆ 1907 ┆ 2 │
│ 4927362 ┆ 1 ┆ 83493 ┆ 1 ┆ 1907 ┆ 2 │
│ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... │
│ 993596728 ┆ 999999 ┆ 25604 ┆ 999999 ┆ null ┆ null │
│ 995160321 ┆ 999999 ┆ 575415 ┆ 999999 ┆ null ┆ null │
│ 996690852 ┆ 999999 ┆ 490825 ┆ 999999 ┆ null ┆ null │
│ 999391650 ┆ 999999 ┆ 92113 ┆ 999999 ┆ null ┆ null │
└───────────┴────────┴────────┴────────┴──────┴──────┘
>>> print(time.perf_counter() - start)
98.63108555600047
Assumption: Sorted by day
If we can assume that our data is already sorted by day, we can cut out unnecessary steps in our algorithms, as well as see some decent increases in speed.
We'll sort the data first and re-run our algorithms. Note that sorting sets the sorted flag on the day column, which allows algorithms to take shortcuts to increase speed. (If not sorting manually, the set_sorted method can be used to tell Polars that the column is pre-sorted.)
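As a minimal, illustrative sketch of the latter (not used in the benchmarks below): if a frame already arrives sorted by day, marking the column lets Polars use the same fast paths without an explicit sort.

import polars as pl

presorted = pl.DataFrame({"day": [1, 1, 2, 3], "value": [10, 20, 30, 40]})
presorted = presorted.with_columns(pl.col("day").set_sorted())
print(presorted["day"].flags)  # the ascending-sorted flag is now set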
df = df.sort(['day'])
df
shape: (1000000000, 3)
┌───────────┬────────┬────────┐
│ row_nr ┆ day ┆ value │
│ --- ┆ --- ┆ --- │
│ u32 ┆ i64 ┆ i64 │
╞═══════════╪════════╪════════╡
│ 197731 ┆ 1 ┆ 4093 │
│ 3154732 ┆ 1 ┆ 433246 │
│ 4825468 ┆ 1 ┆ 436316 │
│ 4927362 ┆ 1 ┆ 83493 │
│ ... ┆ ... ┆ ... │
│ 993596728 ┆ 999999 ┆ 25604 │
│ 995160321 ┆ 999999 ┆ 575415 │
│ 996690852 ┆ 999999 ┆ 490825 │
│ 999391650 ┆ 999999 ┆ 92113 │
└───────────┴────────┴────────┘
Join
The code employing a join needs no changes; however, it does see a substantial speedup (roughly 2.4x).
start = time.perf_counter()
(
    df
    .join(
        df
        .select(pl.col('day').unique().sort())
        .with_columns(
            pl.col('day').shift(-1).alias('new_day')
        ),
        how='inner',
        on='day',
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 4)
┌───────────┬────────┬────────┬─────────┐
│ row_nr ┆ day ┆ value ┆ new_day │
│ --- ┆ --- ┆ --- ┆ --- │
│ u32 ┆ i64 ┆ i64 ┆ i64 │
╞═══════════╪════════╪════════╪═════════╡
│ 197731 ┆ 1 ┆ 4093 ┆ 2 │
│ 3154732 ┆ 1 ┆ 433246 ┆ 2 │
│ 4825468 ┆ 1 ┆ 436316 ┆ 2 │
│ 4927362 ┆ 1 ┆ 83493 ┆ 2 │
│ ... ┆ ... ┆ ... ┆ ... │
│ 993596728 ┆ 999999 ┆ 25604 ┆ null │
│ 995160321 ┆ 999999 ┆ 575415 ┆ null │
│ 996690852 ┆ 999999 ┆ 490825 ┆ null │
│ 999391650 ┆ 999999 ┆ 92113 ┆ null │
└───────────┴────────┴────────┴─────────┘
>>> print(time.perf_counter() - start)
8.71159654099938
Note that the exact same join algorithm now finishes in only 8.7 seconds rather than 20.9 seconds, largely due to the data being pre-sorted and the sorted flag being set on day.
groupby-explode
We'll eliminate the superfluous sort within the algorithm and re-run it.
start = time.perf_counter()
(
    df
    .groupby("day", maintain_order=True)
    .agg_list()
    .with_columns(pl.col("day").shift(-1))
    .explode(pl.exclude("day"))
)
print(time.perf_counter() - start)
shape: (1000000000, 3)
┌──────┬───────────┬────────┐
│ day ┆ row_nr ┆ value │
│ --- ┆ --- ┆ --- │
│ i64 ┆ u32 ┆ i64 │
╞══════╪═══════════╪════════╡
│ 2 ┆ 197731 ┆ 4093 │
│ 2 ┆ 3154732 ┆ 433246 │
│ 2 ┆ 4825468 ┆ 436316 │
│ 2 ┆ 4927362 ┆ 83493 │
│ ... ┆ ... ┆ ... │
│ null ┆ 993596728 ┆ 25604 │
│ null ┆ 995160321 ┆ 575415 │
│ null ┆ 996690852 ┆ 490825 │
│ null ┆ 999391650 ┆ 92113 │
└──────┴───────────┴────────┘
>>> print(time.perf_counter() - start)
8.249637401000655
Note how this algorithm takes slightly less time than the join algorithm, all due to the assumption of day being pre-sorted.
rank
Again, we'll eliminate the superfluous sort and re-run the algorithm.
start = time.perf_counter()
(
    df
    .with_columns(
        pl.col("day").rank("dense").cast(pl.Int64).alias("rank")
    )
    .with_columns(
        pl.col("rank").search_sorted(pl.col("rank") + 1).alias("idx")
    )
    .with_columns(
        pl.when(pl.col("idx") != pl.col("idx").max())
        .then(pl.col("idx"))
        .alias("idx")
    )
    .with_columns(
        pl.col("day").take(pl.col("idx")).alias("new")
    )
)
print(time.perf_counter() - start)
shape: (1000000000, 6)
┌───────────┬────────┬────────┬────────┬──────┬──────┐
│ row_nr ┆ day ┆ value ┆ rank ┆ idx ┆ new │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ u32 ┆ i64 ┆ i64 ┆ i64 ┆ u32 ┆ i64 │
╞═══════════╪════════╪════════╪════════╪══════╪══════╡
│ 197731 ┆ 1 ┆ 4093 ┆ 1 ┆ 1907 ┆ 2 │
│ 3154732 ┆ 1 ┆ 433246 ┆ 1 ┆ 1907 ┆ 2 │
│ 4825468 ┆ 1 ┆ 436316 ┆ 1 ┆ 1907 ┆ 2 │
│ 4927362 ┆ 1 ┆ 83493 ┆ 1 ┆ 1907 ┆ 2 │
│ ... ┆ ... ┆ ... ┆ ... ┆ ... ┆ ... │
│ 993596728 ┆ 999999 ┆ 25604 ┆ 999999 ┆ null ┆ null │
│ 995160321 ┆ 999999 ┆ 575415 ┆ 999999 ┆ null ┆ null │
│ 996690852 ┆ 999999 ┆ 490825 ┆ 999999 ┆ null ┆ null │
│ 999391650 ┆ 999999 ┆ 92113 ┆ 999999 ┆ null ┆ null │
└───────────┴────────┴────────┴────────┴──────┴──────┘
>>> print(time.perf_counter() - start)
48.90440067800046
Although this algorithm now takes roughly half the time, it's not quite as fast as the join or groupby-explode algorithms.
Of course, wall-clock performance is not the be-all and end-all. But when problems scale up, joins are particularly good tools, even when we cannot make assumptions about the sortedness of our data.