
I have a web app served by Uvicorn. The app implements a GraphQL API with a mutation that starts a long calculation on the server and a query that reports the server's status, in this case how many tasks are running in the background. Here is simplified code that does not work:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications

query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

FIFO = []

async def long_task():
    print('Starting long task...')
    global FIFO
    FIFO = [1, *FIFO]
    # mock long calc with 5sec sleep
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    print('Start to resolve long task...')
    asyncio.create_task(long_task())
    print('Resolve finished!')
    return {}

@query_t.field('ping')
async def resolve_ping(_parent, info):
    return f'FIFO has {len(FIFO)} elements'


def main():
    schema_str = ariadne.gql('''
    type Mutation{
        longTask: longTaskResponse
    }
    type longTaskResponse {
        message: String
    }
    type Query {
        ping: String
    }
    ''')
    schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
    gql_app = ariadne.asgi.GraphQL(schema)
    app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
    uvicorn.run(app,
                host='0.0.0.0',
                port=9002,
                log_level='error')


if __name__ == '__main__':
    main()

After running

$ python main.py

I send a mutation in the GraphQL GUI in the first tab:

mutation longTaskQueue{
  longTask {
    message
  }
}

In the second tab, I try to retrieve the length of the FIFO:

query ping {
  ping
}

It seems that two `long_task` calls can be started, but `ping` waits until every `long_task` has finished. My general question: how can I run heavy code in the background without blocking the GraphQL API?

Maksym Titov
  • When working with async you need to use `asyncio.sleep` instead of `time.sleep` because otherwise it will block the whole process – retnikt Mar 04 '21 at 10:30
  • How can I actually put `long_task()` into another process, or put it to sleep from time to time, so that the API stays available to the client (even though they started a long task)? – Maksym Titov Mar 04 '21 at 11:14
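
To illustrate the first comment, here is a minimal sketch that does not depend on Ariadne or Uvicorn. The helpers `heartbeat` and `measure` are made up for this illustration. It counts how often a second coroutine gets to run while a 0.2 s "task" is in progress, once with `time.sleep` and once with `asyncio.sleep`.

```python
import asyncio
import time


async def heartbeat(log, stop):
    # Appends a tick every time the event loop gets a chance to run this coroutine.
    while not stop.is_set():
        log.append('tick')
        await asyncio.sleep(0.01)


async def measure(sleeper):
    """Count the heartbeat ticks that happen while `sleeper` is running."""
    log, stop = [], asyncio.Event()
    beat = asyncio.create_task(heartbeat(log, stop))
    await asyncio.sleep(0)        # let the heartbeat record its first tick
    before = len(log)
    await sleeper()
    ticks = len(log) - before
    stop.set()
    await beat
    return ticks


async def blocking():
    time.sleep(0.2)               # holds the event loop; no other coroutine can run


async def cooperative():
    await asyncio.sleep(0.2)      # hands control back to the loop while waiting


blocked_ticks = asyncio.run(measure(blocking))
free_ticks = asyncio.run(measure(cooperative))
print(f'ticks during time.sleep: {blocked_ticks}')
print(f'ticks during asyncio.sleep: {free_ticks}')
```

With `time.sleep` the heartbeat never runs during the task, which is the same reason `ping` hangs in the question. `asyncio.sleep` only helps for a mock, though: a real calculation has no await points, so it still has to be moved off the event loop thread.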

1 Answer


After many attempts I got it working: I can now put many tasks in the background and track how many are running, and the API no longer freezes on one long task. The blocking computation runs in a thread pool, and each call gets its own event loop inside a worker thread:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

FIFO = []

async def long_task():
    print('Starting long task...')
    global FIFO
    FIFO = [1, *FIFO]
    # mock long calc with 5sec sleep
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')


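def run(corofn, *args):
    """Run a coroutine function to completion in a fresh event loop.

    Meant to be called from a worker thread, not from the server's loop.
    """
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        loop.close()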


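# One thread pool shared by all requests. Threads share memory with the
# event loop, so changes to FIFO stay visible to `ping`.
EXECUTOR = ThreadPoolExecutor(max_workers=5)


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    loop = asyncio.get_running_loop()
    print('Start to resolve long task...')
    # Fire and forget: schedule the blocking work in the pool without awaiting it.
    loop.run_in_executor(EXECUTOR, run, long_task)
    print('Resolve finished!')
    return {}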

@query_t.field('ping')
async def resolve_ping(_parent, info):
    return f'FIFO has {len(FIFO)} elements'


def main():
    schema_str = ariadne.gql('''
    type Mutation{
        longTask: longTaskResponse
    }
    type longTaskResponse {
        message: String
    }
    type Query {
        ping: String
    }
    ''')
    schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
    gql_app = ariadne.asgi.GraphQL(schema)
    app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
    uvicorn.run(app,
                host='0.0.0.0',
                port=9002,
                log_level='error')


if __name__ == '__main__':
    main()

The solution was inspired by this answer.
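
A possible simplification, sketched outside the web framework: because the work is blocking anyway, it could be written as a plain function and passed to `run_in_executor` directly, without the `run` wrapper and the extra event loop. The names `long_task_sync`, `start_long_task`, `EXECUTOR` and `_running` are hypothetical and not part of the code above. A locked counter stands in for `FIFO`.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

EXECUTOR = ThreadPoolExecutor(max_workers=5)   # hypothetical shared pool
_running = 0                                   # number of tasks in progress
_lock = threading.Lock()                       # protects _running across threads


def long_task_sync(duration=1.0):
    """Blocking work as a plain function, so no extra event loop is needed."""
    global _running
    with _lock:
        _running += 1
    try:
        time.sleep(duration)                   # stand-in for the heavy calculation
    finally:
        with _lock:
            _running -= 1


async def start_long_task():
    # Schedule the work in the pool and return at once with the future.
    loop = asyncio.get_running_loop()
    return loop.run_in_executor(EXECUTOR, long_task_sync)


async def ping():
    return f'{_running} task(s) running'


async def demo():
    futures = [await start_long_task() for _ in range(2)]
    await asyncio.sleep(0.2)                   # give the workers time to start
    during = await ping()                      # answered while the tasks still run
    await asyncio.gather(*futures)
    after = await ping()
    return during, after


during, after = asyncio.run(demo())
print(during)
print(after)
```

This relies on threads, so the counter is shared with the event loop. For CPU-bound pure-Python work a process pool may be the better fit, but then module-level state such as `FIFO` or `_running` is not shared between processes and the count would have to be tracked some other way.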

Maksym Titov