
I have the following setup: Command 'collect' (collect_positions.py) -> Celery task (tasks.py) -> Scrapy spider (MySpider) ...

collect_positions.py:

from django.core.management.base import BaseCommand

from tracker.models import Keyword
from tracker.tasks import positions


class Command(BaseCommand):
    help = 'collect_positions'

    def handle(self, *args, **options):

        def chunks(l, n):
            """Yield successive n-sized chunks from l."""
            for i in range(0, len(l), n):
                yield l[i:i + n]

        chunk_size = 1

        # NOTE: `product` is assumed to be defined elsewhere (it is not shown in this snippet)
        keywords = Keyword.objects.filter(product=product).values_list('id', flat=True)

        chunks_list = list(chunks(keywords, chunk_size))
        # enqueue one `positions` task per chunk of keyword ids on the collect_positions queue
        positions.chunks(chunks_list, 1).apply_async(queue='collect_positions')

        return 0
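For clarity, a small illustration (with made-up ids and a chunk_size of 2 just for the example) of what the chunking produces; Celery's task.chunks() then turns each inner list into one positions call, with the ids unpacked as positional arguments, which is why the task receives them via *args:

>>> keyword_ids = [11, 12, 13, 14, 15]      # made-up ids for illustration
>>> list(chunks(keyword_ids, 2))
[[11, 12], [13, 14], [15]]
# positions.chunks([[11, 12], [13, 14], [15]], 1) then enqueues
# positions(11, 12), positions(13, 14) and positions(15)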

tasks.py:

from app_name.celery import app
from scrapy.settings import Settings
from scrapy_app import settings as scrapy_settings
from scrapy_app.spiders.my_spider import MySpider
from tracker.models import Keyword
from scrapy.crawler import CrawlerProcess


@app.task
def positions(*args):
    s = Settings()
    s.setmodule(scrapy_settings)

    keywords = Keyword.objects.filter(id__in=list(args))
    process = CrawlerProcess(s)
    process.crawl(MySpider, keywords_chunk=keywords)
    # process.start() runs the Twisted reactor and blocks until the crawl finishes;
    # the reactor cannot be started again in the same worker process
    process.start()

    return 1

I run the command from the command line, which creates the tasks for parsing. The first task completes successfully, but the others fail with an error:

twisted.internet.error.ReactorNotRestartable

How can I fix this error? I can provide any additional data if needed...

UPDATE 1

Thanks for the answer, @Chiefir! I managed to run all the queues, but only the start_requests() function runs; parse() is never called.

The main functions of the Scrapy spider:

# module-level imports these methods rely on (not shown in the original snippet):
import scrapy
from scrapy.spidermiddlewares.httperror import HttpError
from twisted.internet.error import DNSLookupError, TimeoutError


def start_requests(self):
    print('STEP1')

    yield scrapy.Request(
        url='https://example.com',
        callback=self.parse,
        errback=self.error_callback,
        dont_filter=True
    )

def error_callback(self, failure):
    print(failure)

    # log all errback failures,
    # in case you want to do something special for some errors,
    # you may need the failure's type
    print(repr(failure))

    # if isinstance(failure.value, HttpError):
    if failure.check(HttpError):
        # you can get the response
        response = failure.value.response
        print('HttpError on %s' % response.url)

    # elif isinstance(failure.value, DNSLookupError):
    elif failure.check(DNSLookupError):
        # this is the original request
        request = failure.request
        print('DNSLookupError on %s' % request.url)

    # elif isinstance(failure.value, TimeoutError):
    elif failure.check(TimeoutError):
        request = failure.request
        print('TimeoutError on %s' % request.url)


def parse(self, response):
    print('STEP2', response)

In the console I get:

STEP1

What could be the reason?

2 Answers


This is a question as old as the world:

This is what helped me win the battle against the ReactorNotRestartable error: the last answer from the author of the question.
0) pip install crochet
1) add from crochet import setup
2) call setup() - at the top of the file
3) remove these 2 lines from the Scrapy docs' run-from-a-script example (see the snippet below):
a) d.addBoth(lambda _: reactor.stop())
b) reactor.run()
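For context, the two lines in step 3 come from the "Run Scrapy from a script" example in the Scrapy documentation, which looks roughly like this (MySpider stands for whatever spider class you run); with crochet's setup() installed, the last two lines are the ones you drop:

from twisted.internet import reactor
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

runner = CrawlerRunner(get_project_settings())
d = runner.crawl(MySpider)
d.addBoth(lambda _: reactor.stop())   # <- removed when using crochet
reactor.run()                         # <- removed when using crochet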

I had the same problem with this error, spent 4+ hours solving it, and read every question here about it. I finally found that one - and I am sharing it. That is how I solved this. The only meaningful lines left from the Scrapy docs are the last 2 lines in my code:

# some more imports
from importlib import import_module

from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

from crochet import setup
setup()


def run_spider(spiderName):
    module_name = "first_scrapy.spiders.{}".format(spiderName)
    scrapy_var = import_module(module_name)           # do some dynamic import of the selected spider
    spiderObj = scrapy_var.mySpider()                  # get the mySpider object from the spider module
    crawler = CrawlerRunner(get_project_settings())    # from the Scrapy docs
    crawler.crawl(spiderObj)                           # from the Scrapy docs

This code lets me select which spider to run just by passing its name to the run_spider function, and after scraping finishes I can select another spider and run it again.

In your case you need to create, in a separate file, a separate function which runs your spiders, and then call it from your task. That is usually how I do it :)
P.S. And there is really no way to restart the Twisted reactor.
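A minimal sketch of what that could look like for the setup in the question, following this answer's crochet + CrawlerRunner approach, with crochet's wait_for added so the task blocks until the crawl finishes (the answer itself fires and forgets). The module path scrapy_app/run.py and the 60-second timeout are made up for illustration; scrapy_settings, MySpider and keywords_chunk come from the question:

# scrapy_app/run.py (hypothetical module)
from crochet import setup, wait_for
setup()   # start the Twisted reactor in a background thread, once per process

from scrapy.crawler import CrawlerRunner
from scrapy.settings import Settings

from scrapy_app import settings as scrapy_settings
from scrapy_app.spiders.my_spider import MySpider


@wait_for(timeout=60.0)   # block the caller until the crawl's Deferred fires (or the timeout expires)
def run_spider(keywords):
    s = Settings()
    s.setmodule(scrapy_settings)
    runner = CrawlerRunner(s)   # CrawlerRunner instead of CrawlerProcess: it does not run the reactor itself
    return runner.crawl(MySpider, keywords_chunk=keywords)

The Celery task then only prepares the data and calls the runner:

# tasks.py (sketch)
from app_name.celery import app
from scrapy_app.run import run_spider
from tracker.models import Keyword


@app.task
def positions(*args):
    keywords = Keyword.objects.filter(id__in=list(args))
    run_spider(keywords)
    return 1

Because wait_for blocks until the crawl has finished, this also keeps one crawl from overlapping with the next task handled by the same worker process.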
UPDATE 1
I don't know if you even need a start_requests() method. For me it usually works with just this code:

import scrapy


class mySpider(scrapy.Spider):
    name = "somname"
    allowed_domains = ["somesite.com"]
    start_urls = ["https://somesite.com"]

    def parse(self, response):
        pass

    def parse_dir_contents(self, response):      # for crawling additional links
        pass
Chiefir
  • Thanks for the answer! I managed to run all the queues, but only the start_requests() function runs, and parse() does not... – Роман Денисенко May 03 '18 at 18:44
  • Perfect! I understand what the matter is now. Thank you very much for the support! But there is one important point: can I make it so that the next task does not run until the previous one has completed? At the moment the script creates many tasks, and parsing begins after they are created. The idea was that a new parsing cycle would not start until the previous one has finished (to save server resources) – Роман Денисенко May 04 '18 at 13:46
  • I simply measured the runtime of each spider and run each spider separately on a timer with Celery :) I had a similar problem and solved it that way. But as I have read, there is some kind of "task chaining" in Celery, though I don't know exactly how that is done. P.S. If the advice helped, don't forget to click the up arrow and accept the answer with the green check mark :D – Chiefir May 04 '18 at 13:52
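For the "task chaining" mentioned in the last two comments, Celery's chain primitive sends each signature only after the previous task has finished. A minimal sketch against the command from the question (chunks_list and the queue name come from it; note this only serialises the crawls if the positions task itself blocks until its crawl is done, e.g. as in the crochet/wait_for sketch above):

from celery import chain

from tracker.tasks import positions

# one immutable signature per chunk of keyword ids; chain() links them so
# each one is only dispatched once the previous task has completed
signatures = [positions.si(*chunk).set(queue='collect_positions')
              for chunk in chunks_list]
chain(*signatures).apply_async()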

You can fix this by setting the stop_after_crawl parameter to False on the start() method of CrawlerProcess:

stop_after_crawl (bool) – stop or not the reactor when all crawlers have finished

from celery import shared_task
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings


@shared_task
def crawl(m_id, *args, **kwargs):
    process = CrawlerProcess(get_project_settings(), install_root_handler=False)
    process.crawl(SpiderClass, m_id=m_id)
    process.start(stop_after_crawl=False)