
The Scipy minimization function (just to use as an example) has the option of adding a callback function that is called at each step. So I can do something like,

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)

Is there a way to use the callback function to create a generator version of fmin, so that I could do,

for x in my_fmin(func,x0):
    print x

It seems like it might be possible with some combination of yields and sends, but I can't quite think of anything.

Eric O. Lebigot
marius
  • I think you'll have to use `multithreading` for this, as you'll have to have an output queue and a constantly yielding generator both running at the same time. – Blender Apr 01 '12 at 22:05
  • I don't think it's possible. Somewhere in `fmin`, the call to `my_callback` expects a simple function that returns a value. So anything you send has to respect that interface. Unless I'm missing something, the opportunity to turn it into a generator lies in the code that calls the function. – aaronasterling Apr 01 '12 at 22:07
  • 1
    This makes me think of Channels in Stackless Python and Go. – lunixbochs Apr 01 '12 at 22:14
  • 2
    This makes me think of call/cc. – machine yearning Apr 01 '12 at 22:20
  • Of course, in (almost) any specific case you could also copy [the source](https://github.com/scipy/scipy/blob/master/scipy/optimize/optimize.py#L210) and change [the line that does the callback](https://github.com/scipy/scipy/blob/master/scipy/optimize/optimize.py#L446) to a `yield`. – Danica Apr 01 '12 at 22:33
  • by "the function" in my previous response, I of course mean `my_callback` – aaronasterling Apr 01 '12 at 22:36
  • @Dougal: that breaks notions of code reuse nastily. That, naturally, makes it Not a Good Idea. Now, if you resort instead to bytecode modification at runtime... ;-) – Chris Morgan Apr 02 '12 at 01:21
  • @ChrisMorgan Obviously I'm not advocating that solution in general. But if you really need it and only need it for one function, it might make more sense than resorting to multithreading. – Danica Apr 02 '12 at 02:19

7 Answers


As pointed out in the comments, you could do it in a new thread, using Queue. The drawback is that you'd still need some way to access the final result (what fmin returns at the end). My example below uses an optional callback to do something with it (another option would be to just yield it too, though your calling code would have to differentiate between iteration results and final results):

from thread import start_new_thread
from Queue import Queue
import scipy.optimize

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
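
A minimal usage sketch (the objective function and starting point below are made up for illustration; end_callback just captures whatever fmin returns):

final = []
for x in my_fmin(lambda v: (v - 3)**2, 0.5, end_callback=final.append):
    print x   # one intermediate parameter estimate per fmin iteration
# note: end_callback runs on the worker thread, so `final` is filled shortly
# after the loop finishes, not necessarily before the next line of your code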

Update: to block the execution of the next iteration until the consumer has finished processing the last one, it's also necessary to use task_done and join.

    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start

Note that maxsize=1 is not necessary, since no new item will be added to the queue until the last one is consumed.

Update 2: Also note that, unless all items are eventually retrieved by this generator, the created thread will deadlock (it will block forever and its resources will never be released). The producer thread is waiting on the queue, and since it stores a reference to that queue, the queue will never be reclaimed by the gc even if the consumer is. The queue will then become unreachable, so nobody will be able to release the lock.

A clean solution for that is unknown, if one is possible at all (it would depend on the particular function used in place of fmin). A workaround can be made using timeout, having the producer raise an exception if put blocks for too long:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done()                   # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start
mgibsonbr
  • Same as @Winston Ewert's answer. This evaluates callback until the fmin function returns. It will force the evaluation of the callback for every element before the generator starts to yield. – brice Apr 01 '12 at 22:37
  • 2
    Very nice. To resolve @brice's point, use `Queue(maxsize=1)` and `q.put(x,block=True)`. I can't see any issues with it otherwise. – marius Apr 01 '12 at 22:40
  • Beat me to it, @marius. See my answer. – brice Apr 01 '12 at 22:43
  • Sorry, but I don't understand why it would force the evaluation of every element. It could add two or more results to the queue before one is consumed, sure, but why would it prevent them from being consumed? (Yours and marius' suggestions are the right way to ensure that no new value is produced before the last one is consumed, with that I agree, but is it really necessary?) – mgibsonbr Apr 01 '12 at 22:49
  • @mgibsonbr It's only necessary in that that's what my question was ;) I wanted something that exactly mimicks the "callback" functionality, i.e. the evaluation goes fmin step -> callback code -> fmin step -> callback code -> etc... but is syntactically a generator expression. Without the `maxsize=1` the interspersion of fmin steps and callback code is random. – marius Apr 01 '12 at 22:57
  • 1
    In that case, `maxsize=1` is not enough, since fmin will continue processing the next iteration while the first item is consumed (since it didn't block in the first put; it will only block **after** the second iteration is finished and it's trying to put the results in the queue). See my updated answer. – mgibsonbr Apr 02 '12 at 01:16
  • I would want `next` renamed; `next` is a builtin for iteration. – Chris Morgan Apr 02 '12 at 01:24
  • Done, thanks for the tip! Also, updated the answer again, this time concerning the potential deadlock. – mgibsonbr Apr 02 '12 at 01:46
  • 1
    At least with Python 2.6, a `q.task_done()` call needs to be made for both calls to `q.get`, not just after the dummy get. – Mr Fooz May 29 '13 at 17:43
  • _"A clean solution for that is unknown"_ - does [my answer](https://stackoverflow.com/a/57236093/102441) solve that problem? – Eric Jul 27 '19 at 21:15

Generator as coroutine (no threading)

Let's have a FakeFtp class with a retrbinary function that uses a callback, calling it with each successfully read chunk of data:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)

Using a simple callback function has the disadvantage that it is called repeatedly and cannot easily keep context between calls.
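
For example, a plain callback that wants to number the chunks it receives has to keep that counter somewhere outside itself; a small sketch (the dict is just one way to hold the state):

counter = {"n": 0}  # state must live outside the callback

def cb(chunk):
    counter["n"] += 1
    print("chunk %d: %s" % (counter["n"], chunk))

FakeFtp().retrbinary("RETR binary", cb)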

The following code defines a process_chunks generator, which is able to receive chunks of data one by one and process them. In contrast to a simple callback, here we are able to keep all the processing within one function without losing context.

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")

To see the code in action, put the FakeFtp class, the code shown above and the following line:

main()

into one file and call it:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE

How it works

processed = [] is here just to show that the generator process_chunks has no problem cooperating with its external context. Everything is wrapped into def main(): to prove there is no need to use global variables.

def process_chunks() is the core of the solution. It might have one-shot input parameters (not used here), but the main point, where it receives input, is each yield line, which returns whatever anyone sends via .send(data) into an instance of this generator. One can call coroutine.send(chunk) directly, but in this example it is done via a callback referring to coroutine.send.

Note that in a real solution there is no problem having multiple yields in the code; they are processed one by one. This might be used e.g. to read (and ignore) the header of a CSV file and then continue processing the data records.
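
A sketch of that idea (the CSV handling below is simplified and only illustrative): a coroutine with two yield points can swallow the header line first and then handle every data row, driven exactly like process_chunks above via coroutine.send:

def process_csv_rows():
    header = yield            # first chunk: the header line, read and ignored
    print("skipping header:", header)
    while True:
        try:
            row = yield       # every further chunk: one data line
        except GeneratorExit:
            return
        print("data row:", row.split(","))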

We could instantiate and use the generator as follows:

coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`

# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()

The real code uses the contextlib closing context manager to ensure that coroutine.close() is always called.

Conclusions

This solution does not provide a traditional sort of iterator to consume data from "outside". On the other hand, we are able to:

  • use the generator "from inside"
  • keep all iterative processing within one function without being interrupted between callbacks
  • optionally use external context
  • provide usable results to outside
  • all this can be done without using threading

Credits: The solution is heavily inspired by SO answer Python FTP “chunk” iterator (without loading entire file into memory) written by user2357112

Tommy
Jan Vlcinsky
  • Great answer, thanks. If you explicitly defined a context manager you could call coroutine.next() in it, which would be worth it, right? – jwg Feb 16 '18 at 21:47
  • 1
    This post is very inspiring for the usage of coroutine. But what puzzling me is that dose the function `main` has any different with the following one? `def main(): processed = []; ftp.retrbinary("RETR binary", cb=processed.append); return processed` I don't know if I have misunderstood your answer, but I think the key point of the question is that "the function should be able to process infinite times of callback without exploding the memory, just like a stream or pipe". I think that is why we want to use some `yield`, but obviously, the list `processed` destroyed the plan... – henry zhu Aug 14 '19 at 16:33
  • @henryzhu your shortened `main` would work, but not as an example of using a generator. The `processed` list is just to prove what we have processed; it can be replaced by writing the data to a file or other stream, and then it would handle an infinite amount of items/data. The question asked to rewrite a callback into a generator, so I did that and kept the rest short (thus using the `processed` list, not an output stream). – Jan Vlcinsky Aug 16 '19 at 07:48
  • 2
    @JanVlcinsky Oh, yes, I see. So I think the purpose of this answer is mainly for the usage of generator, not for responding the questioner's asking: `for x in my_fmin(func,x0): print x`. After all, if we write the data of `processed` list to a file or other stream, we can't iterate it through `for` loop as shown above. Even though, this is still a great answer. – henry zhu Aug 16 '19 at 08:09

Concept: Use a blocking queue with maxsize=1 and a producer/consumer model.

The callback produces, then the next call to the callback will block on the full queue.

The consumer then yields the value from the queue, tries to get another value, and blocks on read.

The producer is then allowed to push to the queue; rinse and repeat.

Usage:

def dummy(func, arg, callback=None):
  for i in range(100):
    callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
  print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
  print(i)

Can be used as expected for an iterator:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
  print(i)

Iteratorize class:

from thread import start_new_thread
from Queue import Queue

class Iteratorize:
  """ 
  Transforms a function that takes a callback 
  into a lazy iterator (generator).
  """
  def __init__(self, func, ifunc, arg, callback=None):
    self.mfunc=func
    self.ifunc=ifunc
    self.c_callback=callback
    self.q = Queue(maxsize=1)
    self.stored_arg=arg
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
      self.q.put(self.sentinel)
      if self.c_callback:
        self.c_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True,None)
    if obj is self.sentinel:
      raise StopIteration
    else:
      return obj

Can probably do with some cleaning up to accept *args and **kwargs for the function being wrapped and/or the final result callback.
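
A sketch of that generalization (untested, just to show the shape it could take; it assumes the wrapped function accepts its per-step callback as a keyword argument named callback):

class IteratorizeArgs:
  """Like Iteratorize, but forwards arbitrary *args/**kwargs to the wrapped function."""
  def __init__(self, func, *args, **kwargs):
    self.final_callback = kwargs.pop('final_callback', None)  # optional result callback
    self.q = Queue(maxsize=1)
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = func(*args, callback=_callback, **kwargs)
      self.q.put(self.sentinel)
      if self.final_callback:
        self.final_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True, None)
    if obj is self.sentinel:
      raise StopIteration
    return obj

# e.g.: for i in IteratorizeArgs(scipy.optimize.fmin, func, x0): print(i)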

Frits
brice
  • 1
    +1 for generalizing the code for any function, but for the sake of completeness, please see my updated answer. `maxsize=1` is not enough, it's better to use `Queue.join` and `Queue.task_done` if you want to block the producer until the consumer was done with it. (and if you **dont't** want that, what's the point of the maxsize anyway?) And I reiterate my comment on Winston Ewert's answer: there's no way to cleanly exit the function - the `take` example would block the thread forever, never releasing the resources associated with it. For that problem, unfortunatly, I know no easy solution. – mgibsonbr Apr 02 '12 at 01:31
  • You're right! It's actually more correct to use Queue.join too! It will prevent the callback from being called a second time before blocking, which leads to the correct behaviour when the underlying function has side effects. +1 nice catch. – brice Apr 02 '12 at 07:25

How about

data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
    print line

If not, what exactly do you want to do with the generator's data?

Winston Ewert
  • 1
    I think he asked the question as a general point: how do you convert a callbacks into generators, rather than asking for the particular case. – brice Apr 01 '12 at 22:22
  • 4
    This is right, but I should have been more clear. The callback version evaluates the callback at each step (which is what I'd like), whereas this does the entire minimization, _then_ calls the callback code on each step. – marius Apr 01 '12 at 22:29
  • @brice the main problem I see of creating an infinite generator from an arbitrary function is how to signal it when you're done with the generator, to stop its execution. In my [answer's](http://stackoverflow.com/a/9968886/520779) example, an option would be to make the queue size 1 and add a timeout to the producer, so it blocks when the consumer is not requesting new values and, after some time, is "killed" by the exception the queue raises. But for a clean exit, you'd need either specific information about that particular function, or that it already has means to interface with generators. – mgibsonbr Apr 01 '12 at 22:44

Solution to handle non-blocking callbacks

The solution using threading and Queue is pretty good: high-performance and cross-platform, and probably the best one.

Here I provide a not-too-bad solution, which is mainly for handling non-blocking callbacks, e.g. ones called from the parent function through threading.Thread(target=callback).start() or in other non-blocking ways.

import pickle
import select
import subprocess

def my_fmin(func, x0):
    # open a process to use as a pipeline
    proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def my_callback(x):
        # x might be any object, not only str, so we use pickle to dump it
        proc.stdin.write(pickle.dumps(x).replace(b'\n', b'\\n') + b'\n')
        proc.stdin.flush()

    from scipy import optimize
    optimize.fmin(func, x0, callback=my_callback)

    # this is meant to handle non-blocking callbacks, e.g. called somewhere 
    # through `threading.Thread(target=callback).start()`
    while select.select([proc.stdout], [], [], 0)[0]:
        yield pickle.loads(proc.stdout.readline()[:-1].replace(b'\\n', b'\n'))

    # close the process
    proc.communicate()

Then you can use the function like this:

# unfortunately, `scipy.optimize.fmin`'s callback is blocking.
# so this example is just for showing how-to.
for x in my_fmin(lambda x: x**2, 3):
    print(x)
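
For reference, a caller whose callback really is non-blocking (the case this answer targets) might look like the sketch below; fake_minimizer is invented for illustration and would be called inside my_fmin in place of optimize.fmin:

import threading

def fake_minimizer(func, x0, callback):
    x = x0
    for _ in range(5):
        x = x - 0.1 * func(x)
        # the callback is fired on its own thread and never waited for
        threading.Thread(target=callback, args=(x,)).start()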

Although this solution seems quite simple and readable, it's not as high-performance as the threading and queue solution, because:

  • Processes are much heavier than threads.
  • Passing data through a pipe instead of through memory is much slower.

Besides, it doesn't work on Windows, because the select module on Windows can only handle sockets, not pipes and other file descriptors.

henry zhu
  • _"Solution using [the python standard library only] is good but not pythonic"_ - citation needed. Your solution only works on posix systems with `cat`, and processes are often more expensive than threads. – Eric Jul 27 '19 at 21:06
  • @Eric Thanks for your corrections, it's very helpful. I have posted my corrected answer, which has clarified the problems. – henry zhu Aug 16 '19 at 11:48

A variant of Frits' answer that:

  • Supports send to choose a return value for the callback
  • Supports throw to choose an exception for the callback
  • Supports close to gracefully shut down
  • Does not compute a queue item until it is requested

The complete code with tests can be found on github

import queue
import threading
import collections.abc

class generator_from_callback(collections.abc.Generator):
    def __init__(self, expr):
        """
        expr: a function that takes a callback
        """ 
        self._expr = expr
        self._done = False
        self._ready_queue = queue.Queue(1)
        self._done_queue = queue.Queue(1)
        self._done_holder = [False]

        # local to avoid reference cycles
        ready_queue = self._ready_queue
        done_queue = self._done_queue
        done_holder = self._done_holder

        def callback(value):
            done_queue.put((False, value))
            cmd, *args = ready_queue.get()
            if cmd == 'close':
                raise GeneratorExit
            elif cmd == 'send':
                return args[0]
            elif cmd == 'throw':
                raise args[0]

        def thread_func():
            try:
                cmd, *args = ready_queue.get()
                if cmd == 'close':
                    raise GeneratorExit
                elif cmd == 'send':
                    if args[0] is not None:
                        raise TypeError("can't send non-None value to a just-started generator")
                elif cmd == 'throw':
                    raise args[0]
                ret = expr(callback)
                raise StopIteration(ret)
            except BaseException as e:
                done_holder[0] = True
                done_queue.put((True, e))
        self._thread = threading.Thread(target=thread_func)
        self._thread.start()

    def __next__(self):
        return self.send(None)

    def send(self, value):
        if self._done_holder[0]:
            raise StopIteration
        self._ready_queue.put(('send', value))
        is_exception, val = self._done_queue.get()
        if is_exception:
            raise val
        else:
            return val

    def throw(self, exc):
        if self._done_holder[0]:
            raise StopIteration
        self._ready_queue.put(('throw', exc))
        is_exception, val = self._done_queue.get()
        if is_exception:
            raise val
        else:
            return val

    def close(self):
        if not self._done_holder[0]:
            self._ready_queue.put(('close',))
        self._thread.join()

    def __del__(self):
        self.close()

Which works as:

In [3]: def callback(f):
   ...:     ret = f(1)
   ...:     print("gave 1, got {}".format(ret))
   ...:     f(2)
   ...:     print("gave 2")
   ...:     f(3)
   ...:

In [4]: i = generator_from_callback(callback)

In [5]: next(i)
Out[5]: 1

In [6]: i.send(4)
gave 1, got 4
Out[6]: 2

In [7]: next(i)
gave 2
Out[7]: 3

In [8]: next(i)
StopIteration

For scipy.optimize.fmin, you would use generator_from_callback(lambda c: scipy.optimize.fmin(func, x0, callback=c))
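
A minimal sketch of that usage (assuming scipy is importable and func/x0 are defined as in the question):

import scipy.optimize

gen = generator_from_callback(lambda c: scipy.optimize.fmin(func, x0, callback=c))
for x in gen:
    print(x)  # one intermediate parameter vector per fmin iteration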

Eric

For a super simple approach...

def callback_to_generator():
    data = []
    method_with_callback(blah, foo, callback=data.append)
    for item in data:
        yield item
  • Yes, this isn't good for large data
  • Yes, this blocks on all items being processed first
  • But it still might be useful for some use cases :)

Also thanks to @winston-ewert as this is just a small variant on his answer :)

Brian Wylie