
I want to test some functions that work with asyncpg. If I run one test at a time, everything works fine. But if I run several tests at once, every test except the first crashes with the error `asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress`.

Tests:

@pytest.mark.asyncio
async def test_project_connection(superuser_id, project_id):
    data = element_data_random(project_id)

    element_id = (await resolve_element_create(data=data, user_id=superuser_id))["id"]
    project_elements = (await db_projects_element_ids_get([project_id]))[project_id]

    assert element_id in project_elements


@pytest.mark.asyncio
async def test_project_does_not_exist(superuser_id):
    data = element_data_random(str(uuid.uuid4()))

    with pytest.raises(ObjectWithIdDoesNotExistError):
        await resolve_element_create(data=data, user_id=superuser_id)

All functions that work with the DB use the pool and look like this:

async def <some_db_func>(*args):
    pool = await get_pool()

    await pool.execute(...) # or fetch/fetchrow/fetchval
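Conceptually, `pool.execute()` checks a free connection out, runs the query on it, and returns the connection to the pool. The toy pool below (a hypothetical `ToyPool`, not asyncpg's real implementation) sketches that acquire/run/release cycle without needing a database:

```python
import asyncio

class ToyPool:
    """Toy stand-in (not asyncpg's real implementation) for what
    pool.execute() does: check a free connection out of a queue,
    run the query on it, and put the connection back."""

    def __init__(self, size=2):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f"conn-{i}")

    async def execute(self, query):
        conn = await self._free.get()        # acquire a free connection
        try:
            await asyncio.sleep(0.01)        # pretend to run `query`
            return conn                      # report which connection ran it
        finally:
            self._free.put_nowait(conn)      # release back to the pool

async def main():
    pool = ToyPool(size=2)
    # Two concurrent queries -> two different connections, no conflict.
    used = await asyncio.gather(pool.execute("SELECT 1"),
                                pool.execute("SELECT 2"))
    return sorted(used)

print(asyncio.run(main()))  # → ['conn-0', 'conn-1']
```

With enough free connections, concurrent queries never share one, which is why the pool normally prevents the error above.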

How I get the pool:

db_pool = None


async def get_pool():
    global db_pool

    async def init(con):
        await con.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
        await con.set_type_codec('json', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')

    if not db_pool:
        dockerfiles_dir = os.path.join(src_dir, 'dockerfiles')
        env_path = os.path.join(dockerfiles_dir, 'dev.env')

        try:
            # When code and DB inside docker containers
            host = 'postgres-docker'
            socket.gethostbyname(host)
        except socket.error:
            # When code on localhost, but DB inside docker container
            host = 'localhost'

        load_dotenv(dotenv_path=env_path)

        db_pool = await asyncpg.create_pool(
            database=os.getenv("POSTGRES_DBNAME"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            host=host,
            init=init
        )  

    return db_pool

As far as I understand, when you run a query through the pool, asyncpg acquires a connection and runs the query on that connection, so each query should get its own connection. Yet this error occurs, and it is the one raised when a single connection tries to handle two queries at the same time.
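What the error means can be reproduced without a database. Here is a minimal simulation (the `FakeConnection` class is hypothetical, mimicking asyncpg's one-operation-per-connection rule) showing that two concurrent operations on the same connection must conflict:

```python
import asyncio

class FakeConnection:
    """Hypothetical stand-in mimicking asyncpg's rule that a single
    connection can run only one operation at a time."""

    def __init__(self):
        self._busy = False

    async def execute(self, query):
        if self._busy:
            # Same condition asyncpg reports as InterfaceError:
            # "cannot perform operation: another operation is in progress"
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)   # pretend to talk to the server
            return "done"
        finally:
            self._busy = False

async def main():
    conn = FakeConnection()
    # Two concurrent queries on the SAME connection: one must fail.
    results = await asyncio.gather(conn.execute("SELECT 1"),
                                   conn.execute("SELECT 2"),
                                   return_exceptions=True)
    return sum(isinstance(r, Exception) for r in results)

print(asyncio.run(main()))  # → 1  (one of the two overlapping calls fails)
```

So seeing this error from pooled code suggests the pool is somehow handing the same connection (or the same internal state) to overlapping operations, which is what the answers below explain.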

Prosto_Oleg
  • It seems they must run sequentially instead of in parallel – Adelin Apr 22 '22 at 08:13
  • @Adelin Yeah, but I need to mark the test as `async def test_...` because I use `await` inside it, which means I need `@pytest.mark.asyncio`; without it the tests are skipped with a warning – Prosto_Oleg Apr 22 '22 at 08:16

2 Answers


The problem happens because each test function creates its own event loop, which confuses the asyncpg pool about which event loop it belongs to.

You can change the event-loop scope from "function" to "session" with the fixture below in conftest.py.

You don't need to run the tests sequentially.


import asyncio
import pytest

@pytest.fixture(scope="session")
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
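To see why a pool created on one loop breaks under per-test loops, here is a small stand-alone demonstration using only asyncio (no database or asyncpg): an awaitable created on one event loop cannot be awaited from another, which is roughly what happens to the pool's internal futures when pytest-asyncio gives every test a fresh loop:

```python
import asyncio

# "loop of the first test" and "loop of the second test"
loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()

async def create_future():
    # Stands in for the pool's internal futures/queues, which are
    # bound to the loop that existed when the pool was created.
    return asyncio.get_running_loop().create_future()

fut = loop_a.run_until_complete(create_future())

async def use_future():
    return await fut  # awaited from loop_b, but attached to loop_a

try:
    loop_b.run_until_complete(use_future())
    outcome = "ok"
except RuntimeError:
    # "Task ... got Future ... attached to a different loop"
    outcome = "different loop"

print(outcome)  # → different loop
loop_a.close()
loop_b.close()
```

A session-scoped `event_loop` fixture makes every test (and the cached pool) share one loop, so this mismatch never arises.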

rumbarum

Okay, thanks to @Adelin I realized that I need to run each asynchronous test synchronously. I'm new to asyncio, so I didn't understand it right away, but eventually found a solution.

It was:

@pytest.mark.asyncio
async def test_...(*args):
    result = await <some_async_func>

    assert result == excepted_result

It become:

def test_...(*args):
    async def inner():
        result = await <some_async_func>

        assert result == excepted_result

    asyncio.get_event_loop().run_until_complete(inner())
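As a side note (not part of the original answer): calling `asyncio.get_event_loop()` outside a running coroutine is deprecated since Python 3.10, so on newer versions the same pattern is usually written with `asyncio.run()`, which creates a fresh event loop per call and closes it afterwards. A sketch with a placeholder coroutine (the `test_sum` name and `asyncio.sleep(0, result=3)` stand in for the real test body):

```python
import asyncio

def test_sum():  # illustrative name, not one of the original tests
    async def inner():
        # asyncio.sleep(0, result=3) stands in for <some_async_func>
        result = await asyncio.sleep(0, result=3)
        assert result == 3

    # asyncio.run() creates a fresh event loop, runs the coroutine,
    # and closes the loop afterwards.
    asyncio.run(inner())

test_sum()
print("passed")
```

Note, though, that this gives every test its own loop again, so a module-level cached pool would still need to be created (or recreated) per test; the session-scoped fixture in the other answer avoids that.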
Prosto_Oleg