I am scraping a large amount of data from a website, and the problem is that inserting the rows into the database one by one is taking too much time. I am looking for a smart way to bulk insert or batch insert into the database so it won't take forever to push everything. I am using SQLAlchemy 1.4 ORM and the Scrapy framework.

models:

from sqlalchemy import Column, Date, String, Integer, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from . import settings

engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
DeclarativeBase = declarative_base()


class Olx_Eg(DeclarativeBase):
    """
    Defines the property listing model
    """

    __tablename__ = "olx_egypt"
    _id = Column(Integer, primary_key=True)
    URL = Column("URL", String)
    Breadcrumb = Column("Breadcrumb", String)
    Price = Column("Price", String)
    Title = Column("Title", String)
    Type = Column("Type", String)
    Bedrooms = Column("Bedrooms", String)
    Bathrooms = Column("Bathrooms", String)
    Area = Column("Area", String)
    Location = Column("Location", String)
    Compound = Column("Compound", String)
    seller = Column("seller", String)
    Seller_member_since = Column("Seller_member_since", String)
    Seller_phone_number = Column("Seller_phone_number", String)
    Description = Column("Description", String)
    Amenities = Column("Amenities", String)
    Reference = Column("Reference", String)
    Listed_date = Column("Listed_date", String)
    Level = Column("Level", String)
    Payment_option = Column("Payment_option", String)
    Delivery_term = Column("Delivery_term", String)
    Furnished = Column("Furnished", String)
    Delivery_date = Column("Delivery_date", String)
    Down_payment = Column("Down_payment", String)
    Image_url = Column("Image_url", String)
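
The table itself can be created once from this metadata before inserting anything; a one-line sketch using standard SQLAlchemy:

DeclarativeBase.metadata.create_all(engine)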

Here is my scrapy pipeline right now:

from olx_egypt.models import Olx_Eg, session


class OlxEgPipeline:
    def __init__(self):
        """
        Initializes database connection and sessionmaker.
        Creates items table.
        """

    def process_item(self, item, spider):
        """
        Process the item and store to database.
        """
        # session = self.Session()
        instance = session.query(Olx_Eg).filter_by(Reference=item["Reference"]).first()
        if instance:
            return instance
        else:
            olx_item = Olx_Eg(**item)
            session.add(olx_item)

        try:
            session.commit()
        except:
            session.rollback()
            raise
        finally:
            session.close()

        return item
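
For comparison, a batched variant of the same pipeline would accumulate items and commit every N rows instead of once per item. A rough sketch (the BATCH_SIZE value is illustrative, and the per-item duplicate check is omitted for brevity):

from olx_egypt.models import Olx_Eg, session

BATCH_SIZE = 500  # illustrative value


class OlxEgBatchPipeline:
    def __init__(self):
        self.buffer = []

    def process_item(self, item, spider):
        self.buffer.append(Olx_Eg(**item))
        if len(self.buffer) >= BATCH_SIZE:
            self._flush()
        return item

    def close_spider(self, spider):
        self._flush()  # push whatever is left in the buffer
        session.close()

    def _flush(self):
        if not self.buffer:
            return
        try:
            session.add_all(self.buffer)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            self.buffer = []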

I tried creating a list, appending the items to it, and then pushing it to the db when the spider closes:

from olx_egypt.models import Olx_Eg, session

class ExampleScrapyPipeline:

    def __init__(self):

        self.items = []

    def process_item(self, item, spider):
        
        self.items.append(item)

        return item

    def close_spider(self, spider):
       

        try:
            session.bulk_insert_mappings(Olx_Eg, self.items)
            session.commit()

        except Exception as error:
            session.rollback()
            raise

        finally:
            session.close()

but it failed on the session.bulk_insert_mappings(Olx_Eg, self.items) line. Can anyone tell me how I can make a Scrapy pipeline do a bulk or batch insert?
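
One likely cause is that bulk_insert_mappings expects a list of plain dictionaries, while self.items holds Scrapy Item objects. A minimal sketch of close_spider with that conversion, using itemadapter (which Scrapy already depends on):

from itemadapter import ItemAdapter

# inside ExampleScrapyPipeline
def close_spider(self, spider):
    try:
        # bulk_insert_mappings wants plain dicts, not Scrapy Item objects
        mappings = [ItemAdapter(item).asdict() for item in self.items]
        session.bulk_insert_mappings(Olx_Eg, mappings)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()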

dougj

1 Answer

I was actually working on something very similar and have built a pipeline that injects the data using pandas.to_sql. It needs fewer lines of code and is pretty fast, since I have activated method='multi'. If you're uploading to MSSQL, you can also take advantage of fast_executemany=True, as described in this post: Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC.
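
For MSSQL that would look roughly like this (the connection string below is a placeholder):

from sqlalchemy import create_engine

# fast_executemany is supported by the mssql+pyodbc dialect; the DSN is a placeholder
engine = create_engine(
    "mssql+pyodbc://user:password@host:1433/dbname?driver=ODBC+Driver+18+for+SQL+Server",
    fast_executemany=True,
)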

I have tried to make it as general as possible so that it works with different driver names.

Here's an example:

scraper.py

import scrapy
from scrapy_exercises.items import ScrapyExercisesItem
from scrapy.crawler import CrawlerProcess

class SQLTest(scrapy.Spider):
    name = 'SQL'
    start_urls = [f'https://quotes.toscrape.com/page/{i}/' for i in range(1, 11)]

    custom_settings = {
        "FEEDS": {"test": {"format": "csv"}}
    }

    def start_requests(self):
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                callback = self.parse
            )

    def parse(self, response):
        content = response.xpath("//div[@class='col-md-8']//div")
        for items in content:
            table = ScrapyExercisesItem()
            #table._name= items.xpath(".//span//@href").get()
            #table._keyword= items.xpath(".//div[@class = 'tags']//a[1]//text()").get()
            #yield table.returnTable()
            table['name'] = items.xpath(".//span//@href").get()
            table['keyword'] = items.xpath(".//div[@class = 'tags']//a[1]//text()").get()
            return table

items.py

import scrapy

class ScrapyExercisesItem(scrapy.Item):
    name = scrapy.Field()
    keyword = scrapy.Field()

pipelines.py

from sqlalchemy import create_engine, String
import pandas as pd
import pyodbc
import logging
from itemadapter import is_item
from itemadapter import ItemAdapter

logger = logging.getLogger(__name__)

class DataframeSQLPipelineInject:

    def __init__(self, user, passw, host, port, database, table, if_exists, drivername):
        self._user = user
        self._passw = passw
        self._host = host
        self._port = port
        self._database = database
        self.table = table
        self.if_exists = if_exists
        self.drivername = drivername

    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            user = crawler.settings.get('DATABASE')['user'],
            passw = crawler.settings.get('DATABASE')['passw'],
            host = crawler.settings.get('DATABASE')['host'],
            port = crawler.settings.get('DATABASE')['port'],
            database = crawler.settings.get('DATABASE')['database'],
            table = crawler.settings.get('DATABASE')['table'],
            if_exists = crawler.settings.get('DATABASE')['if_exists'],
            drivername = crawler.settings.get('DATABASE')['drivername']
        )

    def open_spider(self, spider):
        self.engine = create_engine(
            f'{self.drivername}://' +  # change this to your required server
            self._user + ':' +
            self._passw + '@' +
            self._host + ':' +
            str(self._port) + '/' +
            self._database,  # + '?driver=ODBC+Driver+18+for+SQL+Server'  # change this to your required driver
            echo=False,
            # connect_args={"timeout": 30},
            pool_pre_ping=True,
            # fast_executemany=True  # add this if using drivername mssql+pyodbc,
            # then remove if_exists=self.if_exists from the to_sql call
        )

        self.conn = self.engine.connect()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self,item, spider):
        if is_item(item):
            table_df = pd.DataFrame([ItemAdapter(item).asdict()])
            print(table_df.dtypes)
            table_df.to_sql(
                self.table,
                con=self.engine,
                method='multi',
                dtype={'name': String(), 'keyword': String()},
                chunksize=2000,
                index=False,
                if_exists=self.if_exists,
            )
        else:
            logger.error(f'You need a dict for item, you have type: {type(item)}')

settings.py:

DATABASE = {
    "user": "usr",
    "passw": "",
    "host": "localhost",
    "port": '5432',
    "database": "scraper",
    'table':'some_table',
    'if_exists':'append',
    'drivername':'postgresql'
}

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

ITEM_PIPELINES = {
    'scrapy_exercises.pipelines.sql_import.DataframeSQLPipelineInject':50
    }

You'll need to use if_exists and set it to 'append' even if you want to create a new table. Because Scrapy is single threaded, the pipeline will create the table on the first item and then append values after each reactor loop.

I hope this helps with your speed problem, although I have not tested it with large amounts of data.

It works on my end.

Update your items.py with this:

class ScrapyExercisesItem(scrapy.Item):
    URL = scrapy.Field()
    Breadcrumb = scrapy.Field()
    Price = scrapy.Field()
    Title = scrapy.Field()
    Type = scrapy.Field()
    Bedrooms = scrapy.Field()
    Bathrooms = scrapy.Field()
    Area = scrapy.Field()
    Location = scrapy.Field()
    Compound = scrapy.Field()
    seller = scrapy.Field()
    Seller_member_since = scrapy.Field()
    Seller_phone_number = scrapy.Field()
    Description = scrapy.Field()
    Amenities = scrapy.Field()
    Reference = scrapy.Field()
    Listed_date = scrapy.Field()
    Level = scrapy.Field()
    Payment_option = scrapy.Field()
    Delivery_term = scrapy.Field()
    Furnished = scrapy.Field()
    Delivery_date = scrapy.Field()
    Down_payment = scrapy.Field()
    Image_url = scrapy.Field()

And in your scraper, remove the following:

item = {}

and replace it with:

from your_path.items import ScrapyExercisesItem
item = ScrapyExercisesItem()

Then return the item instead of yielding it, roughly as in the sketch below. It is working for me, so it should work for you.
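
A sketch of that pattern (the selectors below are placeholders, not the real OLX XPaths):

from your_path.items import ScrapyExercisesItem

# inside your spider
def parse(self, response):
    item = ScrapyExercisesItem()
    item['URL'] = response.url
    item['Title'] = response.xpath('//h1/text()').get()  # placeholder selector
    item['Price'] = response.xpath('//span[@class="price"]/text()').get()  # placeholder selector
    # fill the remaining fields the same way, then:
    return item  # return the item, do not yield it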

joe_bill.dollar
  • I tried your code but it failed on `table_df.to_sql(name='olx_egypt', con=engine,method='multi', chunksize=100, index=False, if_exists = 'append')`. Here is the traceback https://pastebin.com/StzGFwRj – dougj Jul 08 '22 at 23:25
  • You need to amend it to your needs; for example, you're probably not on the same port or drivername. The error just means that you're not listening on port 5432. Alternatively, drop me a link to your scraper and I can check it on my side; perhaps it's AWS blocking it – joe_bill.dollar Jul 09 '22 at 00:07
  • Here is the scraper repo: https://github.com/0xlearner/Olx_Scraper – dougj Jul 09 '22 at 01:02
  • @dougj It seems to work for me; however, because you're scraping loads of items, the scraper is actually fairly slow at getting them, which is probably the issue you were actually experiencing. Although, the `pipeline` I have built seems to upload at the speed it grabs them. Did you update your `items.py`? – joe_bill.dollar Jul 09 '22 at 01:19
  • Yeah, it is working now. Thanks a lot, man. I just want to clear up one more thing: chunk_size is like the length of the items, right? Like when the items reach the chunk_size it will push them to the db, am I understanding it correctly? – dougj Jul 09 '22 at 01:43
  • If you had a dataframe with 1m rows chunk_size just uploads 2000 rows at a time rather than all at once. You could remove it as you aren't scraping that much, it's just that I have had issues in the past not including it when uploading to db. You can give it a try, although I tested the amount uploaded to db compared to a stored output, and they're pretty much at the same rate. Check the docs on [pandas.to_sql](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html) for more info. – joe_bill.dollar Jul 09 '22 at 01:46
  • Why is this same logic not working when I try to run it on a DigitalOcean VPS? It gives the previous error I shared. – dougj Jul 09 '22 at 02:10