
The Goal

I'm trying to create an efficient solution to the following problem.

source = lambda: range(1 << 24)  # for example
functions = (min, max, sum)  # for example
data = tuple(source())  # from some generator
results = tuple(f(data) for f in functions)

This works. The source() function generates however many values it likes, which are stored in the tuple data. Each function is then called with that tuple to produce the results. These functions each consume an iterable exactly once and then produce their result. This is fine for small datasets. However, if source() generates many, many values, they must all be stored at once, which can hog memory.
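The single-pass constraint is the crux: each of these functions drains an iterator completely, so handing one bare generator to all of them in turn cannot work. A minimal demonstration:

```python
# A bare generator can be consumed only once: min() drains it,
# leaving nothing for max().
data = iter(range(10))
print(min(data))  # 0
try:
    max(data)
except ValueError as e:
    print('max failed:', e)  # the iterator is already exhausted
```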

Possible solution

Something like...

from typing import Callable, Iterable, Tuple, TypeVar

TI = TypeVar('TI')
TO = TypeVar('TO')


def magic_function(data: Iterable[TI], fxns: Iterable[Callable[[Iterable[TI]], TO]]) -> Tuple[TO, ...]:
    stored = tuple(data)  # memory hog, prohibitively
    return tuple(f(stored) for f in fxns)


source = lambda: range(1 << 24)  # for example
functions = (min, max, sum)  # for example
results = magic_function(source(), functions)

This is what I've been trying to build. magic_function() would hand the data iterator to some kind of internal asynchronous server. Each function in fxns would then be given an asynchronous client, which would look like a normal iterator, so the functions can consume them unmodified (the functions cannot be modified). It is possible to do this with the threading module, but the overhead is horrendous.

Extra clarity

This should be true.

source = lambda: range(1 << 24)  # for example
functions = (min, max, sum)  # for example
if first_method:
    data = tuple(source())  # from some generator
    results = tuple(f(data) for f in functions)
else:
    results = magic_function(source(), functions)

Whether first_method is True or False, given the same output from source() and the same functions, the results should always match (for single-pass, iterator-consuming functions). The first method computes and stores the entire dataset, which can be wasteful and slow. The magic method ought to save memory with minimal overhead (in both time and memory).

Threading implementation

This is a working implementation using the threading module. It's visibly slow...

#!/usr/bin/python3
from collections import namedtuple
from random import randint
from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
from threading import Event, Lock, Thread
from typing import *

''' https://pastebin.com/u4mTHfgc '''

int_iterable = Iterable[int]
_T = TypeVar('_T', int, float)
_FXN_T = Callable[[int_iterable], _T]


class Server:
    _it: int_iterable
    slots: int
    edit_slots: Lock
    element: _T
    available: Event
    zero_slots: Event
    end: bool

    def __init__(self, it: int_iterable):
        self._it = it
        self.slots = 0
        self.edit_slots = Lock()
        self.available = Event()
        self.zero_slots = Event()
        self.end = False

    def server(self, queue_length: int):
        available = self.available
        zero_slots = self.zero_slots
        for v in self._it:
            self.slots = queue_length
            self.element = v
            zero_slots.clear()
            available.set()
            zero_slots.wait()
        self.slots = queue_length
        self.end = True
        zero_slots.clear()
        available.set()
        zero_slots.wait()

    def client(self) -> int_iterable:
        available = self.available
        zero_slots = self.zero_slots
        edit_slots = self.edit_slots
        while True:
            available.wait()
            end = self.end
            if not end:
                yield self.element
            with edit_slots:
                self.slots -= 1
                if self.slots == 0:
                    available.clear()
                    zero_slots.set()
            zero_slots.wait()
            if end:
                break


class Slot:
    thread: Thread
    fxn: _FXN_T
    server: Server
    qid: int
    result: Union[Optional[_T], Exception, Tuple[Exception, Exception]]

    def __init__(self, fxn: _FXN_T, server: Server, qid: int):
        self.thread = Thread(target = self.run, name = f'BG {id(self)} thread {qid}')
        self.fxn = fxn
        self.server = server
        self.qid = qid
        self.result = None

    def run(self):
        client = self.server.client()
        try:
            self.result = self.fxn(client)
        except Exception as e:
            self.result = e
            try:
                for _ in client:  # one thread breaking won't break it all.
                    pass
            except Exception as f:
                self.result = e, f


class BranchedGenerator:
    _server: Server
    _queue: List[Slot]

    def __init__(self, it: int_iterable):
        self._server = Server(it)
        self._queue = []

    def new(self, fxn: _FXN_T) -> int:
        qid = len(self._queue)
        self._queue.append(Slot(fxn, self._server, qid))
        return qid

    def finalize(self):
        queue = self._queue
        for t in queue:
            t.thread.start()
        self._server.server(len(queue))
        for t in queue:
            t.thread.join()

    def get(self, qid: int) -> _T:
        return self._queue[qid].result

    @classmethod
    def make(cls, it: int_iterable, fxns: Iterable[_FXN_T]) -> Tuple[_T, ...]:
        tmp = cls(it)
        qid_range = max(map(tmp.new, fxns))
        tmp.finalize()
        return tuple(tmp.get(qid) for qid in range(qid_range + 1))


seq_stats = namedtuple('seq_stats', ('tuple', 'mean', 'harmonic_mean', 'geometric_mean', 'median', 'median_high', 'median_low', 'mode'))


def bundle_bg(xs: int_iterable) -> seq_stats:
    tmp = BranchedGenerator(xs)
    # noinspection PyTypeChecker
    ys = seq_stats(
        tmp.new(tuple),
        tmp.new(mean),
        tmp.new(harmonic_mean),
        tmp.new(geometric_mean),
        tmp.new(median),
        tmp.new(median_high),
        tmp.new(median_low),
        tmp.new(mode)
    )
    tmp.finalize()
    return seq_stats(
        tmp.get(ys.tuple),
        tmp.get(ys.mean),
        tmp.get(ys.harmonic_mean),
        tmp.get(ys.geometric_mean),
        tmp.get(ys.median),
        tmp.get(ys.median_high),
        tmp.get(ys.median_low),
        tmp.get(ys.mode)
    )


def bundle(xs: int_iterable) -> seq_stats:
    return seq_stats(
        tuple(xs),
        mean(xs),
        harmonic_mean(xs),
        geometric_mean(xs),
        median(xs),
        median_high(xs),
        median_low(xs),
        mode(xs)
    )


def display(v: seq_stats):
    print(f'Statistics of {v.tuple}:\n'
          f'\tMean: {v.mean}\n'
          f'\tHarmonic Mean: {v.harmonic_mean}\n'
          f'\tGeometric Mean: {v.geometric_mean}\n'
          f'\tMedian: {v.median}\n'
          f'\tMedian High: {v.median_high}\n'
          f'\tMedian Low: {v.median_low}\n'
          f'\tMode: {v.mode};')


def new(length: int, inclusive_maximum: int) -> int_iterable:
    return (randint(1, inclusive_maximum) for _ in range(length))


def test1() -> int:
    sample = new(10, 1 << 65)
    struct1 = bundle_bg(sample)
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum((1 << i) * (not e) for (i, e) in enumerate(matches))  # bitmask of mismatches


def test2():
    sample = new(1000, 1 << 5)
    struct1 = seq_stats(*BranchedGenerator.make(
        sample,
        (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode)
    ))
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum((1 << i) * (not e) for (i, e) in enumerate(matches))  # bitmask of mismatches


def test3():
    pass


if __name__ == '__main__':
    exit(test2())

The Branching Generator Module (V3) [using threading] - Pastebin.com link has the updated code. From start to output, half a second elapses, and that's for just eight functions! Both test1() and test2() have this speed issue.
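Half a second for eight functions over small samples is consistent with the per-element rendezvous: for every value, the server sets and clears Events, and every client wakes, decrements slots, and waits again. A rough micro-benchmark of a single Event handoff between two threads (timings are machine-dependent; this only sketches the order of magnitude):

```python
import time
from threading import Event, Thread

ping, pong = Event(), Event()
N = 10_000

def responder():
    # Mirror of the main loop: wait, acknowledge, signal back.
    for _ in range(N):
        ping.wait()
        ping.clear()
        pong.set()

t = Thread(target=responder)
t.start()
start = time.perf_counter()
for _ in range(N):
    ping.set()
    pong.wait()
    pong.clear()
elapsed = time.perf_counter() - start
t.join()
# Each element in the Server loop triggers several such handoffs
# per client, so synchronization easily dwarfs the work done by
# functions like min or sum.
print(f'{elapsed / N * 1e6:.1f} us per handoff')
```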

Attempts

I have tried to implement magic_function() using the asyncio module.

#!/usr/bin/python3
from asyncio import Task, create_task, run, wait
from collections import deque, namedtuple
from random import randint
from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
from typing import *

''' https://pastebin.com/ELzEaSK8 '''

int_iterable = Iterable[int]
_T = TypeVar('_T', int, float)
ENGINE_T = AsyncGenerator[Tuple[_T, bool], int]


async def injector(engine: ENGINE_T, qid: int) -> AsyncIterator[int]:
    while True:
        try:
            x, try_again = await engine.asend(qid)
        except StopAsyncIteration:
            break
        if try_again:
            continue
        yield x


WRAPPER_FXN_T = Callable[[int_iterable], _T]


def wrapper(fxn: WRAPPER_FXN_T, engine: ENGINE_T, qid: int):
    async def i():
        # TypeError: 'async_generator' object is not iterable
        return fxn(iter(x async for x in injector(engine, qid)))

    return i


class BranchedGenerator:
    _it: int_iterable
    _engine: ENGINE_T
    _queue: Union[tuple, deque]

    def __init__(self, it: int_iterable):
        self._it = it
        self._engine = self._make_engine()
        # noinspection PyTypeChecker
        wait(self._engine)
        self._queue = deque()

    async def _make_engine(self) -> ENGINE_T:  # it's like a server
        lq = len(self._queue)
        result = try_again = 0, True
        for value in self._it:
            waiting = set(range(lq))
            while True:
                qid = (yield result)
                if len(waiting) == 0:
                    result = try_again
                    break
                if qid in waiting:
                    waiting.remove(qid)
                    result = value, False
                else:
                    result = try_again

    def new(self, fxn: WRAPPER_FXN_T) -> int:
        qid = len(self._queue)
        self._queue.append(wrapper(fxn, self._engine, qid)())
        return qid

    def finalize(self):
        self._queue = tuple(self._queue)

    def get(self, qid: int) -> Task:
        return create_task(self._queue[qid])

    @classmethod
    @(lambda f: (lambda it, fxns: run(f(it, fxns))))
    def make(cls, it: int_iterable, fxns: Iterable[Callable[[int_iterable], _T]]) -> Tuple[_T, ...]:
        tmp = cls(it)
        qid_range = max(map(tmp.new, fxns))
        tmp.finalize()
        return tuple((await tmp.get(qid)) for qid in range(qid_range + 1))


seq_stats = namedtuple('seq_stats', ('tuple', 'mean', 'harmonic_mean', 'geometric_mean', 'median', 'median_high', 'median_low', 'mode'))


@(lambda f: (lambda xs: run(f(xs))))
async def bundle_bg(xs: int_iterable) -> seq_stats:
    tmp = BranchedGenerator(xs)
    # noinspection PyTypeChecker
    ys = seq_stats(
        tmp.new(tuple),
        tmp.new(mean),
        tmp.new(harmonic_mean),
        tmp.new(geometric_mean),
        tmp.new(median),
        tmp.new(median_high),
        tmp.new(median_low),
        tmp.new(mode)
    )
    tmp.finalize()
    return seq_stats(
        await tmp.get(ys.tuple),
        await tmp.get(ys.mean),
        await tmp.get(ys.harmonic_mean),
        await tmp.get(ys.geometric_mean),
        await tmp.get(ys.median),
        await tmp.get(ys.median_high),
        await tmp.get(ys.median_low),
        await tmp.get(ys.mode)
    )


def bundle(xs: int_iterable) -> seq_stats:
    return seq_stats(
        tuple(xs),
        mean(xs),
        harmonic_mean(xs),
        geometric_mean(xs),
        median(xs),
        median_high(xs),
        median_low(xs),
        mode(xs)
    )


def display(v: seq_stats):
    print(f'Statistics of {v.tuple}:\n'
          f'\tMean: {v.mean}\n'
          f'\tHarmonic Mean: {v.harmonic_mean}\n'
          f'\tGeometric Mean: {v.geometric_mean}\n'
          f'\tMedian: {v.median}\n'
          f'\tMedian High: {v.median_high}\n'
          f'\tMedian Low: {v.median_low}\n'
          f'\tMode: {v.mode};')


def new(length: int, inclusive_maximum: int) -> int_iterable:
    return (randint(1, inclusive_maximum) for _ in range(length))


def test1() -> int:
    sample = new(10, 1 << 65)
    struct1 = bundle_bg(sample)
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum((1 << i) * (not e) for (i, e) in enumerate(matches))  # bitmask of mismatches


async def test2():
    sample = new(1000, 1 << 5)
    # noinspection PyTypeChecker
    struct1 = seq_stats(*await BranchedGenerator.make(
        sample,
        (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode)
    ))
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum((1 << i) * (not e) for (i, e) in enumerate(matches))  # bitmask of mismatches


async def test3():
    pass


if __name__ == '__main__':
    exit(test1())

The Branching Generator Module (V2) - Pastebin.com link has the most up-to-date version. I will not be updating the embedded code! If changes are made, the pastebin copy will have them.

Tests

  1. test1() makes sure that bundle_bg() does what bundle() does. They should do exactly the same thing.

  2. test2() checks whether BranchedGenerator.make() behaves like bundle_bg() and (transitively) like bundle(). BranchedGenerator.make() is supposed to be the closest to magic_function().

  3. test3() has no purpose yet.

Status

The first test fails with the traceback below. The second test fails with a similar error when calling BranchedGenerator.make().

[redacted]/b_gen.py:45: RuntimeWarning: coroutine 'wait' was never awaited
  wait(self._engine)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "[redacted]/b_gen.py", line 173, in <module>
    exit((test1()))
  File "[redacted]/b_gen.py", line 144, in test1
    struct1 = bundle_bg(sample)
  File "[redacted]/b_gen.py", line 87, in <lambda>
    @(lambda f: (lambda xs: run(f(xs))))
  File "/usr/lib64/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "[redacted]/b_gen.py", line 103, in bundle_bg
    await tmp.get(ys.tuple),
  File "[redacted]/b_gen.py", line 31, in i
    return fxn(iter(x async for x in injector(engine, qid)))
TypeError: 'async_generator' object is not iterable
sys:1: RuntimeWarning: coroutine 'wrapper.<locals>.i' was never awaited

In all honesty, I'm new to asyncio. I don't know how to fix this.

The Question

Can somebody help me fix this?! Please? This one with asyncio should do exactly what the one with threading does -- just without the overhead.

Another pathway

Before this, I attempted a simpler implementation.

#!/usr/bin/python3
from random import randrange
from statistics import mean as st_mean, median as st_median, mode as st_mode
from typing import Any, Callable, Iterable, Tuple, TypeVar

''' https://pastebin.com/xhfT1njJ '''


class BranchedGenerator:
    _n: Iterable[int]
    _stop_value: Any

    def __init__(self, n: Iterable[int], stop: Any):
        self._n = n
        self._stop_value = stop

    @property
    def new(self):
        return


def wrapper1(f):
    new = (yield)
    # SyntaxError: 'yield' inside generator expression
    yield f((y for _ in new if (y := (yield)) or True))
    return


_T1 = TypeVar('_T1')
_T2 = TypeVar('_T2')


def wrapper2(ns: Iterable[_T1], fs: Iterable[Callable[[Iterable[_T1]], _T2]]) -> Tuple[_T2, ...]:
    def has_new():
        while new:
            yield True
        while True:
            yield False

    new = True
    xwf = tuple(map(wrapper1, fs))
    for x in xwf:
        next(x)
        x.send(has_new)
        next(x)
    for n in ns:
        for x in xwf:
            x.send(n)
    new = False
    return tuple(map(next, xwf))


def source(n: int) -> Iterable[int]:
    return (randrange(-9, 9000) for _ in range(n))


normal = (tuple, st_mean, st_median, st_mode)


def test0():
    sample = tuple(source(25))
    s_tuple, s_mean, s_median, s_mode = wrapper2(sample, normal)
    b_tuple, b_mean, b_median, b_mode = (f(s_tuple) for f in normal)
    assert all((
        s_tuple == b_tuple,
        s_mean == b_mean,
        s_median == b_median,
        s_mode == b_mode
    ))


def test1():
    sample = source(25)
    s_tuple, s_mean, s_median, s_mode = wrapper2(sample, normal)
    b_tuple, b_mean, b_median, b_mode = (f(s_tuple) for f in normal)
    print(
        'Test1:'
        '\nTuple', s_tuple, '\n', b_tuple, '\n==?', v0 := s_tuple == b_tuple,
        '\nMean', s_mean, '\n', b_mean, '\n==?', v1 := s_mean == b_mean,
        '\nMedian', s_median, '\n', b_median, '\n==?', v2 := s_median == b_median,
        '\nMode', s_mode, '\n', b_mode, '\n==?', v3 := s_mode == b_mode,
        '\nPasses', ''.join('01'[v * 1] for v in (v0, v1, v2, v3)), 'All?', all((v0, v1, v2, v3))
    )


if __name__ == '__main__':
    test0()
    test1()

The Branching Generator Module (V1) - Pastebin.com link has the update policy.

Tests

  1. Test 0 tells whether wrapper2() does what it is supposed to do: call all the functions and return their results. No memory is saved; this is like first_method == True.

  2. Test 1 is simply like first_method == False. The sample is not a tuple.

Problem

Ouch! I can code, I assure you.

 File "[redacted]/branched_generator.py", line 25
    yield f((y for _ in new if (y := (yield)) or True))
            ^
SyntaxError: 'yield' inside generator expression

I freely admit it: this version is dafter. wrapper2() is obviously the closest to magic_function().

Question

As this is the simpler implementation, can this wrapper2() be salvaged? If not, don't sweat it.
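For what it's worth, the SyntaxError is only the surface problem: yield is legal in a def-based generator body where it is not in a generator expression, but wrapper1() also asks one yield to serve two directions at once. A bare (yield) both receives a sent value and hands a value back to whoever resumed the generator, so the producer and the consumer cannot be different parties. A small demonstration (channel is a made-up name):

```python
def channel():
    while True:
        value = (yield)  # intended: receive a value from the producer
        yield value      # intended: hand it to a separate consumer

ch = channel()
next(ch)              # prime: run to the first (yield)
result = ch.send(41)  # resumes the generator...
assert result == 41   # ...but the value comes straight back to the
                      # *sender*; a separate consumer calling next(ch)
                      # would only ever inject None via (yield)
```

So even with the syntax fixed, an unmodified single-pass function like sum cannot be fed this way; something thread-like is needed to suspend it mid-loop.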

YoungCoder5

1 Answer


If it's just the materialization of the data you are worried about, you could do

from itertools import tee
from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
from random import randint
from typing import Iterable

def magic_function(data, fxns):
    return tuple(f(d) for f, d in zip(fxns, tee(data, len(fxns))))

def new(length: int, inclusive_maximum: int) -> Iterable[int]:
    return (randint(1, inclusive_maximum) for _ in range(length))

sample = new(1000, 1 << 5)
functions = (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode)

magic_function(sample, functions)

NB: tee is not thread-safe, though.

PS: You are right, this consumes the generator and makes n copies of all data in it.
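The buffering is easy to observe: when one branch of a tee runs ahead, every value it pulls is kept for the slower branches, so a fast consumer next to a slow one costs about as much memory as materializing the data:

```python
from itertools import tee

a, b = tee(range(5))
assert list(a) == [0, 1, 2, 3, 4]  # a races ahead; tee buffers all
                                   # five values internally for b
assert list(b) == [0, 1, 2, 3, 4]  # b replays the buffered values
```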

I don't think we can salvage the async/await version in your question. The arbitrary functions in fxns would have to consume the iterators asynchronously, releasing control flow after (roughly) each item they pop off and process. But async and await are cooperative: we can't force an arbitrary function f to await inside its loop (that's why we get the TypeError). Your threading solution does work, because the VM pre-emptively puts threads to sleep at points within their loops, which gives the other functions the opportunity to run.

Keep in mind there's a difference between simultaneous and concurrent. When I said a sequential round-robin of the functions would be enough, I meant it in this sense: let one of them consume an item, then let the next one consume an item. There is no need for the functions to run simultaneously. In fact, your working threading example doesn't run anything simultaneously on the CPython VM (IronPython and Jython may run multiple threading.Threads simultaneously, but on CPython only one runs at a time).
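Since threading is the route that works, the per-element Event rendezvous from the question's implementation can be traded for bounded queues: each function consumes its own queue-backed iterator, memory is capped at maxsize pending items per function, and the locking is amortized inside queue.Queue. A sketch under those assumptions (fanout, _SENTINEL, and maxsize are made-up names; a function that stops iterating early would leave its queue filling up and stall the producer, which this sketch doesn't handle):

```python
from queue import Queue
from threading import Thread

_SENTINEL = object()  # marks the end of the stream

def fanout(data, fxns, maxsize=64):
    """Feed one pass over `data` to every function in `fxns`."""
    queues = [Queue(maxsize) for _ in fxns]
    results = [None] * len(fxns)

    def consume(i, f, q):
        def it():
            # A queue-backed view that looks like a normal iterator.
            while (v := q.get()) is not _SENTINEL:
                yield v
        results[i] = f(it())

    threads = [Thread(target=consume, args=(i, f, q))
               for i, (f, q) in enumerate(zip(fxns, queues))]
    for t in threads:
        t.start()
    for value in data:       # a single pass over the source
        for q in queues:
            q.put(value)     # blocks when a queue is full, capping
                             # the memory held per consumer
    for q in queues:
        q.put(_SENTINEL)
    for t in threads:
        t.join()
    return tuple(results)

print(fanout(range(100), (min, max, sum)))  # (0, 99, 4950)
```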

Chris Wesseling
  • This stores data. The point is not to store data. – YoungCoder5 May 08 '21 at 04:02
  • It does duplicate the iterator; however, the functions still need to be called simultaneously to avoid storing data. – YoungCoder5 May 08 '21 at 04:07
  • It only stores n copies of 1 value at the time, where n is the number of functions, if you roundrobin the functions. You don't have to call the functions simultaneously, just sequentially. – Chris Wesseling May 08 '21 at 05:57
  • We can update the answer to do that. I first need to caffeinate and get to a keyboard. – Chris Wesseling May 08 '21 at 06:02
  • Note to self, don't check your SO inbox when you just wake up; it makes you overestimate the power of caffeine. – Chris Wesseling May 08 '21 at 10:50
  • I don't follow. I want each function to get the exact same sequence. Calling the functions sequentially spends the generator on the first function, and the functions thereafter cannot get those same values without storing them. – YoungCoder5 May 08 '21 at 15:13
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/232120/discussion-between-youngcoder5-and-chris-wesseling). – YoungCoder5 May 08 '21 at 16:53