I am trying to write a unit test using mock and pytest-asyncio. I have a normal function that launches an asyncio coroutine using asyncio.run. [Using Python 3.7]

    import asyncio


    async def sample_async(arg2):
        # do something
        proc = await asyncio.create_subprocess_shell(arg2)
        # some more asyncio calls
        rc = proc.returncode
        return rc


    def launcher(arg1, arg2):
        if arg1 == "no_asyncio":
            # do something
            print("No asyncio")
            return 0
        else:
            return_code = asyncio.run(sample_async(arg2))
            # do something
            return return_code

I was able to write a unit test for the asyncio function sample_async, but not for launcher. Here is what I have tried:
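
    from unittest.mock import MagicMock, patch

    import sample  # the module that defines launcher and sample_async


    class AsyncMock(MagicMock):
        # Calling the mock returns a coroutine, so the result can be awaited.
        async def __call__(self, *args, **kwargs):
            return super(AsyncMock, self).__call__(*args, **kwargs)


    @patch("asyncio.create_subprocess_shell", new_callable=AsyncMock)
    def test_launcher(async_shell):
        arg1 = "async"
        arg2 = "/bin/bash ls"
        sample.launcher(arg1, arg2)
        async_shell.assert_called_once()
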
When I run the test with pytest, I keep getting RuntimeError: There is no current event loop in thread 'MainThread'.
I can't use @pytest.mark.asyncio for this test because launcher is a regular function, not an asyncio coroutine. What am I doing wrong here?
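
For reference, here is a self-contained sketch of the kind of synchronous test I am aiming for. It is only an illustration under a few assumptions: that replacing asyncio.create_subprocess_shell with a plain async def function is acceptable, and that no event loop is already running in the thread when asyncio.run is called. The names fake_shell and run_launcher_with_fake_shell are made up for this example, and the "do something" parts of the original functions are left out.

```python
import asyncio
from unittest.mock import MagicMock, patch


async def sample_async(arg2):
    # Trimmed copy of the coroutine from the question.
    proc = await asyncio.create_subprocess_shell(arg2)
    return proc.returncode


def launcher(arg1, arg2):
    # Trimmed copy of the synchronous entry point from the question.
    if arg1 == "no_asyncio":
        return 0
    return asyncio.run(sample_async(arg2))


def run_launcher_with_fake_shell(arg2, returncode=0):
    """Hypothetical helper: call launcher() with the subprocess call faked."""
    fake_proc = MagicMock()
    fake_proc.returncode = returncode
    calls = []

    # A real coroutine function, so `await asyncio.create_subprocess_shell(...)`
    # inside sample_async still receives an awaitable.
    async def fake_shell(cmd, *args, **kwargs):
        calls.append(cmd)
        return fake_proc

    # sample_async looks the function up on the asyncio module at call time,
    # so patching the module attribute is enough here.
    with patch("asyncio.create_subprocess_shell", new=fake_shell):
        rc = launcher("async", arg2)
    return rc, calls


def test_launcher():
    rc, calls = run_launcher_with_fake_shell("/bin/bash ls", returncode=3)
    assert rc == 3
    assert calls == ["/bin/bash ls"]
```

The test itself is a plain function with no @pytest.mark.asyncio marker, since asyncio.run creates and closes its own event loop inside launcher.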