
I have an iterator that is consumed by two functions (mean_summarizer and std_summarizer in the example below). I want both functions to process the iterator, WITHOUT ever having to load the entire iterator into memory at once.

Below is a minimal example (also in Colab) that provides the correct result, EXCEPT that it involves loading the entire input into memory at once. No need to understand the fancy code inside mean_summarizer, std_summarizer, and last - it's mainly like that for brevity.

The question is: what is the cleanest way to re-implement summarize_input_stream without changing the function signature (just the inside), such that its memory usage does not scale with the length of the input stream?

I have a feeling coroutines are involved, but I don't know how to use them.

import numpy as np
from typing import Iterable, Mapping, Callable, Any

def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    inputs = list(input_stream)  # PROBLEM IS HERE <-- We load entire stream into memory at once
    return {name: summarizer(inputs) for name, summarizer in summarizers.items()}

def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
  return max(enumerate(iterable))[1]

def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
  return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])

def std_summarizer(stream: Iterable[float]) -> float:   # Just computes standard deviation online and returns final value
  return last(cumsum_of_sq/(i+1) - (cumsum/(i+1))**2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])**.5

summary_stats = summarize_input_stream(
    input_stream=(np.random.randn()*2+3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
Peter
  • Would [itertools.tee](https://docs.python.org/3/library/itertools.html#itertools.tee) work? – sj95126 Sep 29 '22 at 19:19
  • @sj95126: As written, the first summarizer would completely consume its input before the second summarizer began executing. In situations like that, `itertools.tee` is no better than `list`ifying (and arguably worse; iterating the `tee`-ed iterators is slower than iterating a simple `list`); if one `tee`-ed iterator is unconsumed and another is completely consumed, `tee` stores the *entire* input internally, so your peak memory usage is identical to what `list`ifying requires (see the sketch after these comments). – ShadowRanger Sep 29 '22 at 19:22
  • Unrelated side-note: `last` is implemented *wildly* inefficiently. If the goal is just to get the last value of an arbitrary iterable (which may not be a sequence, so indexing to `-1` is not an option), you can simplify to just `return collections.deque(iterable, maxlen=1)[0]` and avoid the per-element work of making an `int` and a two-`tuple`, having `max` compare the `int`s (which will always find the new `tuple` to be greater), and repeating `n` times. With `maxlen=1`, `deque` will keep pulling new elements, replacing the old one cheaply, leaving you with just the final element at index 0. – ShadowRanger Sep 29 '22 at 19:28
  • @ShadowRanger Hah yes, guilty. The max thing I took from here: https://stackoverflow.com/a/6251297/851699 because it was the only one-liner, and `itertools.last` is not in python by default. It's not that inefficient though, it should still be O(1) memory and O(N) time. – Peter Sep 29 '22 at 19:39
  • @ShadowRanger actually, I think `max(enumerate(iterable))` is kind of a clever way to get the last item, i wouldn't call it *wildly* inefficient. I mean, I would still just do `for item in iterable: pass; return item` or something like that first because it is more understandable – juanpa.arrivillaga Sep 29 '22 at 19:39
  • @juanpa.arrivillaga: Yeah, the `for` loop approach is a perfectly valid one. `deque` optimizes slightly by doing roughly the same thing at the C layer, but neither one is doing truly unnecessary work. "*wildly*" was hyperbole I'll admit, but it is doing quite a bit of pointless work per-item (pointless setup/teardown work doesn't matter as much since it doesn't scale with input size) to force built-ins to achieve the desired result in a roundabout way. The `deque` approach is admittedly more obscure, inlining it is questionable, but wrapped in a clearly named function (`last`) it's fine by me. – ShadowRanger Sep 29 '22 at 19:42
  • @ShadowRanger yeah, I find the `max` one is a little cryptic so i wouldn't use it, although the `deque` approach is cryptic as well, but it is an idiom. – juanpa.arrivillaga Sep 29 '22 at 19:43
  • I've had precisely this use case and question myself before, did it with threads and queues like you did now, but wasn't super happy with it. I think partially for efficiency reasons and partially because a similar use case had many more summarizers and that somehow even got stuck at 76 summarizers (75 seemed still fine). I also felt like coroutines might be good for this and so I started learning about that stuff. Still hoping we'll get such an answer. – Kelly Bundy Sep 29 '22 at 23:12
  • Yeah I guess you've seen my other answer https://stackoverflow.com/a/73901834/851699 - it works without multithreading, at the cost of making the summarizers a little more complicated. I'm with you in that it feels like it *should* be doable without multithreading - maybe just not in current python (or maybe I just need to learn about these magical coroutines) – Peter Sep 29 '22 at 23:23
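
To make the tee failure mode from the comments concrete, here is a minimal sketch (not part of the original discussion): if one tee-ed copy is exhausted before the other is touched, tee's internal buffer grows to hold the whole stream, so peak memory is the same as listifying.

import itertools

stream = (float(i) for i in range(1_000_000))
a, b = itertools.tee(stream)
total = sum(a)             # copy `a` is fully consumed first...
# ...so everything `b` has not yet seen (here: all million items) is
# buffered inside `tee`, and peak memory matches list(stream).
count = sum(1 for _ in b)  # draining `b` only now releases the buffer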

3 Answers


I found a solution that does not involve changing the signature of summarize_input_stream. It launches one thread per summarizer and feeds each one incrementally via a separate blocking queue (link to Colab).

import numpy as np
from typing import Iterable, Mapping, Callable, Any
from threading import Thread
from queue import Queue
from functools import partial


def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    POISON_PILL = object()
    def run_summarizer(summarizer: Callable[[Iterable[float]], float], queue: Queue) -> float:
        result = summarizer(iter(queue.get, POISON_PILL)) # Waits until the food is ready to eat
        queue.put(result)  # Use the queue the other way around to return the result
    queues = [Queue(maxsize=1) for _ in summarizers]  # <-- Note: we could probably be more time-efficient by increasing maxsize, which should cause less thread switching at the cost of more memory usage
    threads = [Thread(target=partial(run_summarizer, summarizer, queue)) for summarizer, queue in zip(summarizers.values(), queues)]
    for t in threads:
        t.start()
    for inp in input_stream:
        for queue in queues:
            queue.put(inp)  # Waits until the summarizer is hungry to feed it
    for queue in queues:
        queue.put(POISON_PILL)  # Stop the iteration
    for t in threads:
        t.join()
    results = [queue.get() for queue in queues]
    return {name: result for name, result in zip(summarizers, results)}

def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    return max(enumerate(iterable))[1]

def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg * i / (i + 1) + x / (i + 1)])

def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
    return last(cumsum_of_sq / (i + 1) - (cumsum / (i + 1)) ** 2 for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in
                [(cumsum_of_sq + x ** 2, cumsum + x)]) ** .5

summary_stats = summarize_input_stream(
    input_stream=(np.random.randn() * 2 + 3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
Peter
  • Nice, now I don't need to finish mine which was going to be almost exactly the same :-). I thought about using the queues for the results as well, though opted to instead have my `run_summarizer` directly write its result into the result dict (created empty in the main thread). With your code, I'd still use `return dict(zip(summarizers, results))`. – Kelly Bundy Sep 29 '22 at 22:54
  • (I guess having the main thread build the dict might be safer than multiple threads trying to concurrently set values...) – Kelly Bundy Sep 29 '22 at 23:03

You can't do this. A generalized iterator can only be processed once, and to make it possible to process it twice, you need to store it in some way, either by listifying it as you're doing, or using itertools.tee (which, if one of the tee-d iterators is consumed completely before the other pulls any items, is morally equivalent; it has to store all of the data internally).

The only way to make this work is if you use a single summarizer that processes the input once and computes all relevant summaries at the same time.
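
A minimal sketch of that single-pass, combined approach (the combined_summarizer below is illustrative, not part of the original answer; it uses the same population-variance formula as the question's std_summarizer):

import math
from typing import Iterable, Mapping

def combined_summarizer(stream: Iterable[float]) -> Mapping[str, float]:
    # One pass over the stream: accumulate count, sum, and sum of squares,
    # then derive both statistics at the end.
    n, total, total_sq = 0, 0.0, 0.0
    for x in stream:
        n += 1
        total += x
        total_sq += x * x
    mean = total / n
    return {'mean': mean, 'std': math.sqrt(total_sq / n - mean ** 2)}

Of course, this merges the independent summarizers into one, which is exactly the restructuring the question is trying to avoid.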

ShadowRanger
  • Downvoter: If this answer is wrong, please tell me. There are special cases for resettable iterators (`.seek(0)` on a file being the most common), but in general, the design the OP wants is impossible, and I suggested an alternative that *can* work. – ShadowRanger Sep 29 '22 at 19:32
  • Thanks for the answer. When you say "you can't do this" - I presume you mean in single-threaded, normal python? As it seems like it should be possible *in principle*, at least using some sort of multithreaded scheme. – Peter Sep 29 '22 at 19:53
  • @Peter: If `itertools.tee` were threadsafe (it is not, and is explicitly documented as such), you could `tee` and launch two threads to consume each `tee`-ed iterator and cross your fingers that one of them doesn't get *so* far ahead of the other that your presumably too big to fit in memory data ends up realizing too many items at once and fails to fit in memory. Since this isn't possible anyway, you could do something fairly complex like launch two threads; the main thread iterates and feeds the same item to each of two bounded `queue.Queue`s (one for each thread), and then wrap them... – ShadowRanger Sep 29 '22 at 19:58
  • ...in each thread with a generator that performs repeated `get`s from the queue so it behaves like an iterator in each thread (just one that takes a long time to return an item in some cases), but at that point you're courting madness. – ShadowRanger Sep 29 '22 at 20:00
  • Hmm alright, thanks for the discussion - I've found a solution to my problem - it just requires a change to the signature - see other answer. – Peter Sep 29 '22 at 21:44
  • Hmm, why does your answer still say *"You can't do this"* after you described how you *can*? Threads with Queues is exactly how I've done this a while ago. – Kelly Bundy Sep 29 '22 at 22:17
  • @KellyBundy: Virtually every "You can't do this" is implicitly qualified with "in any reasonable way that doesn't twist the language in knots or involve complications significantly exceeding the scope of the original problem". Involving threads *solely* to enable splitting an iterator like this without holding the whole thing in memory is introducing significant complications (threading properly is not something I'd trust the majority of developers to get right, and using threading can cause problems using other language features) for a relatively modest benefit. – ShadowRanger Sep 29 '22 at 22:25
  • Ok, we have a threading-based solution now that doesn't court too much madness, see other answer. No finger-crossing required since we use blocking queues. I think it can actually make sense to use this in cases like mine - which (in the non-toy problem version) involves processing frames of video sequentially. – Peter Sep 29 '22 at 22:40
  • @Peter: Nice. You even used the design I had in mind while writing this (a sentinel made with `object()` to allow you to turn `Queue.get` into an iterator trivially with two-arg `iter`). I figure if you know how to do that, you can probably be trusted to maintain code like that. :-) Up-voted. – ShadowRanger Sep 29 '22 at 22:47
  • @ShadowRanger I think I used two-arg `iter` but then switched away from it because it's not safe. An input element could claim to equal the sentinel. I've considered asking for it to support identity checking, but don't have much hope. – Kelly Bundy Sep 29 '22 at 23:22
  • If the sentinel is created inside the function with `object()` I think it should be guaranteed unique, no? – Peter Sep 29 '22 at 23:27
  • Ah no I see what you're saying, the input object could be overriding `__eq__` – Peter Sep 29 '22 at 23:28
  • @Peter: The comparison by `iter` is done with equality, not identity, so a poorly designed class could implement `__eq__` with just `return True` and claim to be equal to any/everything. It's a theoretical risk, but as long as your classes aren't pathological it should be fine (see the sketch after these comments). – ShadowRanger Sep 29 '22 at 23:29
  • @Peter Yes. [Demo](https://tio.run/##JcuxDkAwEIDhvU9x4xGbzegZ7BfqUJGW6xk8/dH4xj/5z0e3FFszf4w5Q59k7Rx8Zl6AiC8irKn6WyGst0QY5GbnTglR8QhZMSgLlr2BNO3sFauP2Qs). – Kelly Bundy Sep 29 '22 at 23:31
  • @Peter Hmm, actually the equality check "happens" to be in the direction that allows to defeat that. [Demo](https://tio.run/##jY49D8IgEIZ3fsUlLmC6uZl0cTBx1p0Qe61EclSOJu2vRyh@jb7jc@/HjUu8edqldHWGGQ4@DHsBWR32oDU@tJZbrSorChinQHAJEwpRQ2ekaAndX8GjcZyTm3UK2CwMC3LzKamIvBiDpSiLSypo249BKvW6fUm5V6cq1SfqLdmI4LwfGzAM/G4fMHIGd@xA9D7ADJYgW8O61Pyu1Nfr1KxSegI). But that's an undocumented implementation detail. – Kelly Bundy Sep 29 '22 at 23:41
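
A minimal sketch of the pathological case described in the comments above (the Liar class here is purely illustrative, not taken from the linked demos): a value whose __eq__ claims equality with everything makes two-arg iter stop early, even though the sentinel is a fresh object().

SENTINEL = object()

class Liar:
    def __eq__(self, other):
        return True  # claims equality with everything, including the sentinel

values = iter([1.0, Liar(), 2.0])
print(list(iter(values.__next__, SENTINEL)))  # prints [1.0] -- iteration stops at the Liar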

As @ShadowRanger says, it seems (unless proven otherwise) that there is no solution to the question as stated that does not involve multi-threading.

However, with a minor change to the signature of summarize_input_stream (which admittedly violates the rules I myself wrote), we can get the result without paying the (memory) price.

The trick (which I'll call "genfunctrification" unless it has a pre-existing name) is:

  • We turn our summarizers from functions of type: Callable[[Iterable[float]], float] ...
  • ... into generator-functions of type Callable[[Iterable[float]], Iterable[Callable[[], float]]]
  • Note that we could have just made them return the results directly (Callable[[Iterable[float]], Iterable[float]]), but this would involve wastefully recomputing things like (cumsum_of_sq/(i+1) - (cumsum/(i+1))**2)**.5 on each iteration, when we only actually need it on the last, so instead we make our iterators yield Callables, which can compute the result only when needed (after the last iteration).

The modified code (and Colab link).

import numpy as np
from typing import Iterable, Mapping, Callable, Any, Sequence
import itertools

def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
        input_stream: Iterable[float],
        summarizers: Mapping[str, Callable[[Iterable[float]], Iterable[Callable[[], float]]]]
) -> Mapping[str, float]:
    input_streams_teed = itertools.tee(input_stream, len(summarizers))
    result_getter_streams: Sequence[Iterable[Callable[[], float]]] = [summarizer(stream_copy) for summarizer, stream_copy in zip(summarizers.values(), input_streams_teed)]
    final_results = last([f() for f in func_tup] for func_tup in zip(*result_getter_streams))
    return {name: r for name, r in zip(summarizers, final_results)}

def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
  return max(enumerate(iterable))[1]

def mean_summarizer(stream: Iterable[float]) -> Iterable[Callable[[], float]]:  # Just computes mean online and returns final value
  return ((lambda: avg) for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])

def std_summarizer(stream: Iterable[float]) ->  Iterable[Callable[[], float]]:   # Just computes standard deviation online and returns final value
  return ((lambda: (cumsum_of_sq/(i+1) - (cumsum/(i+1))**2)**.5) for cumsum_of_sq, cumsum in [(0, 0)] for i, x in enumerate(stream) for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])

summary_stats = summarize_input_stream(
    input_stream=(np.random.randn()*2+3 for _ in range(1000)),
    summarizers={'mean': mean_summarizer, 'std': std_summarizer}
)
print(summary_stats)
# e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
  • Note: since this particular implementation depends on dicts maintaining their order, it will only work reliably in Python 3.7+ - though it could easily be backported if needed.
Peter
  • Your bottom note doesn't make sense to me. If I'm not mistaken, you only depend on iteration order of the same unmodified dict being the same in multiple iterations. That was guaranteed before 3.7 already. – Kelly Bundy Sep 30 '22 at 00:06