11

I have a project that uses aiohttp and aiobotocore to work with resources in AWS. I am trying to test a class that works with AWS S3, and I am using moto to mock AWS. Mocking works fine with synchronous code (this example is from the moto docs):

import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def save(self):
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')

        model_instance = MyModel('steve', 'is awesome')
        model_instance.save()
        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

        assert body == 'is awesome'

However, after rewriting this to use aiobotocore, mocking no longer works: in my example it connects to the real AWS S3.

import aiobotocore
import asyncio

import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    async def save(self, loop):
        session = aiobotocore.get_session(loop=loop)
        s3 = session.create_client('s3', region_name='us-east-1')
        await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')
        loop = asyncio.get_event_loop()

        model_instance = MyModel('steve', 'is awesome')
        loop.run_until_complete(model_instance.save(loop=loop))
        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

        assert body == 'is awesome'

So my assumption is that moto does not work properly with aiobotocore. How can I effectively mock AWS resources if my source code looks like the second example?

Andrew Svetlov
Belerafon
6 Answers

12

Mocks from moto don't work because they use a synchronous API. However, you can start a moto server and configure aiobotocore to connect to this test server. Take a look at the aiobotocore tests for inspiration.
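As a rough sketch of that idea, the snippet below starts moto's in-process server and points an aiobotocore client at it through endpoint_url. It assumes a recent moto installed with the server extra (which provides ThreadedMotoServer) and aiobotocore 2.x; the port, the dummy credentials, and the helper names save_and_read and roundtrip_against_moto_server are illustrative choices, not part of either library.

```python
import asyncio


async def save_and_read(endpoint_url):
    # Imported lazily so this module loads even without aiobotocore installed.
    from aiobotocore.session import get_session

    session = get_session()
    async with session.create_client(
        "s3",
        region_name="us-east-1",
        endpoint_url=endpoint_url,        # send requests to the local moto server
        aws_access_key_id="testing",      # dummy credentials, nothing reaches AWS
        aws_secret_access_key="testing",
    ) as s3:
        await s3.create_bucket(Bucket="mybucket")
        await s3.put_object(Bucket="mybucket", Key="steve", Body=b"is awesome")
        response = await s3.get_object(Bucket="mybucket", Key="steve")
        # The body is an async stream, so it has to be read with await.
        async with response["Body"] as stream:
            return await stream.read()


def roundtrip_against_moto_server(port=5000):
    # Assumption: moto was installed as moto[server].
    from moto.server import ThreadedMotoServer

    server = ThreadedMotoServer(port=port)
    server.start()
    try:
        return asyncio.run(save_and_read("http://127.0.0.1:{}".format(port)))
    finally:
        server.stop()
```

Calling roundtrip_against_moto_server() from a test and comparing the result with the bytes that were written exercises the async client end to end without touching real AWS.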

Alex Waygood
Andrew Svetlov
3

Using the stubber from AWS (botocore.stub.Stubber) should do the trick. Here is how I did it inside a Tornado app for the S3 read operation:

import io

import aiobotocore
import tornado.testing
from aiobotocore.response import StreamingBody
from botocore.stub import Stubber
from tornado.testing import AsyncTestCase


class RawStream(io.BytesIO):

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def read(self, n):
        return super().read(n)

class S3TestCase(AsyncTestCase):

    def setUp(self):
        super().setUp()
        session = aiobotocore.get_session()
        self.client = session.create_client("s3", region_name="AWS_S3_REGION",
                                    aws_secret_access_key="AWS_SECRET_ACCESS_KEY",
                                    aws_access_key_id="AWS_ACCESS_KEY_ID")

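    @tornado.testing.gen_test
    async def test_read(self):
        # self.binary_content is assumed to be set elsewhere (for example in setUp)
        # to the 128 bytes the stubbed object should return.
        stubber = Stubber(self.client)
        stubber.add_response("get_object",
                          {"Body": StreamingBody(raw_stream=RawStream(self.binary_content), content_length=128),
                          "ContentLength": 128},
                         expected_params={"Bucket": "AWS_S3_BUCKET",
                                          "Key": "filename"})
        stubber.activate()

        # The test must be a coroutine to use await, and the client lives on self.
        response = await self.client.get_object(Bucket="AWS_S3_BUCKET", Key="filename")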

It should be similar for the write operation. I hope this points you in the right direction.

For more info about the stubber: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/stubber.html
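For the write operation, a stubbed put_object call might look like the sketch below. It assumes aiobotocore 2.x, where create_client returns an async context manager, and that botocore's Stubber can be attached to the async client as in the read example above; the ETag value, the dummy credentials, and the function name stubbed_put are made up for illustration.

```python
import asyncio


async def stubbed_put():
    # Lazy imports: aiobotocore and botocore are only needed when this runs.
    from aiobotocore.session import get_session
    from botocore.stub import Stubber

    session = get_session()
    async with session.create_client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id="testing",      # dummy credentials for the stubbed client
        aws_secret_access_key="testing",
    ) as client:
        # Entering the Stubber activates it; leaving the block deactivates it.
        with Stubber(client) as stubber:
            # Queue one canned response; the ETag value is invented.
            stubber.add_response(
                "put_object",
                {"ETag": '"0123456789abcdef"'},
                expected_params={
                    "Bucket": "mybucket",
                    "Key": "steve",
                    "Body": b"is awesome",
                },
            )
            response = await client.put_object(
                Bucket="mybucket", Key="steve", Body=b"is awesome"
            )
            # Raises if the queued response was never consumed.
            stubber.assert_no_pending_responses()
            return response
```

Running asyncio.run(stubbed_put()) should return the canned response without any network traffic, and the expected_params check fails the call if the code under test sends a different bucket, key, or body.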

3

I think Sebastian Brestin's answer should be the accepted one. I'm posting this new answer because some things have changed since it was written: Python 3.8 now supports async test cases, and aioboto3 clients are now context managers.

A minimal example using Python 3.8 would look like this:

from unittest import IsolatedAsyncioTestCase

import aioboto3
from botocore.stub import Stubber

class Test(IsolatedAsyncioTestCase):

    async def asyncSetUp(self):
        self._s3_client = await aioboto3.client('s3').__aenter__()
        self._s3_stub = Stubber(self._s3_client)

    async def asyncTearDown(self):
        await self._s3_client.__aexit__(None, None, None)

    async def test_case(self):
        self._s3_stub.add_response(
            "get_object",
            {"Body": "content"},
            expected_params={"Bucket": "AWS_S3_BUCKET", "Key": "filename"}
        )
        self._s3_stub.activate()

        response = await self._s3_client.get_object(Bucket="AWS_S3_BUCKET", Key="filename")

        # The stubbed response is a dict, so compare its "Body" entry.
        self.assertEqual(response["Body"], "content")

r-or
2

Here's mock_server.py from aiobotocore without pytest:

# Initially from https://raw.githubusercontent.com/aio-libs/aiobotocore/master/tests/mock_server.py

import shutil
import signal
import subprocess as sp
import sys
import time
import requests


_proxy_bypass = {
  "http": None,
  "https": None,
}


def start_service(service_name, host, port):
    moto_svr_path = shutil.which("moto_server")
    args = [sys.executable, moto_svr_path, service_name, "-H", host,
            "-p", str(port)]
    process = sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.DEVNULL)
    url = "http://{host}:{port}".format(host=host, port=port)

    for _ in range(30):
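        if process.poll() is not None:
            # The server exited early; fail instead of returning a dead process.
            raise AssertionError("Can not start service: {}".format(service_name))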

        try:
            # we need to bypass the proxies due to monkeypatches
            requests.get(url, timeout=0.1, proxies=_proxy_bypass)
            break
        except requests.exceptions.RequestException:
            time.sleep(0.1)
    else:
        stop_process(process)
        raise AssertionError("Can not start service: {}".format(service_name))

    return process


def stop_process(process, timeout=20):
    try:
        process.send_signal(signal.SIGTERM)
        process.communicate(timeout=timeout / 2)
    except sp.TimeoutExpired:
        process.kill()
        outs, errors = process.communicate(timeout=timeout / 2)
        exit_code = process.returncode
        msg = "Child process finished {} not in clean way: {} {}" \
            .format(exit_code, outs, errors)
        raise RuntimeError(msg)
Javier
2

We can create an S3 server using moto[server] and then build a pytest fixture from it, similar to what aioboto3 does:

import pytest

# start_service and stop_process are assumed to be the helpers from
# aiobotocore's tests/mock_server.py.


@pytest.fixture(scope='session')
def s3_server():
    host = 'localhost'
    port = 5002
    url = 'http://{host}:{port}'.format(host=host, port=port)
    process = start_service('s3', host, port)
    yield url
    stop_process(process)

and then patch aiobotocore.session.AioSession.create_client so that it returns a client created with endpoint_url=s3_server, as follows:

import aiobotocore.session
from unittest.mock import patch

# Run this inside an async test that receives the s3_server fixture.
async with aiobotocore.session.AioSession().create_client('s3', region_name='us-east-1', endpoint_url=s3_server) as client:
    with patch('aiobotocore.session.AioSession.create_client') as mock:
        mock.return_value = client
        # Test your code
reddy nishanth
  • Good idea but this code needs to be cleaned up. It doesn't work. For example it's aiobotocore.session.AioSession() not aiobotocore.get_session() And endpoint_url not end_point_url. Have to write it here because I can't suggest an edit – Chuque Mar 13 '23 at 17:38
2

Unfortunately this is not a complete answer, but there is a pull request that adds this functionality that has been open for 6 months: https://github.com/aio-libs/aiobotocore/pull/766

When I've dealt with similar problems for asyncio, I have hand-coded "async" wrappers for the sync object.
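One way such a hand-coded wrapper could look is sketched below: every method call on the wrapped synchronous object runs in the default thread pool, so it can be awaited. AsyncWrapper and FakeS3 are hypothetical names used only for this illustration; in a real test the wrapped object might be a boto3 client, which is the kind of client the moto mocks in the question already handle.

```python
import asyncio
import functools


class AsyncWrapper:
    """Expose the methods of a blocking object as coroutines."""

    def __init__(self, sync_obj):
        self._sync_obj = sync_obj

    def __getattr__(self, name):
        attr = getattr(self._sync_obj, name)
        if not callable(attr):
            return attr

        async def call(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # Run the blocking call in the default thread pool executor.
            return await loop.run_in_executor(
                None, functools.partial(attr, *args, **kwargs)
            )

        return call


class FakeS3:
    """Hypothetical in-memory stand-in for a synchronous S3 client."""

    def __init__(self):
        self._objects = {}

    def put_object(self, Bucket, Key, Body):
        self._objects[(Bucket, Key)] = Body

    def get_object(self, Bucket, Key):
        return {"Body": self._objects[(Bucket, Key)]}


async def main():
    s3 = AsyncWrapper(FakeS3())
    await s3.put_object(Bucket="mybucket", Key="steve", Body=b"is awesome")
    response = await s3.get_object(Bucket="mybucket", Key="steve")
    return response["Body"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that the code under test no longer exercises aiobotocore itself, only the call pattern, so this suits tests of application logic rather than tests of the async client.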

Att Righ