0

I have list of csv files which are 200+mbs right now but in future it will grow and when I read them using csv module my system gets hanged so I am looking for optimizing the approach I have right now in order to read the csv files in chunks so it won't load all in the memory at once.

Here is the code:

import asyncio
import aiohttp
import csv
import ast
import logging
from glob import glob

logging.basicConfig(
    filename="post_script.log", format="%(asctime)s %(message)s", filemode="w"
)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

csv.field_size_limit(100000000)

headers = {"Accept": "*/*", "Content-Type": "application/json"}


async def post_func(session, file, store_id):
    data = {}
    with open(file, "r") as f_in:
        reader = csv.DictReader(f_in)
        for row in reader:
            data.setdefault((row["LabebStoreId"], row["catalog_uuid"]), []).append(row)
    for payload_no, v in enumerate(data.values(), 1):
        if len(v) == 1:
            payload = {
                "row": {
                    "LabebStoreId": v[0]["LabebStoreId"],
                    "catalog_uuid": v[0]["catalog_uuid"],
                    "lang": v[0]["lang"],
                    "cat_0_name": v[0]["cat_0_name"],
                    "cat_1_name": v[0]["cat_1_name"],
                    "cat_2_name": v[0]["cat_2_name"],
                    "cat_3_name": v[0]["cat_3_name"],
                    "catalogname": v[0]["catalogname"],
                    "description": v[0]["description"],
                    "properties": v[0]["properties"],
                    "price": v[0]["price"],
                    "price_before_discount": v[0]["price_before_discount"],
                    "externallink": v[0]["externallink"],
                    "Rating": v[0]["Rating"],
                    "delivery": v[0]["delivery"],
                    "discount": v[0]["discount"],
                    "instock": v[0]["instock"],
                    "images": ast.literal_eval(v[0]["encoded_images"]),
                }
            }

            response = await session.post(
                f"http://crawlerapi.labeb.com/api/PCCrawler/Crawl?StoreId={store_id}",
                json=payload,
            )
            logger.debug(f"Posting to {response.url}")
            logger.debug(f"""row: {payload["row"]["externallink"]}""")
            logger.info(await response.text())

        else:
            payload = {
                "row": {
                    "LabebStoreId": v[0]["LabebStoreId"],
                    "catalog_uuid": v[0]["catalog_uuid"],
                    "lang": v[0]["lang"],
                    "cat_0_name": v[0]["cat_0_name"],
                    "cat_1_name": v[0]["cat_1_name"],
                    "cat_2_name": v[0]["cat_2_name"],
                    "cat_3_name": v[0]["cat_3_name"],
                    "catalogname": v[0]["catalogname"],
                    "description": v[0]["description"],
                    "properties": v[0]["properties"],
                    "price": v[0]["price"],
                    "price_before_discount": v[0]["price_before_discount"],
                    "externallink": v[0]["externallink"],
                    "Rating": v[0]["Rating"],
                    "delivery": v[0]["delivery"],
                    "discount": v[0]["discount"],
                    "instock": v[0]["instock"],
                    "images": ast.literal_eval(v[0]["encoded_images"]),
                },
                "nextRow": {
                    "LabebStoreId": v[1]["LabebStoreId"],
                    "catalog_uuid": v[1]["catalog_uuid"],
                    "lang": v[1]["lang"],
                    "cat_0_name": v[1]["cat_0_name"],
                    "cat_1_name": v[1]["cat_1_name"],
                    "cat_2_name": v[1]["cat_2_name"],
                    "cat_3_name": v[1]["cat_3_name"],
                    "catalogname": v[1]["catalogname"],
                    "description": v[1]["description"],
                    "properties": v[1]["properties"],
                    "price": v[1]["price"],
                    "price_before_discount": v[1]["price_before_discount"],
                    "externallink": v[1]["externallink"],
                    "Rating": v[1]["Rating"],
                    "delivery": v[1]["delivery"],
                    "discount": v[1]["discount"],
                    "instock": v[1]["instock"],
                    "images": ast.literal_eval(v[1]["encoded_images"]),
                },
            }

            response = await session.post(
                f"http://crawlerapi.labeb.com/api/PCCrawler/Crawl?StoreId={store_id}",
                json=payload,
            )
            logger.debug(f"Posting to {response.url}")
            logger.debug(f"""row: {payload["row"]["externallink"]}""")
            logger.debug(f"""nextRow: {payload["row"]["externallink"]}""")
            logger.info(await response.text())

        logger.info("-" * 80)


async def main():
    try:
        async with aiohttp.ClientSession(headers=headers) as session:
            logging.info(f"---starting new---")
            post_tasks = []
            files = glob("*.csv")
            for file in files:
                store_id = file.split("_")[0]
                post_tasks.append(
                    asyncio.create_task(post_func(session, file, store_id))
                )
            await asyncio.gather(*post_tasks)
            logging.info(f"---finished---")
    except Exception as e:
        logger.error(e)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Let me explain what the code is about and what it is doing. Basically it is reading the csv file getting all the rows and posting it to the api through HTTP POST request. Can anyone please guide me on how to read all the csv files efficiently? Thank you!

X-somtheing
  • 219
  • 2
  • 10
  • have you tried using apache arrow's csv reader? – Anirudh Dec 11 '22 at 13:56
  • No. Does apache arrow's csv reader supports DictReader ability? – X-somtheing Dec 11 '22 at 14:04
  • Try this - https://stackoverflow.com/questions/68555085/how-can-i-chunk-through-a-csv-using-arrow/68563617#68563617. – Anirudh Dec 11 '22 at 14:06
  • I am trying `pyarrow` but it is giving me this error `pyarrow.lib.ArrowInvalid: In CSV column #11: CSV conversion error to null: invalid value '4.18'`. I am using pyarrow first time so no idea what does that mean may be it is not accepting the float values or something else. – X-somtheing Dec 11 '22 at 14:22
  • With `open_csv()`, the data types of the columns should be consistent. `open_csv()` infers the data types on the first chunk of data being read, and if the type changes after that in one of your columns, you will run into errors. – Anirudh Dec 11 '22 at 14:43
  • yeah I have define the data types of the columns like this `convert_options.column_types = {'price_before_discount': pa.string(), 'discount': pa.string(), 'price': pa.string()}` and that error was gone – X-somtheing Dec 11 '22 at 14:49
  • Now I am having this error `TypeError: unhashable type: 'pyarrow.lib.Int64Array'` when I try to group the columns like this `data.setdefault((nex_chunk[0], nex_chunk[1]), []).append(nex_chunk[idx])`. Can you help me out here? – X-somtheing Dec 11 '22 at 15:14
  • Have you considered using multithreading with a limited number of concurrent threads in order to better manage your memory availability? – DarkKnight Dec 11 '22 at 15:24
  • No not yet. Do you have any working example using mutlithreading? – X-somtheing Dec 11 '22 at 15:29
  • @X-somtheing I have many examples and there are a multitude of other examples out there on the internet. Your challenge is to find something that can be made relevant to your use-case – DarkKnight Dec 11 '22 at 15:52
  • Can you please share some of the examples so I can take a look and get some ideas to implement that fits my use-case? – X-somtheing Dec 11 '22 at 15:55
  • 1) I would start this without `async` to make sure that is not the issue. 2) Does the API support bulk loading of data, instead of doing one row at a time? 3) I would split out the the CSV reading from `post_func()` 4) What is the purpose of `for payload_no, v in enumerate(data.values(), 1)` as `payload_no` is never used? – Adrian Klaver Dec 11 '22 at 17:55

0 Answers0