In Mojolicious full app, I have the following pattern.
- Run some legacy requests concurrently (e.g. insert into
db1
&db2
at the same time); and - Run some legacy requests sequentially (e.g. for each db, insert
table1
first, thentable2
afterwards, etc).
Note: for the example below I used httpbin.org
Docker image:
docker run --rm --name httpbin -p 8000:80 kennethreitz/httpbin
How can I use Mojo::Promise->map
to limit concurrency
for this pattern? Can concurrency
apply to all my actions in db1
and db2
? In my example below is it possible to (say) limit the number of hits to http://localhost:8000
to only 3 at any given moment?
use Mojolicious::Lite -signatures, -async_await;
use Mojo::Util qw(dumper);
helper db1_LocationA_p => async sub ($self, $request)
{
return Mojo::Promise->new(sub($resolve, $reject ) {
Mojo::IOLoop->subprocess(
sub {
my $tx = $self->ua->post('http://localhost:8000/delay/2' => json => $request);
my $res = $tx->result;
die $res->message if $res->is_error;
$res->json;
},
sub {
my ( $subprocess, $err, @res ) = @_;
$reject->( $err ) if $err;
$resolve->( @res );
}
);
});
};
helper db1_LocationB_p => async sub ($self, $request) {
return $self->db1_LocationA_p("LocationB $request"); # For brevity
};
helper db2_LocationA_p => async sub ($self, $request)
{
return Mojo::Promise->new(sub($resolve, $reject ) {
Mojo::IOLoop->subprocess(
sub {
my $tx = $self->ua->post('http://localhost:8000/delay/5' => json => $request);
my $res = $tx->result;
die $res->message if $res->is_error;
$res->json;
},
sub {
my ( $subprocess, $err, @res ) = @_;
$reject->( $err ) if $err;
$resolve->( @res );
}
);
});
};
helper db2_LocationB_p => async sub ($self, $request) {
return $self->db2_LocationA_p("LocationB $request"); # For brevity
};
helper add_db1 => async sub($self, $table1, $table2, $table3) {
# run sequentially. table1 first, then table2, then table3
my @table1 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_), $self->db1_LocationB_p($_) } @$table1 );
my @table2 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_), $self->db1_LocationB_p($_) } @$table2 );
my @table3 = await Mojo::Promise->all( map { $self->db1_LocationA_p($_), $self->db1_LocationB_p($_) } @$table3 );
return (@table1, @table2, @table3);
};
helper add_db2 => async sub ($self, $table1, $table2) {
# run sequentially. table1 first, then table2
my @table1 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_), $self->db2_LocationB_p($_) } @$table1 );
my @table2 = await Mojo::Promise->all( map { $self->db2_LocationA_p($_), $self->db2_LocationB_p($_) } @$table2 );
return (@table1, @table2);
};
any '/' => async sub ($self) {
my $param = $self->param('param');
my ($db1_table1, $db1_table2, $db1_table3, $db2_table1, $db2_table2);
push @$db1_table1, qq(ADD DB1 TABLE1 : ID=FOO${param};);
push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAR${param};);
push @$db1_table1, qq(ADD DB1 TABLE1 : ID=BAZ${param};);
push @$db1_table2, qq(ADD DB1 TABLL2 : ID=ABC, IDs = FOO${param}, BAR${param}, BAZ${param};);
push @$db1_table2, qq(ADD DB1 TABLL2 : ID=XYZ, IDs = FOO${param}, BAR${param}, BAZ${param};);
push @$db1_table3, qq(ADD DB1 TABLE3 : ID=ZZZ ,IDs = ABC, XYZ;);
push @$db2_table1, qq(ADD DB2 TABLE1 : ID=FOO${param};);
push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAR${param};);
push @$db2_table1, qq(ADD DB2 TABLE1 : ID=BAZ${param};);
push @$db2_table1, qq(ADD DB2 TABLE1 : ID=QUX${param};);
push @$db2_table2, qq(ADD DB2 TABLE2 : ID=FOO, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
push @$db2_table2, qq(ADD DB2 TABLE2 : ID=BAR, IDs = FOO${param}, BAR${param}, BAZ${param}, QUX${param};);
$self->render_later();
my @results = eval {
await Mojo::Promise->all(
# run concurrently. db1 & db2 can run in parallel at the same time.
$self->add_db1($db1_table1, $db1_table2, $db1_table3),
$self->add_db2($db2_table1, $db2_table2),
)};
if (my $err = $@) {
warn "Something went wrong: " . dumper($err);
$self->render(json => $err, status=>502 );
} else {
say STDERR dumper(@results);
$self->render(json => {db1=>$results[0], db2=>$results[1]});
}
};
app->start;
In the example above:
add_db1()
would immediately make 3 requests toADD DB1 TABLE1
fordb1_LocationA
and another 3 ondb1_LocationB
; andadd_db2()
would immediately make 3 requests toADD DB2 TABLE1
fordb2_LocationA
and another 3 ondb2_LocationB
.
thus a total of 12 requests to add_db1()
& add_db2()
combined. My question is, whether it is possible to limit that to, say 3 in total (as an example)