13

Hi, I'm using AsyncIOMotorClient for asynchronous database calls to MongoDB. Below is my code.

xyz.py
async def insertMany(self,collection_name,documents_to_insert):
    try:
        collection=self.database[collection_name]
        document_inserted = await collection.insert_many(documents_to_insert)
        return document_inserted
    except Exception:
        raise

def insertManyFn(self,collection_name,documents_to_insert):
    try:
        loop=asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop1=asyncio.get_event_loop()
        inserted_documents_count = loop1.run_until_complete(self.insertMany(collection_name, documents_to_insert))
        if inserted_documents_count==len(documents_to_insert):
            document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
            loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
    except Exception:
        raise

xyz1.py
t=Timer(10,xyz.insertManyFn,\
                (collection_name,documents_to_insert))
t.start()   

When I run this, I get an exception:

RuntimeError: Task <Task pending coro=<xyz.insertMany() running at <my workspace location>/xyz.py:144> cb=[_run_until_complete_cb() at /usr/lib64/python3.5/asyncio/base_events.py:164]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib64/python3.5/asyncio/futures.py:431]> attached to a different loop

In the above program, insertManyFn is called after 10 seconds and performs the insert operation. But when it makes the first call to insertMany, I get the exception shown above.

A. Jesse Jiryu Davis
AbhishekSinghNegi

4 Answers

22

I still want my MotorClient to be at the top level of the module, so this is what I do: I patch MotorClient.get_io_loop to always return the current loop.

import asyncio
import motor.core

from motor.motor_asyncio import (
    AsyncIOMotorClient as MotorClient,
)

# MongoDB client
client = MotorClient('mongodb://localhost:27017/test')
client.get_io_loop = asyncio.get_running_loop

# The current database ("test")
db = client.get_default_database()


# async context
async def main():
    posts = db.posts
    await posts.insert_one({'title': 'great success!'})


# Run main()
asyncio.run(main())
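
For readers wondering why the patch helps: the error comes from asyncio itself, which refuses to let a task running on one loop await a future that belongs to another loop. Below is a minimal sketch that reproduces the message without Motor. The function name `demo_different_loop_error` is made up for illustration, and the bare future only stands in for whatever Motor creates internally on the loop it captured.

```python
import asyncio


def demo_different_loop_error():
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    # This future is bound to loop_a, much like an object that
    # captured "its" loop when it was created.
    fut = loop_a.create_future()

    async def waiter():
        await fut

    try:
        # The task wrapping waiter() runs on loop_b, so asyncio raises.
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return None


message = demo_different_loop_error()
print(message)
```

Making every component look up the running loop, as the patch above does, avoids this mismatch.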
kolypto
  • Thanks, I was getting the same error as the OP, and adding `client.get_io_loop = asyncio.get_running_loop` helped, but I don't understand why. – y_159 Sep 12 '21 at 12:24
  • @y_159 apparently, an application can initialize several loops. However, if your running loop is different from the one motor has created, you're in trouble :) This hack makes sure that everybody uses the same loop – kolypto Sep 17 '21 at 11:07
  • Wow, great patch, thanks! The question is why motor starts another loop inside Docker; I get this exception with Docker but not without it. – Mauricio Maroto Sep 27 '21 at 21:14
  • This line did the magic for me: `client.get_io_loop = asyncio.get_running_loop`. Exactly what I was missing! – Agey Jun 01 '22 at 11:23
8

According to the documentation, AsyncIOMotorClient should be passed an io_loop if you don't use the default event loop. Try creating the client after you create your event loop:

loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = AsyncIOMotorClient(io_loop=loop)
Udi
1

I've modified the code and it's working.

def insertManyFn(self,loop,collection_name,documents_to_insert):
    try:
        inserted_documents_count = loop.run_until_complete(self.insertMany(loop,collection_name, documents_to_insert))
        if len(inserted_documents_count)==len(documents_to_insert):
            document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
            loop.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
    except Exception:
        raise

loop=asyncio.get_event_loop()   
t=Timer(10,self.xyz.insertManyFn,(loop,collection_name,documents_to_insert))
t.start()

Explanation: I'm using Python's threading.Timer, which creates its own thread to execute a function after a certain time. Inside that thread I was creating and fetching a new event loop, which is not the correct approach. Instead, the event loop should be obtained first and then passed to the timer thread. I guess this is the only reason.
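
As a hedged alternative to calling run_until_complete from the timer thread, the standard library offers `asyncio.run_coroutine_threadsafe`, which hands a coroutine from another thread to a loop that is already running. The sketch below assumes the application has one loop running in the main thread; `fake_insert_many` and `schedule_from_timer` are hypothetical names, and the former merely stands in for `await collection.insert_many(...)`.

```python
import asyncio
import threading


async def fake_insert_many(documents):
    # Hypothetical stand-in for `await collection.insert_many(documents)`.
    await asyncio.sleep(0)
    return len(documents)


def schedule_from_timer(loop, documents, delay, results):
    def on_timer():
        # Runs in the timer thread: submit the coroutine to the loop
        # that lives in the main thread and wait for its result.
        future = asyncio.run_coroutine_threadsafe(
            fake_insert_many(documents), loop
        )
        results.append(future.result(timeout=5))

    timer = threading.Timer(delay, on_timer)
    timer.start()
    return timer


async def main():
    loop = asyncio.get_running_loop()
    results = []
    timer = schedule_from_timer(loop, [{"a": 1}, {"a": 2}], 0.05, results)
    # Keep the loop alive (without blocking it) until the timer thread ends.
    await loop.run_in_executor(None, timer.join)
    return results


inserted = asyncio.run(main())
print(inserted)
```

With this arrangement only one event loop ever exists, so objects created on it are never awaited from a different loop.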

AbhishekSinghNegi
  • see also: http://stackoverflow.com/questions/41399229/using-threading-timer-with-asycnio – Udi Jan 11 '17 at 13:38
  • @Udi Thanks for the help. One more question: my application supports async CRUD operations, so for each operation I'm getting an event loop. Is this approach OK? – AbhishekSinghNegi Jan 13 '17 at 05:23
  • No - your whole application should be running inside one event loop. – Udi Jan 13 '17 at 05:46
  • Your suggestion to use asyncio.ensure_future() instead of the Python timer works fine when we want to return (say, an operation id) after the whole operation (in my case, the insertion) has completed. But what should we do when we have to return first and let the insertion happen in the background? – AbhishekSinghNegi Jan 13 '17 at 08:29
  • `ensure_future` returns immediately. – Udi Jan 13 '17 at 18:57
  • I agree, but when I use ensure_future, control only returns to the calling API after the inserts have executed. Here is the program cycle: API -> Python method, which returns an acknowledgment and schedules the async operation (using ensure_future) -> async insertion method. I only get the acknowledgment after the insertion, but I want to send it to the user before the insertion. – AbhishekSinghNegi Jan 14 '17 at 07:12
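
To illustrate the point discussed in the comments above, here is a small sketch showing that `asyncio.ensure_future` schedules the coroutine and returns at once, so the acknowledgment can be recorded before the insert finishes. It assumes the caller is itself running inside the event loop and that the loop keeps running afterwards; `slow_insert` and `handle_request` are hypothetical names, and the sleep stands in for the database call.

```python
import asyncio


async def slow_insert(log):
    # Hypothetical stand-in for the real database insertion.
    await asyncio.sleep(0.05)
    log.append("inserted")


async def handle_request(log):
    # Schedule the insert on the running loop; this does not wait for it.
    task = asyncio.ensure_future(slow_insert(log))
    log.append("acknowledged")
    return task


async def main():
    log = []
    task = await handle_request(log)
    # Awaited here only so the demo does not exit before the insert runs;
    # a long-lived server loop would simply keep running.
    await task
    return log


events = asyncio.run(main())
print(events)
```

If the calling code instead blocks on run_until_complete for the scheduled work, the acknowledgment cannot be sent until that call returns, which may be what the last comment describes.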
1

The answer from @kolypto saved my night, but if you want to patch framework or ORM code built on top of the Motor client, you need to patch the client class itself. In my case, I use fastapi_contrib MongoDB models with pytest, and I had to patch the AgnosticClient class.

import asyncio

from motor.core import AgnosticClient

AgnosticClient.get_io_loop = asyncio.get_running_loop
Felipe Buccioni