I've got a small flask server which uses click to define a few commands run via cron. My server connects to Rabbitmq via asyncio & asynqp:
class RabbitMQConnection():
def __init__(self):
self.connection = None
self.channel = None
self.exchange = None
@asyncio.coroutine
def connect(self):
self.connection = yield from asynqp.connect(
'rabbitmq-host',
5672,
username=RABBITMQ_USERNAME,
password=RABBITMQ_PASSWORD)
class MessageProcessor(Thread):
def run(self):
loop = asyncio.new_event_loop()
loop.create_task(self._run())
loop.run_forever()
@asyncio.coroutine
def _run(self):
rabbit = RabbitMQConnection()
yield from rabbit.connect()
def init_app(app):
thread = MessageProcessor()
thread.daemon = True
thread.start()
When I run a click command, it loads the flask app (which I want as it includes my DB models) but also starts another connection to rabbitmq. I'd like to be able to check in the above init_app
function if I'm running in the context of a click command or just as a flask server.
Here is an example of a click command definition:
@click.command('my-function')
@with_appcontext
def my_function():
click.echo('this was fun')
def init_app(app):
app.cli.add_command(my_function)