
This is no longer working; Scrapy's API has changed.

The documentation now features a way to "Run Scrapy from a script", but I get the ReactorNotRestartable error.

My task:

from celery import Task
from twisted.internet import reactor

from scrapy.crawler import Crawler
from scrapy import log, signals
from scrapy.utils.project import get_project_settings

from .spiders import MySpider


class MyTask(Task):
    def run(self, *args, **kwargs):
        spider = MySpider
        settings = get_project_settings()
        crawler = Crawler(settings)
        crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
        crawler.configure()
        crawler.crawl(spider)
        crawler.start()

        log.start()
        reactor.run()  # raises ReactorNotRestartable on every run after the first
– Juan Riaza
5 Answers

Answer (score 42)

The Twisted reactor cannot be restarted. A workaround for this is to let the Celery task fork a new child process for each crawl you want to execute, as proposed in the following post:

This gets around the "reactor cannot be restarted" issue by using the multiprocessing package. The problem is that this workaround is now obsolete with the latest Celery version, because you instead run into another issue where a daemon process can't spawn subprocesses. So in order for the workaround to work, you need to downgrade your Celery version.

Yes, and the Scrapy API has changed. But with minor modifications (importing Crawler instead of CrawlerProcess), you can get the workaround to work by downgrading your Celery version.

The Celery issue can be found here: Celery Issue #1709

Here is my updated crawl script that works with newer Celery versions by using billiard instead of multiprocessing:

from billiard import Process
from twisted.internet import reactor
from scrapy.crawler import Crawler
from scrapy import signals
from scrapy.utils.project import get_project_settings

from myspider import MySpider


class UrlCrawlerScript(Process):
    def __init__(self, spider):
        Process.__init__(self)
        settings = get_project_settings()
        self.crawler = Crawler(settings)
        self.crawler.configure()
        # Stop the reactor once the spider closes so run() can return.
        self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
        self.spider = spider

    def run(self):
        # This runs in the forked billiard process, so each crawl starts
        # (and stops) its own Twisted reactor exactly once.
        self.crawler.crawl(self.spider)
        self.crawler.start()
        reactor.run()


def run_spider(url):
    # Fork a new child process per crawl; the parent simply waits for it.
    spider = MySpider(url)
    crawler = UrlCrawlerScript(spider)
    crawler.start()
    crawler.join()

Edit: By reading Celery issue #1709, they suggest using billiard instead of multiprocessing so that the subprocess limitation is lifted. In other words, we should try billiard and see if it works!

Edit 2: Yes, by using billiard my script works with the latest Celery build! See my updated script above.
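
For completeness, here is a hedged sketch of how a Celery task might call run_spider; the app name and task name are assumptions (not part of the original answer), and it assumes the task lives in the same module as run_spider above:

from celery import Celery

app = Celery("crawl_app")  # hypothetical app name, default broker configuration

@app.task
def crawl(url):
    # Each task forks a fresh billiard child process via run_spider, so the
    # Twisted reactor is started exactly once per child and never restarted.
    run_spider(url)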

– Bj Blazkowicz
  • Note - I had to move the `self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed)` line outside of the initialization check or the second run through would hang. Moving it makes it work fine in my project. Also, as `scrapy.project` is deprecated, I used billiard's `current_thread` to set an initialization flag on a per-thread basis. That worked great too. – jlovison May 22 '14 at 07:35
  • jlovison, can you please share the changes you made on current_thread? And where did you place signals.spider_closed? Thanks in advance. – Mo J. Mughrabi Dec 13 '14 at 13:19
  • @BjBlazkowicz since Process is the base class here and there is a call to Process.__init__(self), isn't it [also necessary for `__del__` to be called](https://docs.python.org/2/reference/datamodel.html#object.__del__) for the derived class UrlCrawlerScript, or will it be called automatically? – wsdookadr Apr 22 '15 at 06:17
  • @average It should not be necessary – Bj Blazkowicz Apr 22 '15 at 08:37
  • @jlovison Thanks for pointing it out. I've removed the init-check since the install() call is not necessary – Bj Blazkowicz Apr 22 '15 at 08:38
  • I had this same issue, but with AWS SQS queueing. This code doesn't quite seem to be working... I pass in my url and run_spider, but it is not actually running the spider. Any idea why? – ccdpowell Jun 01 '15 at 04:15
  • Hmm, I'm stuck on this problem. My spiders are always "multiplying". Can you please help me out? http://stackoverflow.com/questions/33737153/scrapy-celery-and-multiple-spiders – Fabian Lurz Nov 16 '15 at 18:18
  • This is the only solution that worked for me. But I had to tweak it a little bit in order to make it work with `scrapy 1.1rc1` http://stackoverflow.com/a/35720294/190148 – netimen Mar 01 '16 at 10:22
  • @BjBlazkowicz Will this help in any way to use the multiple processes of Celery to process the queue? – lennard May 02 '16 at 15:15
  • @lennard Without forking a new process it will not work. It will work for x times the concurrency you have set up, but only once for each Celery worker. But if you can join the Celery processes after each task it will work. For this you can make use of the setting: CELERYD_MAX_TASKS_PER_CHILD = 1 – Bj Blazkowicz May 03 '16 at 07:09
  • For `celery==4.1.0 Scrapy==1.5.0 billiard==3.5.0.3`, I tried making modifications to this but failed. I was using this in Django. Then I tried [CrawlerRunner](https://doc.scrapy.org/en/latest/topics/practices.html) and failed too. Eventually I just gave up and fell back to using `CELERY_WORKER_MAX_TASKS_PER_CHILD = 1`. Published code in [this gist](https://gist.github.com/shadiakiki1986/0b2e25cf2e458ccea2b158359b2834a1). – Shadi May 11 '18 at 17:00
  • @BjBlazkowicz How do I send arguments using this code? – Ketan Modi Oct 15 '18 at 13:16
  • I think this approach blocks the task until the crawling is done. Is there a way to run this spider in detached mode or as a daemon, maybe by defining a when_finished callback? That way the Celery worker would be able to keep running tasks while the spider is running. – Ander Feb 02 '22 at 08:30
  • This does not seem to work with the latest Celery and Scrapy. I get `AttributeError: 'Settings' object has no attribute 'update_settings'`. If you search for this error, people say to use `CrawlerProcess` instead of `Crawler`, but then you cannot use `signals`... – Patrick Yan Dec 16 '22 at 04:53
Answer (score 15)

The Twisted reactor cannot be restarted, so once one spider finishes running and the crawler implicitly stops the reactor, that worker is useless.

As posted in the answers to that other question, all you need to do is kill the worker which ran your spider and replace it with a fresh one; this prevents the reactor from being started and stopped more than once. To do this, just set:

CELERYD_MAX_TASKS_PER_CHILD = 1
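
For reference, a minimal sketch of where this setting might live, assuming a standard Celery app object; in Celery 4+ the lowercase name worker_max_tasks_per_child replaces the old uppercase setting, and the broker URL below is a placeholder:

from celery import Celery

app = Celery("myproject", broker="redis://localhost:6379/0")  # placeholder broker URL

# Recycle each worker child after a single task, so every crawl gets a
# fresh process and therefore a fresh (never-restarted) Twisted reactor.
app.conf.worker_max_tasks_per_child = 1   # Celery 4+ setting name
# CELERYD_MAX_TASKS_PER_CHILD = 1         # equivalent legacy (Celery 3.x) setting name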

The downside is that you're not really using the Twisted reactor to its full potential and you waste resources by running multiple reactors, since one reactor can run multiple spiders at once in a single process. A better approach is to run one reactor per worker (or even one reactor globally) and not let the crawler touch it.

I'm working on this for a very similar project, so I'll update this post if I make any progress.

– Blender
Answer (score 2)

To avoid the ReactorNotRestartable error when running Scrapy in a Celery task queue, I used threads. The same approach is used to run the Twisted reactor several times in one app. Scrapy also uses Twisted, so we can do the same thing.

Here is the code:

from threading import Thread
from scrapy.crawler import CrawlerProcess
import scrapy

class MySpider(scrapy.Spider):
    name = 'my_spider'


class MyCrawler:

    spider_settings = {}  # Scrapy settings for this crawler

    def run_crawler(self):
        process = CrawlerProcess(self.spider_settings)
        process.crawl(MySpider)
        # Start the reactor in a background thread so the Celery task is not
        # blocked and the reactor is never restarted inside the worker.
        Thread(target=process.start).start()
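
For context, a minimal sketch of how a Celery task might use this class; the app name, task name, and settings dict are assumptions rather than part of the original answer:

from celery import Celery

app = Celery("crawler_app")  # hypothetical Celery app

@app.task
def crawl_task():
    crawler = MyCrawler()
    crawler.spider_settings = {"LOG_ENABLED": False}  # example per-crawl settings
    crawler.run_crawler()  # starts the reactor in a background thread and returns immediately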

Don't forget to increase CELERYD_CONCURRENCY for Celery:

CELERYD_CONCURRENCY = 10

This works fine for me.

This does not block the running process, but Scrapy best practice is to process data in callbacks anyway. Just do it this way:

# Attach a result callback to every spider before starting the crawl,
# so each spider can hand its scraped data back from its own callbacks.
for crawler in process.crawlers:
    crawler.spider.save_result_callback = some_callback
    crawler.spider.save_result_callback_params = some_callback_params

Thread(target=process.start).start()
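
For illustration, here is a sketch of how the spider might consume those attributes inside its parse callback; the start URL and the item fields are placeholders, and only the save_result_callback convention comes from the snippet above:

import scrapy

class MySpider(scrapy.Spider):
    name = 'my_spider'
    start_urls = ["https://example.com"]  # placeholder

    def parse(self, response):
        item = {"url": response.url, "title": response.css("title::text").get()}
        # Hand the scraped item back through the callback attached by MyCrawler.
        if getattr(self, "save_result_callback", None):
            self.save_result_callback(item, self.save_result_callback_params)
        yield item
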
Answer (score 0)

Here's what worked for me, inspired by this answer:

import os

from celery import Celery, signals
from scrapy.crawler import CrawlerRunner
from scrapy.settings import Settings

from scraper.scraper import settings as scraper_settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")

app = Celery("enrichment")


@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    # Install the asyncio reactor once per worker process
    # (matches the TWISTED_REACTOR setting in scraper/scraper/settings.py).
    from twisted.internet import asyncioreactor
    asyncioreactor.install()
    # crochet runs the reactor in a background thread so tasks never restart it.
    from crochet import setup
    setup()


@app.task()
def do_scraping():
    crawler_settings = Settings()
    crawler_settings.setmodule(scraper_settings)

    runner = CrawlerRunner(settings=crawler_settings)
    runner.crawl("spider_name", url="some_url")
Answer (score -2)

I would say this approach is very inefficient if you have a lot of tasks to process, because Celery is threaded and runs every task within its own thread. Let's say that with RabbitMQ as a broker you can push more than 10K messages per second; with Celery this could potentially mean an overhead of 10K threads! I would advise against using Celery here. Instead, access the broker directly!
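
For what it's worth, a hedged sketch of what "accessing the broker directly" could look like with pika and RabbitMQ; the queue name, host, and the run_spider helper (borrowed from the billiard answer above) are placeholders and assumptions, not part of this answer:

import pika

# Connect straight to RabbitMQ instead of going through Celery.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="crawl_urls", durable=True)

def handle_message(ch, method, properties, body):
    run_spider(body.decode())  # e.g. the billiard-based run_spider from the top answer
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="crawl_urls", on_message_callback=handle_message)
channel.start_consuming()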