
So, I made this class so that I can crawl on-demand using Scrapy:

from scrapy import signals
from scrapy.crawler import CrawlerProcess, Crawler
from scrapy.settings import Settings


class NewsCrawler(object):

    def __init__(self, spiders=[]):
        self.spiders = spiders
        self.settings = Settings()

    def crawl(self, start_date, end_date):
        crawled_items = []

        def add_item(item):
            crawled_items.append(item)

        process = CrawlerProcess(self.settings)

        for spider in self.spiders:
            crawler = Crawler(spider, self.settings)
            crawler.signals.connect(add_item, signals.item_scraped)
            process.crawl(crawler, start_date=start_date, end_date=end_date)

        process.start()  # starts the Twisted reactor and blocks until crawling finishes

        return crawled_items

Basically, I have a long-running process, and I will call the above class's crawl method multiple times, like this:

import time


crawler = NewsCrawler(spiders=[Spider1, Spider2])

while True:
    items = crawler.crawl(start_date, end_date)
    # do something with crawled items ...
    time.sleep(3600)

The problem is, the second time crawl is called, this error occurs: `twisted.internet.error.ReactorNotRestartable`.

From what I gathered, it's because the reactor can't be run again after it has been stopped. Is there any workaround for that?

Thanks!

wiseodd

2 Answers


This is a limitation of Scrapy (Twisted) at the moment, and it makes it hard to use Scrapy as a library.

What you can do is fork a new process that runs the crawler and stops the reactor when the crawl is finished. You can then join that process and spawn a new one the next time you need to crawl. If you want to handle the items in your main process, you can post the results to a Queue. I would recommend using a customized pipeline for your items, though.

Have a look at the following answer by me: https://stackoverflow.com/a/22202877/2208253

You should be able to apply the same principles, but you would use multiprocessing instead of billiard.
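
For example, here is a minimal sketch of that idea, reusing the NewsCrawler class from the question (the helper names _crawl_in_child and run_crawl are only illustrative, and the crawled items must be picklable so they can travel through the Queue):

from multiprocessing import Process, Queue


def _crawl_in_child(queue, spiders, start_date, end_date):
    # Runs in a fresh child process, so the Twisted reactor starts and
    # stops here without ever touching the parent process.
    crawler = NewsCrawler(spiders=spiders)  # NewsCrawler is the class from the question
    queue.put(crawler.crawl(start_date, end_date))


def run_crawl(spiders, start_date, end_date):
    queue = Queue()
    child = Process(target=_crawl_in_child,
                    args=(queue, spiders, start_date, end_date))
    child.start()
    items = queue.get()  # read the result before join() so a large payload cannot block the child
    child.join()
    return items

The parent process never touches the reactor, so run_crawl can be called as often as you like from the hourly loop in the question.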

Bj Blazkowicz
  • Thanks! However, using `Queue` is really slow, so I ended up using pickle to pass the crawling results to the main thread until I find another alternative. Do you have any suggestions? – wiseodd Feb 12 '16 at 09:47
  • I would create a customized pipeline to handle crawled items (a minimal sketch follows these comments). You could then choose a data structure that suits your needs and later let your post-processor handle the items once the crawl is finished. Do you need to wait for the crawl to finish? If not, I would just process the items on the fly in a pipeline. – Bj Blazkowicz Feb 12 '16 at 10:11
  • So the queue was slow because I used it the wrong way: I called it after I joined the process. When I do `queue.get()` **before** joining the process, it works well. Still, I have no idea why I have to do it before joining the process. – wiseodd Mar 05 '16 at 03:20
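
A minimal sketch of the pipeline idea from the comment above (the CollectItemsPipeline name and the module path in the settings snippet are illustrative, not part of either answer):

class CollectItemsPipeline:
    """Collects every scraped item on the spider so a post-processing step can read them after the crawl."""

    def open_spider(self, spider):
        spider.collected_items = []

    def process_item(self, item, spider):
        spider.collected_items.append(item)
        return item

# settings.py (the module path is an assumption for illustration)
ITEM_PIPELINES = {"myproject.pipelines.CollectItemsPipeline": 300}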

Based on @bj-blazkowicz's answer above, I found a solution with CrawlerRunner, which is the recommended crawler to use when running multiple spiders, as stated in the docs: https://docs.scrapy.org/en/latest/topics/practices.html#run-scrapy-from-a-script

There’s another Scrapy utility that provides more control over the crawling process: scrapy.crawler.CrawlerRunner. This class is a thin wrapper that encapsulates some simple helpers to run multiple crawlers, but it won’t start or interfere with existing reactors in any way.

Using this class the reactor should be explicitly run after scheduling your spiders. It’s recommended you use CrawlerRunner instead of CrawlerProcess if your application is already using Twisted and you want to run Scrapy in the same reactor.

Code in your main file:

from multiprocessing import Process, Queue

from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings
from scrapy.utils.log import configure_logging
from twisted.internet import reactor

# Enable logging for CrawlerRunner
configure_logging()

class CrawlerRunnerProcess(Process):
    def __init__(self, spider, q, *args):
        Process.__init__(self)
        self.runner = CrawlerRunner(get_project_settings())
        self.spider = spider
        self.q = q
        self.args = args

    def run(self):
        # Runs in the child process: schedule the crawl, stop the reactor once
        # the crawl finishes (successfully or not), then start the reactor.
        deferred = self.runner.crawl(self.spider, self.q, *self.args)
        deferred.addBoth(lambda _: reactor.stop())
        reactor.run(installSignalHandlers=False)

# The wrapper to make it run multiple spiders, multiple times
def run_spider(spider, *args):  # optional arguments
    q = Queue()  # optional queue to return spider results
    runner = CrawlerRunnerProcess(spider, q, *args)
    runner.start()
    result = q.get()  # read before join() so a large result cannot block the child process
    runner.join()
    return result

Code in your spider file:

from scrapy import Spider

from myproject.items import MyItem  # assumed location of your item class


class MySpider(Spider):
    name = 'my_spider'

    def __init__(self, q, *args, **kwargs):
        super(MySpider, self).__init__(*args, **kwargs)
        self.q = q  # optional queue
        self.args = args  # optional args

    def parse(self, response):
        my_item = MyItem()
        # ... populate my_item from the response ...
        self.q.put(my_item)  # send the item back to the parent process
        yield my_item
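
With both files in place, the hourly loop from the question could look roughly like this (the one-day date window is just an illustration, and run_spider returns whatever the spider put on the queue, one q.get() per crawl):

import time
from datetime import date, timedelta

while True:
    end_date = date.today()
    start_date = end_date - timedelta(days=1)
    item = run_spider(MySpider, start_date, end_date)
    # do something with the crawled item(s) ...
    time.sleep(3600)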