1

In Mojolicious full app, I have the following pattern.

  1. Run some legacy requests concurrently (e.g. insert into db1 & db2 at the same time); and
  2. Run some legacy requests sequentially (e.g. for each db, insert table1 first, then table2 afterwards, etc).

Note: for the example below I used httpbin.org Docker image:

docker run --rm --name httpbin -p 8000:80 kennethreitz/httpbin

How can I use Mojo::Promise->map to limit concurrency for this pattern? Can concurrency apply to all my actions in db1 and db2? In my example below is it possible to (say) limit the number of hits to http://localhost:8000 to only 3 at any given moment?

use Mojolicious::Lite -signatures, -async_await;
use Mojo::Util qw(dumper);

helper db1_LocationA_p => async sub ($self, $request)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                my $tx = $self->ua->post('http://localhost:8000/delay/2' => json => $request);
                my $res = $tx->result;
                die $res->message if $res->is_error;
                $res->json;
            },
            sub {
                my ( $subprocess, $err, @res ) = @_;
                $reject->( $err ) if $err;
                $resolve->( @res );
            }
            );
                              });
};

helper db1_LocationB_p => async sub ($self, $request) {
    return $self->db1_LocationA_p("LocationB $request"); # For brevity
};

helper db2_LocationA_p => async sub ($self, $request)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                my $tx = $self->ua->post('http://localhost:8000/delay/5' => json => $request);
                my $res = $tx->result;
                die $res->message if $res->is_error;
                $res->json;
            },
            sub {
                my ( $subprocess, $err, @res ) = @_;
                $reject->( $err ) if $err;
                $resolve->( @res );
            }
            );
                              });
};

helper db2_LocationB_p => async sub ($self, $request) {
    return $self->db2_LocationA_p("LocationB $request"); # For brevity
};

helper add_db1 => async sub($self, $table1, $table2, $table3) {
    # run sequentially. table1 first, then table2, then table3
    my @table1 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table1 );
    my @table2 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table2 );
    my @table3 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_),  $self->db1_LocationB_p($_) } @$table3 );
    return (@table1, @table2, @table3);
};

helper add_db2 => async sub ($self, $table1, $table2) {
    # run sequentially. table1 first, then table2
    my @table1 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_),  $self->db2_LocationB_p($_) } @$table1 );
    my @table2 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_),  $self->db2_LocationB_p($_) } @$table2 );
    return (@table1, @table2);
};

any '/' => async sub ($self) {
    my $param = $self->param('param');

    my ($db1_table1, $db1_table2, $db1_table3, $db2_table1, $db2_table2);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=FOO${param};);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAR${param};);
    push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAZ${param};);
    push @$db1_table2, qq(ADD DB1 TABLL2 : ID=ABC, IDs = FOO${param}, BAR${param}, BAZ${param};);
    push @$db1_table2, qq(ADD DB1 TABLL2 : ID=XYZ, IDs = FOO${param}, BAR${param}, BAZ${param};);
    push @$db1_table3, qq(ADD DB1 TABLE3 : ID=ZZZ ,IDs = ABC, XYZ;);

    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=FOO${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAR${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAZ${param};);
    push @$db2_table1, qq(ADD DB2 TABLE1 : ID=QUX${param};);
    push @$db2_table2, qq(ADD DB2 TABLE2 : ID=FOO, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
    push @$db2_table2, qq(ADD DB2 TABLE2 : ID=BAR, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
    
    $self->render_later();
    my @results = eval {
        await Mojo::Promise->all(
            # run concurrently. db1 & db2 can run in parallel at the same time.
            $self->add_db1($db1_table1, $db1_table2, $db1_table3),
            $self->add_db2($db2_table1, $db2_table2),
            )};
    
    if (my $err = $@) {
        warn "Something went wrong: " . dumper($err);
        $self->render(json => $err, status=>502 );
    } else {
        say STDERR dumper(@results);
        $self->render(json => {db1=>$results[0], db2=>$results[1]});
    }
};

app->start;

In the example above:

  • add_db1() would immediately make 3 requests to ADD DB1 TABLE1 for db1_LocationA and another 3 on db1_LocationB; and
  • add_db2() would immediately make 3 requests to ADD DB2 TABLE1 for db2_LocationA and another 3 on db2_LocationB.

thus a total of 12 requests to add_db1() & add_db2() combined. My question is, whether it is possible to limit that to, say 3 in total (as an example)

h q
  • 1,168
  • 2
  • 10
  • 23
  • So in `add_db_v1()` you start the jobs `add_db1()` and `add_db2()` concurrently using `Mojo::Promise->all()` ? What happens if you use `Mojo::Promise->map()` instead and limit concurrency here? – Håkon Hægland Jul 19 '23 at 13:50
  • You're right, that would be possible. But I assumed (not tested) that it would only limit the concurrency for `add_db1()` and `add_db2()`, and not the promises inside them (?). – h q Jul 19 '23 at 19:20
  • Re "*only limit the concurrency for add_db1() and add_db2()*", So you want to limit the concurrency of 2 tasks? Limit it to what, 1 at a time??? If so, that's a weird way to say you want to do things sequentially. If not, then it's the same as no limit! – ikegami Jul 19 '23 at 20:16
  • I am sorry @ikegami, I wasn't clear. In my application `add_db1()`, contains 3 tables. For each `table`, I have, say, 200 commands. So the 200 commands for table1 must go first, then the 200 commands for table2, and the 200 commands for table3. I shortened the code for brevity. The commands for and number of tables for `add_db1()` and `add_db2()`, etc are different. – h q Jul 20 '23 at 16:42
  • That's just `for ( @commands_for_table1, @commands_for_table2, @commands_for_table3 ) { await do_command( $_ ) }` plus some exception handling. Or can the 200 commands be done in parallel? Again making us guess. – ikegami Jul 20 '23 at 17:21
  • It's reached the point where I won't contribute further without a minimal, runnable demonstration of the problem. – ikegami Jul 20 '23 at 17:25
  • Dear @ikegami. I've added a runnable example. I would like to limit the concurrency so no more than N requests can run at any given time. Also, if is is not against the rules, and you're willing, I would like to engage on 1-on-1 paid knowledge session with you. Thanks again for your time. – h q Jul 21 '23 at 15:11
  • yeah, so it's just `for ( @commands_for_table1, @commands_for_table2, @commands_for_table3 ) { await do_command( $_ ) }` – ikegami Jul 21 '23 at 15:53
  • I jumped back into this prematurely. You may have included your code, but you haven't demonstrated the problem yet. Specifically, how does the code's behaviour deviate from what's expected? – ikegami Jul 21 '23 at 16:40
  • Let me phrase that differently. You ask "is it possible to (say) limit the number of hits to http://localhost:8000 to only 3 at any given moment?" Is this really the entirety of your question? There's no mention of database in there. So if there's three requests to one database, you still want to block all other requests, including requests to other databases? – ikegami Jul 21 '23 at 16:45
  • I used `localhost:8000` just to provide a runnable example. In reality, I connect to a legacy "gateway" device via `Net::Telnet`, I then select a database element `db1_A`, `db1_B`, `db2_A`, `db2_B`, `dbN_A`, etc. The total number of connections I can make to this "gateway" is limited, cannot exceed a specific number (I believe 15). So that was my motive. The reason I also asked about `all_settled` is for the delete use-case. I would delete from table1, table2, tableN regardless whether it exists or not. However, I can't exceed the number of connections to the Gateway. Cant thank u enough – h q Jul 21 '23 at 17:04

1 Answers1

1

I have some experience making http requests with Mojo::Base, I don't quite understand the code you made :(, but this is for example how I make http requests:

#!/usr/bin/perl

use Mojo::Base qw(-strict -signatures -async_await);
use Mojo::Promise;
use Mojo::UserAgent;

my $ua = Mojo::UserAgent->new;
my @urls = map{"http://ffuf.me/cd/pipes/user?id=$_"} 300..1000;

async sub get_pp($url) {
    my $tx = await $ua->get_p($url);

    my $body = $tx->res->body;
    say $tx->req->url;
   
    if ($body!~/Not Found/i) {
       say $tx->req->url . " " . $body;
       exit;
    }
}

async sub main(@urls) {
    await Mojo::Promise->map({concurrency=>20}, sub {
                        get_pp($_) }, @urls);
}

await main(@urls);

-- it is fuzzing from 300 to 1000 in the id parameter, using 20 http requests concurrently

I have more examples like this in my github repo: https://github.com/spoNge369/perl_scrap

before modifying any of the code you provided, you could test these attributes:

  • connect_timeout()
  • inactivity_timeout()
  • max_connections()
  • request_timeout()

apply to the $ua(user-agent) example: Total limit of 5 seconds, of which 3 seconds may be spent connecting $ua->max_redirects(0)->connect_timeout(3)->request_timeout(5);

https://docs.mojolicious.org/Mojo/UserAgent#request_timeout

spongi
  • 11
  • 1