
This is my code:

import asyncio
import logging
from asyncio import AbstractEventLoop

from aio_pika import connect, IncomingMessage


def test_one(a, b):
    print("test_one", a, b)


class Consumer:
    def __init__(self, url):
        self.url = url

    async def run(self, loop: AbstractEventLoop):
        while True:
            try:
                connection = await connect(self.url, loop=loop)
                connection.add_close_callback(test_one)
                connection.add_close_callback(self.test_two)

                # Creating a channel
                channel = await connection.channel()

                # Declaring queue
                queue = await channel.declare_queue("snapshots")

                logging.info("Started listening")

                # Start consuming from the 'snapshots' queue
                await queue.consume(self.on_message, no_ack=True)
                break
            except:
                logging.error("Could not connect")
            finally:
                await asyncio.sleep(1)

    def on_message(self, message: IncomingMessage):
        print(message.body)

    def test_two(self, a, b):
        print("closed", a, b)

My problem is that when the connection is closed, only the module-level function test_one is called; the method test_two on the class is never called, and I don't understand why. I tried registering only test_two, and I tried removing its parameters, but the result was the same. I'm out of ideas. What am I doing wrong?

By the way, self.on_message does work as a callback.

DazDylz
  • I'm struggling with a similar problem. If I call a class function in Main, it works. If I call a static function in a module, it works. But if I call a class function in a module, it doesn't. Did you ever find a solution to your problem? – VoteCoffee Jan 11 '21 at 13:09
  • @VoteCoffee Hi there, no, I didn't find a solution and gave up. I later decided to try again in another language. – DazDylz Jan 13 '21 at 06:33
  • Maybe try lambda function? – DazDylz Jan 13 '21 at 06:34
  • It turns out that the API I was using was creating a weak reference to the callback function, and that the underlying callback function handle was being destroyed before the API tried to use it. I worked around it by storing the callback function as a self._callbackfunction variable in the class before passing it to the API. Took a lot of work to figure it out! – VoteCoffee Jan 13 '21 at 12:48

1 Answer


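It's possible that the API keeps only a weak reference to the callback it is passed. Each access to self.test_two creates a new bound-method object, so if nothing else holds on to it, it is garbage-collected right away and the weak reference is already dead by the time the connection closes. A module-level function such as test_one is kept alive by its module, which would explain why it is still called. Try creating a strong reference to the callback before passing it: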

self._cb_func = self.test_two
connection.add_close_callback(self._cb_func)
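
To illustrate the suspected mechanism without a broker, here is a minimal sketch that uses only the standard library. The `WeakSet` named `callbacks` is a hypothetical stand-in for whatever registry the API keeps internally; whether aio_pika actually stores close callbacks this way is an assumption, not something confirmed here.

```python
import gc
import weakref


def module_level(a, b):
    print("module_level", a, b)


class Consumer:
    def on_close(self, a, b):
        print("on_close", a, b)


consumer = Consumer()

# Hypothetical stand-in for a registry that only holds weak references.
callbacks = weakref.WeakSet()

# The module keeps a strong reference to this function, so it survives.
callbacks.add(module_level)

# consumer.on_close builds a temporary bound-method object; once this call
# returns, nothing else references it, so it can be collected.
callbacks.add(consumer.on_close)
gc.collect()
survivors_without_ref = {cb.__name__ for cb in callbacks}

# Storing the bound method on the instance keeps it alive.
consumer._cb_func = consumer.on_close
callbacks.add(consumer._cb_func)
gc.collect()
survivors_with_ref = {cb.__name__ for cb in callbacks}

print(sorted(survivors_without_ref))
print(sorted(survivors_with_ref))
```

On CPython, the first set should contain only module_level, and the second should contain both names, which matches the behaviour described in the question.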

The full code:

import asyncio
import logging
from asyncio import AbstractEventLoop

from aio_pika import connect, IncomingMessage


def test_one(a, b):
    print("test_one", a, b)


class Consumer:
    def __init__(self, url):
        self.url = url

    async def run(self, loop: AbstractEventLoop):
        while True:
            try:
                connection = await connect(self.url, loop=loop)
                connection.add_close_callback(test_one)
                self._cb_func = self.test_two
                connection.add_close_callback(self._cb_func)

                # Creating a channel
                channel = await connection.channel()

                # Declaring queue
                queue = await channel.declare_queue("snapshots")

                logging.info("Started listening")

                # Start consuming from the 'snapshots' queue
                await queue.consume(self.on_message, no_ack=True)
                break
            except Exception:
                # Avoid a bare except, which would also swallow task cancellation
                logging.exception("Could not connect")
            finally:
                await asyncio.sleep(1)

    def on_message(self, message: IncomingMessage):
        print(message.body)

    def test_two(self, a, b):
        print("closed", a, b)

If you have a lot of callback functions, an answer to this question shows how to store them in a collection: "using python WeakSet to enable a callback functionality"
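
For several callbacks, one option is to keep them all in a list on the instance. The sketch below is not taken from the linked answer; `FakeConnection`, `_keep`, `attach`, and the two handler names are hypothetical, and `FakeConnection` only imitates an API that holds weak references.

```python
import weakref


class FakeConnection:
    """Hypothetical stand-in for an API that keeps only weak references."""

    def __init__(self):
        self._close_callbacks = weakref.WeakSet()

    def add_close_callback(self, callback):
        self._close_callbacks.add(callback)

    def close(self, reason=None):
        # Call whichever callbacks are still alive.
        for callback in list(self._close_callbacks):
            callback(self, reason)


class Consumer:
    def __init__(self):
        self._callbacks = []  # strong references live as long as the instance
        self.events = []

    def _keep(self, callback):
        # Remember the bound method so it is not garbage-collected.
        self._callbacks.append(callback)
        return callback

    def attach(self, connection):
        connection.add_close_callback(self._keep(self.on_closed))
        connection.add_close_callback(self._keep(self.on_cleanup))

    def on_closed(self, sender, reason):
        self.events.append("closed")

    def on_cleanup(self, sender, reason):
        self.events.append("cleanup")


consumer = Consumer()
connection = FakeConnection()
consumer.attach(connection)
connection.close()
print(sorted(consumer.events))
```

The list ties the lifetime of each bound method to the Consumer instance, so the callbacks stay registered for as long as the consumer itself is alive. The call order is not guaranteed here because the stand-in registry is a set.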

VoteCoffee
  • I didn't test it, since I abandoned the project months ago, but I trust you that it's the right solution :). – DazDylz Jan 16 '21 at 10:49
  • @DazDylz No worries, it was the solution for my issue which brought me to your similar question. Hopefully it helps someone down the line! – VoteCoffee Jan 16 '21 at 13:36