I have an iterator that is consumed by two functions (mean_summarizer and std_summarizer in the example below). I want both functions to process the iterator without ever loading the entire iterator into memory at once.
Below is a minimal example (also in Colab) that produces the correct result, except that it loads the entire input into memory at once. There is no need to understand the fancy code inside mean_summarizer, std_summarizer, and last; it is written that way mainly for brevity.
The question is: what is the cleanest way to re-implement summarize_input_stream without changing its function signature (only the body), such that its memory usage does not scale with the length of the input stream?
I have a feeling coroutines are involved, but I don't know how to use them.
import numpy as np
from typing import Iterable, Mapping, Callable, Any

def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], float]]
        ) -> Mapping[str, float]:
    inputs = list(input_stream)  # PROBLEM IS HERE <-- We load entire stream into memory at once
    return {name: summarizer(inputs) for name, summarizer in summarizers.items()}

def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    return max(enumerate(iterable))[1]

def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])

def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
    return last(cumsum_of_sq/(i+1) - (cumsum/(i+1))**2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])**.5

summary_stats = summarize_input_stream(
    input_stream=(np.random.randn()*2+3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
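
For reference, here is a rough sketch of one direction that might work. It uses threads and bounded queues rather than coroutines, so it may not be the cleanest answer. Each summarizer pulls from its own iterator, so the idea is to run each one in its own thread and feed every thread through a small queue. The names summarize_input_stream_threaded, _BUFFER_SIZE, _DONE and _drain are made up for this sketch, and the buffer size of 64 is an arbitrary assumption.

```python
import queue
import threading
from typing import Callable, Dict, Iterable, Iterator, Mapping

_BUFFER_SIZE = 64   # assumed cap on items buffered per summarizer (arbitrary)
_DONE = object()    # sentinel that marks the end of the stream


def _drain(q: "queue.Queue") -> Iterator[float]:
    """Yield items from q until the end-of-stream sentinel arrives."""
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item


def summarize_input_stream_threaded(  # hypothetical name, same signature as the original
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], float]]
        ) -> Mapping[str, float]:
    # One bounded queue per summarizer: put() blocks when a queue is full,
    # so at most _BUFFER_SIZE items per summarizer are held at any time.
    queues = {name: queue.Queue(maxsize=_BUFFER_SIZE) for name in summarizers}
    results: Dict[str, float] = {}

    def run(name: str, summarizer: Callable[[Iterable[float]], float]) -> None:
        source = _drain(queues[name])
        try:
            results[name] = summarizer(source)
        finally:
            # If the summarizer stopped early, discard the rest so the
            # producer below never blocks on a full queue.
            for _ in source:
                pass

    threads = [threading.Thread(target=run, args=(name, fn))
               for name, fn in summarizers.items()]
    for t in threads:
        t.start()
    try:
        for x in input_stream:          # single pass over the input
            for q in queues.values():
                q.put(x)
    finally:
        for q in queues.values():       # always signal end-of-stream
            q.put(_DONE)
        for t in threads:
            t.join()
    # Rebuild the dict in the order the summarizers were given.
    return {name: results[name] for name in summarizers}


if __name__ == "__main__":
    print(summarize_input_stream_threaded(
        (float(i) for i in range(10)), {'sum': sum, 'max': max}))
    # {'sum': 45.0, 'max': 9.0}
```

In this sketch the summarizers stay unchanged and the input is read once, with memory bounded by _BUFFER_SIZE times the number of summarizers. A summarizer that raises would show up as a KeyError when the result dict is rebuilt, so proper error propagation is left out here.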