
I have to write a script that fetches some URLs in parallel and does some work with them. In the past I have always used Parallel::ForkManager for such things, but now I wanted to learn something new and try asynchronous programming with AnyEvent (and AnyEvent::HTTP or AnyEvent::Curl::Multi) ... but I'm having problems understanding AnyEvent and writing a script that should:

  • open a file (every line is a separate URL)
  • (from now on in parallel, but with a limit of e.g. 10 concurrent requests)
  • read the file line by line (I don't want to load the whole file into memory - it might be big)
  • make an HTTP request for that URL
  • read the response
  • update a MySQL record accordingly
  • (next file line)

I have read many manuals and tutorials, but it's still hard for me to understand the difference between blocking and non-blocking code. I have found a similar script at http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent, where Mr. Szabo explains the basics, but I still can't understand how to implement something like:

...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh;
...

... and add a concurrency limit in this case.
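
For what it's worth, here is roughly the shape I imagined with AnyEvent::HTTP (a rough, untested sketch put together from the module docs; urls.txt and the limit of 10 are placeholders) - but I'm not sure this is the right way to cap concurrency:

use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

my $max_concurrent = 10;
my $active         = 0;

open my $fh, '<', 'urls.txt' or die "Can't open urls.txt: $!";

my $cv = AnyEvent->condvar;
$cv->begin;                      # guard: don't let recv() return before everything is queued

my $start_more;                  # declared first so the closure can call itself
$start_more = sub {
    while ( $active < $max_concurrent ) {
        my $url = <$fh>;
        last if !defined $url;   # file exhausted
        chomp $url;

        $active++;
        $cv->begin;
        http_get $url, sub {
            my ( $body, $hdr ) = @_;
            # read the response and update MySQL here
            print "$hdr->{Status} $url\n";
            $active--;
            $start_more->();     # pull the next line(s) from the file
            $cv->end;
        };
    }
};

$start_more->();
$cv->end;                        # release the guard
$cv->recv;                       # run the event loop until every request has finished
close $fh;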

I would be very grateful for help ;)

UPDATE

Following ikegami's advice I gave Net::Curl::Multi a try. I'm very pleased with the results. After years of using Parallel::ForkManager just for grabbing thousands of URLs concurrently, Net::Curl::Multi seems to be awesome. Here is my code with a while loop on the filehandle. It seems to work as it should, but considering it's my first time writing something like this I would like to ask more experienced Perl users to take a look and tell me if there are some potential bugs, something I missed, etc. Also, if I may ask: as I don't fully understand how Net::Curl::Multi's concurrency works, please tell me whether I should expect any problems with putting a MySQL UPDATE command (via DBI) inside the RESPONSE loop - a sketch of what I have in mind follows the script below. (Besides higher server load, obviously - I expect the final script to run with about 50 concurrent N::C::M workers, maybe more.)

#!/usr/bin/perl

use strict;
use warnings;

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

my $maxWorkers = 10;

my $multi = Net::Curl::Multi->new();
my $workers = 0;

my $i = 1;
open my $fh, "<", "urls.txt";
LINE: while ( my $url = <$fh> )
{
    chomp( $url );
    $url .= "?$i";
    print "($i) $url\n";
    my $easy = make_request( $url );
    $multi->add_handle( $easy );
    $workers++;

    my $running = 0;
    do {
        my ($r, $w, $e) = $multi->fdset();
        my $timeout = $multi->timeout();
        select( $r, $w, $e, $timeout / 1000 )
            if $timeout > 0;

        $running = $multi->perform();
        RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
            $multi->remove_handle( $easy );
            $workers--;
            printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
        }

        # don't max out the CPU while waiting
        select( undef, undef, undef, 0.01 );

        # keep looping while the worker pool is full, or (once the file is
        # exhausted) until the remaining transfers finish
    } while ( $workers == $maxWorkers || ( eof($fh) && $running ) );
    $i++;
}
close $fh;
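
For context, this is roughly the DBI part I plan to drop into the RESPONSE loop above (just a sketch - the table and column names are made up; the handle and prepared statement would be created once, before the download loop):

use DBI;

# connect once, before the download loop (DSN and credentials are placeholders)
my $dbh = DBI->connect(
    'DBI:mysql:database=mydb;host=localhost',
    'user', 'password',
    { RaiseError => 1 },
);
my $sth = $dbh->prepare('UPDATE urls SET status = ?, body = ? WHERE url = ?');

# ... and then inside the RESPONSE loop, right after remove_handle():
$sth->execute(
    $easy->getinfo( CURLINFO_RESPONSE_CODE ),
    $easy->{body},
    $easy->{url},
);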
alan

2 Answers


Net::Curl is a rather good library that's extremely fast. Furthermore, it can handle parallel requests too! I'd recommend using this instead of AnyEvent.

use strict;
use warnings;

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

my $max_running = 10;
my @urls = ( 'http://www.google.com/' );

my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
    while ( @urls && $running < $max_running ) {
       my $easy = make_request( shift( @urls ) );
       $multi->add_handle( $easy );
       ++$running;
    }

    last if !$running;

    my ( $r, $w, $e ) = $multi->fdset();
    my $timeout = $multi->timeout();
    select( $r, $w, $e, $timeout / 1000 )
        if $timeout > 0;

    $running = $multi->perform();
    while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
        $multi->remove_handle( $easy );
        printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
    }
}
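
If the URLs come from a file instead of a preloaded @urls array (a small sketch; urls.txt is a placeholder), the refill step at the top of the loop can read from the filehandle on demand:

open my $fh, '<', 'urls.txt' or die "Can't open urls.txt: $!";

while (1) {
    # refill the pool from the file until the limit is reached or the file ends
    while ( $running < $max_running ) {
        my $url = <$fh>;
        last if !defined $url;
        chomp $url;
        my $easy = make_request( $url );
        $multi->add_handle( $easy );
        ++$running;
    }

    last if !$running;    # file exhausted and no transfers left

    # ... fdset/timeout/select/perform/info_read exactly as above ...
}
close $fh;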
ikegami
  • I'm getting a lot of "callback function is not set". It seems to show up when there's a domain name in the URL host; I'm not getting this error if I use an IP. Also, if I put e.g. `print "got it!";` where the `# process $easy` is, the page content is automatically printed. – alan Apr 27 '16 at 20:57
  • Fixed it so the content is stored in $easy rather than printed. I don't get the callback error you get?? [Try it with the change. It might be related] – ikegami Apr 27 '16 at 21:39
  • Thanks for the help. Unfortunately I still get "callback function is not set" - actually 4 times, then your `printf`. I don't know where it comes from. – alan Apr 28 '16 at 06:34
  • https://metacpan.org/changes/release/SYP/LWP-Protocol-Net-Curl-0.016 - `minor fix: "callback function is not set" warning when libcurl has the AsynchDNS feature` - same author. – alan Apr 28 '16 at 06:37
  • Recompiling curl without `--enable-ares` helped and now I don't get "callback function is not set". – alan Apr 28 '16 at 18:08
  • I updated my "question" post with the final script. If I may ask, please take a look ;). – alan Apr 29 '16 at 07:27

This does exactly what you want, asynchronously, by wrapping Net::Curl in a safe fashion:

#!/usr/bin/env perl

package MyDownloader;
use strict;
use warnings qw(all);

use Moo;

extends 'YADA::Worker';

has '+use_stats' => (default => sub { 1 });
has '+retry'     => (default => sub { 10 });

after init => sub {
    my ($self) = @_;

    $self->setopt(
        encoding            => '',
        verbose             => 1,
    );
};

after finish => sub {
    my ($self, $result) = @_;

    if ($self->has_error) {
        print "ERROR: $result\n";
    } else {
        # do the interesting stuff here
        printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
    }
};

around has_error => sub {
    my $orig = shift;
    my $self = shift;

    return 1 if $self->$orig(@_);
    return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};

1;

package main;
use strict;
use warnings qw(all);

use Carp;

use YADA;

my $q = YADA->new(
    max     => 8,
    timeout => 30,
);

open(my $fh, '<', 'file_with_urls_per_line.txt')
    or croak "can't open queue: $!";
while (my $url = <$fh>) {
    chomp $url;

    $q->append(sub {
        MyDownloader->new($url)
    });
}
close $fh;
$q->wait;
creaktive
  • Although your idea is great and meets all my requirements, ikegami's solution is much more understandable and readable for me. Thanks for your input. It's great to see many ways to achieve the same goal. – alan Apr 28 '16 at 18:19