
I'm scraping data using asyncio and storing it in a Redis database. My scraper runs fine, but memory utilization on the Linux server keeps increasing until it reaches 100%, at which point the server freezes. I then have to reboot the server manually and restart the script. I'm using two credentials to hit an API endpoint to fetch data as fast as possible.

Here is the sample code:

from asyncio import tasks
from datetime import datetime, timedelta
from multiprocessing import Semaphore
from socket import timeout
import time
import asyncio
from aiohttp import ClientSession
from requests.exceptions import HTTPError
import config
import json
import pandas as pd
from loguru import logger
import pytz
import aioredis
from redis import Redis

RESULTS = []
result_dict = {}


redis = Redis(
    host="host",
    port=6379,
    decode_responses=True,
    # ssl=True,
    username="default",
    password="password",
)


async def get(url, session):
    try:
        response = await session.request(method="GET", url=url, timeout=1)
    except Exception as err:
        response = await session.request(method="GET", url=url, timeout=3)
    pokemon = await response.json()
    return pokemon["name"]


async def run_program(url, session, semaphore):
    async with semaphore:
        try:
            pokemon_name = await get(url, session)
            await publish(pokemon_name)
        except:
            pass


async def main():
    header_dict = {
        "header1": {
            # Request headers
            # "API-Key-1": config.PRIMARY_API_KEY,
            "Cache-Control": "no-cache",
        },
        "header2": {
            # "API-Key-2": config.SECONDARY_API_KEY,
            "Cache-Control": "no-cache",
        },
    }
    semaphore = asyncio.BoundedSemaphore(20)
    tasks = []
    for key, value in header_dict.items():
        # logger.info(value)
        async with ClientSession(headers=value) as session:
            for i in range(0, 5):
                URLS = f"https://pokeapi.co/api/v2/pokemon/{i}"
                tasks.append(
                    asyncio.ensure_future(run_program(URLS, session, semaphore))
                )
            await asyncio.gather(*tasks)


async def publish(data):
    if not data.empty:
        try:
            keyName = "channelName"
            value = data
            redis.set(keyName, value)
            print("inserting")
        except:
            pass
    else:
        pass


while True:
    try:
        asyncio.run(main(), debug=True)
    except Exception as e:
        time.sleep(1)
        asyncio.run(main(), debug=True)

I want to know why memory consumption is increasing and how to stop it.

Here is the image of memory utilization in percent over time. There is no other script running on the same Linux server except this one.


1 Answer


There are several possible causes of the memory leak here.

  • You're connecting to Redis and never closing the connection.
  • Setting timeout=1 will very likely raise timeout exceptions, which can be the main cause of the memory leak (see: Python not catching MemoryError).
  • A new session is created on every iteration over the headers. There are only two in the example, but the real headers list may be larger.
  • The tasks list is never emptied after gather is called, so tasks from earlier iterations are gathered again on every pass. (To confirm which cause dominates in practice, see the diagnostic sketch after this list.)
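
To confirm which of these actually dominates, the standard-library tracemalloc module can report where allocations accumulate between snapshots. This is a diagnostic sketch of my own, not part of the original answer; the interval and the number of reported lines are arbitrary choices:

import asyncio
import tracemalloc

tracemalloc.start()

async def report_memory(interval=60):
    # Take a snapshot, wait, take another, and print the five call sites
    # whose allocations grew the most in between.
    previous = tracemalloc.take_snapshot()
    while True:
        await asyncio.sleep(interval)
        current = tracemalloc.take_snapshot()
        for stat in current.compare_to(previous, "lineno")[:5]:
            print(stat)
        previous = current

Scheduled as a background task next to the scraping coroutines (e.g. with asyncio.create_task(report_memory())), this points directly at the lines that keep allocating, such as an ever-growing tasks list.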

I tried to optimize the code and here is what I got.

import asyncio
import time

from aiohttp import ClientSession
from redis import DataError
from redis import Redis


async def publish(data, redis):
    # `data` is a plain string here, so use a truthiness check; the original
    # `data.empty` would raise AttributeError on a str.
    if data:
        try:
            redis.set("channelName", data)
        except DataError:
            pass


async def run_program(url, session, headers, semaphore, redis):
    async with semaphore:
        try:
            response = await session.request(method="GET", url=url, headers=headers)
            pokemon = await response.json()
            pokemon_name = pokemon.get("name")
            await publish(pokemon_name, redis)
        except Exception:
            # Use `except Exception`, not a bare `except:`, so that
            # asyncio.CancelledError is not silently swallowed.
            pass


async def main():
    header_dict = {
        "header1": {
            # Request headers
            "Cache-Control": "no-cache",
        },
        "header2": {
            "Cache-Control": "no-cache",
        },
    }
    semaphore = asyncio.BoundedSemaphore(20)
    async with ClientSession() as session:
        for headers in header_dict.values():
            with Redis(host="host", port=6379, decode_responses=True, username="default", password="password") as redis:
                await asyncio.gather(*[
                    asyncio.ensure_future(
                        run_program(f"https://pokeapi.co/api/v2/pokemon/{i}", session, headers, semaphore, redis)
                    ) for i in range(5)
                ])


while True:
    try:
        asyncio.run(main(), debug=True)
    except Exception:
        # Sleep and let the loop retry; calling asyncio.run() again inside
        # the except block would crash the program on a second failure.
        time.sleep(1)

All these changes should optimize memory usage.
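
One further suggestion of my own, beyond the original answer: the while True wrapper tears down and recreates the event loop (and, inside main(), the ClientSession) on every pass. Keeping a single long-lived loop and session avoids that churn. A minimal sketch, where scrape_once() is a hypothetical refactor of main() that accepts the session as a parameter:

import asyncio

from aiohttp import ClientSession


async def forever():
    # One session for the lifetime of the program; aiohttp pools
    # connections, so nothing is rebuilt between passes.
    async with ClientSession() as session:
        while True:
            try:
                await scrape_once(session)  # hypothetical refactor of main()
            except Exception:
                pass
            await asyncio.sleep(1)


asyncio.run(forever())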

Artyom Vancyan
    Thanks for the answer, optimizing the code slowed down the memory utilization, but it still slowly rises and eats up the memory. I'm testing a couple of other solutions as well, I'll post if something worked out @Artyom Vancyan – Yadvendar Singh Jun 06 '22 at 14:34
  • Try also using DevOps solutions for infinite running instead of `while True`. Always welcome :) – Artyom Vancyan Jun 06 '22 at 14:43
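
For reference, the "DevOps solution" mentioned in the last comment could be a process supervisor such as systemd restarting the script whenever it exits, which removes the need for the while True wrapper in Python. A sketch of such a unit file, with hypothetical paths:

[Unit]
Description=Pokemon scraper

[Service]
ExecStart=/usr/bin/python3 /opt/scraper/scraper.py
Restart=always
RestartSec=1
# Optionally cap memory so the host never freezes again (cgroup v2).
MemoryMax=1G

[Install]
WantedBy=multi-user.target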