Im using Scrapy (aka Twisted) and also Postgres as a database.
After I while my connections seem to fill up and then my script is been stuck. I checked this with this query SELECT * FROM pg_stat_activity;
and read that its caused because Postgres has no connection pool.
I read about txpostgres and PGBouncer, Bouncer regrettably isn't an option, what else can I do to avoid this problem?
So far I use the following pipeline:
import psycopg2
from twisted.enterprise import adbapi
import logging
from datetime import datetime
import scrapy
from scrapy.exceptions import DropItem
class PostgreSQLPipeline(object):
""" PostgreSQL pipeline class """
def __init__(self, dbpool):
self.logger = logging.getLogger(__name__)
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
dbargs = dict(
host=settings['POSTGRESQL_HOST'],
database=settings['POSTGRESQL_DATABASE'],
user=settings['POSTGRESQL_USER'],
password=settings['POSTGRESQL_PASSWORD'],
)
dbpool = adbapi.ConnectionPool('psycopg2', **dbargs)
return cls(dbpool)
def process_item(self, item, spider):
d = self.dbpool.runInteraction(self._insert_item, item, spider)
d.addErrback(self._handle_error, item, spider)
d.addBoth(lambda _: item)
return d
def _insert_item(self, txn, item, spider):
"""Perform an insert or update."""
now = datetime.utcnow().replace(microsecond=0).isoformat(' ')
txn.execute(
"""
SELECT EXISTS(
SELECT 1
FROM expose
WHERE expose_id = %s
)
""", (
item['expose_id'],
)
)
ret = txn.fetchone()[0]
if ret:
self.logger.info("Item already in db: %r" % (item))
txn.execute(
"""
UPDATE expose
SET last_seen=%s, offline=0
WHERE expose_id=%s
""", (
now,
item['expose_id']
)
)
else:
self.logger.info("Item stored in db: %r" % (item))
txn.execute("""
INSERT INTO expose (
expose_id,
title
) VALUES (%s, %s)
""", (
item['expose_id'],
item['title']
)
)
# Write image info (path, original url, ...) to db, CONSTRAIN to expose.expose_id
for image in item['images']:
txn.execute(
"""
INSERT INTO image (
expose_id,
name
) VALUES (%s, %s)
""", (
item['expose_id'],
image['path'].replace('full/', '')
)
)
def _handle_error(self, failure, item, spider):
"""Handle occurred on db interaction."""
# do nothing, just log
self.logger.error(failure, failure.printTraceback())