4

I am not able to run this test, i always have the same error RuntimeError: Event loop is closed

What i need to add to this code?

from motor.motor_asyncio import AsyncIOMotorClient
import pytest
import asyncio

client = AsyncIOMotorClient("mongodb://mongo:mongo@192.168.0.11:27017/admin?retryWrites=false")
db = client['app']
aux = db['users']

async def create_user_db(a: dict):
    x = await aux.insert_one(a)
    return x

@pytest.mark.asyncio
async def test_create():
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(form)
    assert res != None

This is the error

enter image description here

HPringles
  • 1,042
  • 6
  • 15
Cristoff
  • 149
  • 6

1 Answers1

2

In your example, your opening the database during "import" time, but we still have no eventloop. The event loop is created when the test case runs.

You could define your database as fixture and provide it to the testing functions, e.g.:

@pytest.fixture
def client():
    return AsyncIOMotorClient("mongodb://localhost:27017/")


@pytest.fixture
def db(client):
    return client['test']


@pytest.fixture
def collection(db):
    return db['test']


async def create_user_db(collection, a: dict):
    x = await collection.insert_one(a)
    return x



@pytest.mark.asyncio
async def test_create(collection):
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(collection, form)
    assert res != None
erny
  • 1,296
  • 1
  • 13
  • 28