2

I am using my own pipeline to store the scraped items in a PostgreSQL database. I expanded the setup a few days ago and I now store the data in three databases. I want the pipeline that inserts the data to be called every 100 items, or to collect the items and insert them in batches of 100.

The reason: I want the inserts to be fast and to put less load on the DB servers.

Mahmoud M. Abdel-Fattah

4 Answers

7

The solution is not that different from Anandhakumar's answer: I created a global list in the settings file, with setter and getter functions for it.

# Buffer for the bulk insertion
products_buffer = []

# Append a product to the list
def add_to_products_buffer(product):
    products_buffer.append(product)

# Get the length of the products list
def get_products_buffer_len():
    return len(products_buffer)

# Get the products list
def get_products_buffer():
    return products_buffer

# Empty the list in place
def empty_products_buffer():
    products_buffer[:] = []

Then I imported it in the pipeline

from project.settings import products_buffer, add_to_products_buffer, get_products_buffer_len, empty_products_buffer, get_products_buffer

I append the item to the list every time the pipeline is called, and when the length of the list reaches 100 I loop over it to prepare the insert queries. The most important part is to commit them all at once: don't commit inside the loop, or you won't gain anything and inserting them all will still take a long time.

def process_item(self, item, spider):
    # Add the item to the buffer
    add_to_products_buffer(item)
    # Check if the buffer has reached 100 items
    if get_products_buffer_len() == 100:
        # Get the list to loop over it
        products_list = get_products_buffer()
        for product in products_list:
            # The insert query for each buffered product
            self.cursor.execute('insert query')
        try:
            # Commit all the insert queries at once
            self.conn.commit()
            # Empty the list
            empty_products_buffer()
        except Exception as e:
            # Handle the error and discard the failed batch
            self.conn.rollback()
    return item

You can also use `executemany` if you don't want to loop.
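For reference, a minimal sketch of the `executemany` variant, assuming the same `self.cursor`/`self.conn` as above; the helper name and the `products(name, price)` columns are only illustrative:

def flush_products_buffer(self):
    # Build one parameter tuple per buffered item (hypothetical columns)
    rows = [(product['name'], product['price']) for product in get_products_buffer()]
    if not rows:
        return
    try:
        # One executemany call and one commit for the whole batch
        self.cursor.executemany(
            "INSERT INTO products (name, price) VALUES (%s, %s)", rows)
        self.conn.commit()
        empty_products_buffer()
    except Exception as e:
        self.conn.rollback()

Note that psycopg2's `executemany` still issues one statement per row under the hood; `psycopg2.extras.execute_values` is usually the faster choice for true multi-row inserts.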

Morad Edwar
  • I quite like the simplicity of this but how do I process the items that don't reach the 100 limit when the crawler ends? –  Feb 03 '16 at 11:46
  • You have to do it manually by inserting what is left after the spider finishes its job (see the close_spider sketch after these comments). – Morad Edwar Feb 03 '16 at 12:41
  • I'm using the close_spider method. It seems to be working. Thanks –  Feb 03 '16 at 13:19
  • do you think this solution could work in scrapy with multiprocessing? It seems that `empty_products_buffer` should be called right after `get_products_buffer`, not after the commit, because otherwise it will also delete items that were added while the commit was happening and were not returned by `get_products_buffer`. – Mpizos Dimitris Feb 24 '21 at 19:41
  • @MoradEdwar - Is it thread safe? It may not work as expected with multiprocessing, as mentioned by Mpizos Dimitris. – Tejas Patel Sep 01 '21 at 02:34
  • Does it need to be global if I just add it in the pipeline file? – Jeroen Vermunt Mar 04 '22 at 17:30
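As suggested in the comments, a minimal sketch of flushing the leftover items (fewer than 100) when the spider closes, reusing the buffer helpers and the pipeline's `self.cursor`/`self.conn` from above:

def close_spider(self, spider):
    # Insert whatever is still buffered when the crawl ends
    for product in get_products_buffer():
        self.cursor.execute('insert query')
    try:
        # One commit for the remaining items
        self.conn.commit()
        empty_products_buffer()
    except Exception as e:
        self.conn.rollback()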
1

I don't know scrapy or whether it has any kind of Queue functionality built in, but maybe you can push your queries onto a standard Python Queue from scrapy and then have a consumer that monitors the queue; as soon as there are 100 items on it, execute them all, which can indeed be done with psycopg2 (see psycopg2: insert multiple rows with one query).

You could do something like

from queue import Queue
from threading import Thread
import time

queryQueue = Queue()

def queryConsumer():
    while True:
        if queryQueue.qsize() >= 100:
            queries = [queryQueue.get() for i in range(100)]
            # execute the 100 queries
        else:
            # avoid busy-waiting while the queue fills up
            time.sleep(0.1)

t = Thread(target=queryConsumer)
t.daemon = True
t.start()

From your scrapy callback you could call

queryQueue.put(myquery)

to push items onto the queue.
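To fill in the "execute the 100 queries" placeholder, one option (a sketch, assuming the queue holds parameter tuples for a hypothetical bar(name, lname) table rather than ready-made SQL strings) is psycopg2's `execute_values`, which sends the whole batch as a single multi-row INSERT:

import psycopg2
from psycopg2.extras import execute_values

# Hypothetical connection; adjust the DSN to your own database
conn = psycopg2.connect("dbname=testdb user=postgres")

def run_batch(rows):
    # rows: the list of (name, lname) tuples taken off the queue
    with conn.cursor() as cur:
        # VALUES %s is expanded into one multi-row INSERT
        execute_values(cur, "INSERT INTO bar (name, lname) VALUES %s", rows)
    conn.commit()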

Dolf Andringa
0

My suggestion is:

You can do this in the spider itself; there is no need to write a pipeline at all.

Create a PostgreSQL connection in the spider using psycopg2:

import psycopg2

connection = psycopg2.connect(database="testdb", user="postgres",
                              password="cohondob", host="127.0.0.1",
                              port="5432")
cursor = connection.cursor()

Create a list inside the parse function and append each scraped item to it.

When it reaches 100 items, insert them into the database and commit.

For example:

x = ({"name": "q", "lname": "55"},
     {"name": "e", "lname": "hh"},
     {"name": "ee", "lname": "hh"})
cursor.executemany("""INSERT INTO bar(name, lname) VALUES (%(name)s, %(lname)s)""", x)
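Putting those pieces together, a minimal sketch of that batching inside a spider, assuming the `connection` and `cursor` created above and the same hypothetical bar(name, lname) table (spider name, URL, and fields are only illustrative):

import scrapy

class BatchSpider(scrapy.Spider):
    # Hypothetical spider name and start URL
    name = 'batch_spider'
    start_urls = ['http://example.com']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.batch = []  # scraped rows waiting to be inserted

    def parse(self, response):
        # Illustrative row; real code would extract more fields
        row = {"name": response.xpath('//title/text()').extract_first(),
               "lname": "n/a"}
        self.batch.append(row)
        if len(self.batch) >= 100:
            # Insert and commit the whole batch at once
            cursor.executemany(
                "INSERT INTO bar(name, lname) VALUES (%(name)s, %(lname)s)",
                self.batch)
            connection.commit()
            self.batch.clear()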
Anandhakumar R
0

I just wrote a little Scrapy extension to save scraped items to a database: scrapy-sqlitem.

It is super easy to use.

pip install scrapy_sqlitem

Define Scrapy items using SQLAlchemy tables

from scrapy_sqlitem import SqlItem
from sqlalchemy import Table, Column, Integer, String, MetaData

metadata = MetaData()

class MyItem(SqlItem):
    sqlmodel = Table('mytable', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String, nullable=False))

Write your spider and inherit from SqlSpider

from scrapy.selector import Selector
from scrapy_sqlitem import SqlSpider

class MySpider(SqlSpider):
    name = 'myspider'

    start_urls = ('http://dmoz.org',)

    def parse(self, response):
        selector = Selector(response)
        item = MyItem()
        item['name'] = selector.xpath('//title[1]/text()').extract_first()
        yield item

Add DATABASE_URI and chunksize settings to settings.py.

DATABASE_URI = "postgresql:///mydb"

DEFAULT_CHUNKSIZE = 100

CHUNKSIZE_BY_TABLE = {'mytable': 100, 'othertable': 250}

Create the tables and you are done!
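For the table-creation step, a minimal sketch assuming the same `metadata` object used in the item definition and the DATABASE_URI shown above:

from sqlalchemy import create_engine

# Bind an engine to the same database the spider writes to
engine = create_engine("postgresql:///mydb")

# Create every table registered on the metadata object (e.g. 'mytable')
metadata.create_all(engine)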

http://doc.scrapy.org/en/1.0/topics/item-pipeline.html#activating-an-item-pipeline-component

http://docs.sqlalchemy.org/en/rel_1_1/core/tutorial.html#define-and-create-tables

12Ryan12