1

EDITED 2021-07-21 BELOW

I have a script in which I connect to a PostgreSQL DB and also spawn multiple threads using Parallel::ForkManager.

I create a database handle and then prepare both a SELECT SQL statement and an INSERT SQL statement. I want to be able to run the SQL statements from each thread spawned by Parallel::ForkManager, but it fails because I can't share the DB handle between threads.

I need to take the script below (which fails IF I spawn more than one thread, but works if I only spawn a single thread), and change it such that each thread can read/write from/to the DB respectively.

I know I can clone a database handle, but I also know there is more to it.

How can I have parallel DB handles/SQL statements?

I apologize for the length of this, but I need to give as complete an example as possible.

Example:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#@codes = ("A");                 # testing single thread
@codes = ("A","B","F","M","S"); # testing multi-thread

################################################################
# connect to db
################################################################
my $dsn    = 'DBI:Pg:dbname=DB_NAME';
my $userid = "DB_USERNAME";
my $sesame = "DB_PASSWORD";
my $dbh    = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;

################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";

my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);
# prepare the SELECT statement handle
my $sth_select_statement = $dbh->prepare_cached($sql_select_statement);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)       # column names
    . ") VALUES ($placeholders)";
# prepare the INSERT statement handle
my $sth_insert_statement = $dbh->prepare_cached($sql_insert_statement);

################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    $optimization->start($code) and next OPTIMIZATION;

    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################
sub sub_a {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to A
    $varset{field1} = 'a_f1'; # for illustrative purposes
    $varset{field2} = 'a_f2';
    $varset{field3} = 'a_f3';
    $varset{field4} = 'a_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_b {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to B
    $varset{field1} = 'b_f1'; # for illustrative purposes
    $varset{field2} = 'b_f2';
    $varset{field3} = 'b_f3';
    $varset{field4} = 'b_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_f {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to F
    $varset{field1} = 'f_f1'; # for illustrative purposes
    $varset{field2} = 'f_f2';
    $varset{field3} = 'f_f3';
    $varset{field4} = 'f_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_m {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to M
    $varset{field1} = 'm_f1'; # for illustrative purposes
    $varset{field2} = 'm_f2';
    $varset{field3} = 'm_f3';
    $varset{field4} = 'm_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_s {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to S
    $varset{field1} = 's_f1'; # for illustrative purposes
    $varset{field2} = 's_f2';
    $varset{field3} = 's_f3';
    $varset{field4} = 's_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub write_to_db {
################################################################
    my ($code_name, @values) = @_;
    
    my $rv_code = $sth_select_statement->execute($code_name);
    if($rv_code < 0) {
       print $DBI::errstr;
    }
    my @row = $sth_select_statement->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_statement->execute(@values);
    }
}

################################################################
sub set_columns {
################################################################
    $columns{code}   = 0;
    $columns{field1} = 1;
    $columns{field2} = 2;
    $columns{field3} = 3;
    $columns{field4} = 4;
    
    $columns[0] = 'code';
    $columns[1] = 'field1';
    $columns[2] = 'field2';
    $columns[3] = 'field3';
    $columns[4] = 'field4';
}

EDIT 2021-07-21:

I have added clone_dbh and create_dbh sub-routines to try different methods. Neither work. I am still getting the same error - can't share DBH between threads. I don't know what else to do other than give up on sub-routines and write duplicate code for each thread spawned by Parallel::ForkManager, which I really do not want to do.

Example 2:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#@codes = ("A");                 # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;

################################################################
# connect to db
################################################################
my $dsn    = 'DBI:Pg:dbname=$ENV{DB_NAME}';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};

my %dbh;   # hash for storing dbh handles

my $dbh    = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;

################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";

################################################################
# prepare array and hash for matching db columns
################################################################
my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)       # column names
    . ") VALUES ($placeholders)";

################################################################################################
# create clones of database handle and SQL statements for threads
################################################################################################
my %sth_select_code;
my %sth_insert_code;

#for my $code (@codes) {
#    $dbh{$code} = $dbh->clone();
#    # prepare the SELECT statement handle
#    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
#    # prepare the INSERT statement handle
#    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
#}

################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    $optimization->start($code) and next OPTIMIZATION;

    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################################################
# disconnect from database
################################################################################################
for my $code (@codes) {
    $sth_select_code{$code}->finish();
    $sth_insert_code{$code}->finish();
    $dbh{$code}->disconnect;
}
$dbh->disconnect;

################################################################################################
# end
################################################################################################
exit;

################################################################
sub sub_a {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to A
    $varset{field2} = 'a_f1'; # for illustrative purposes
    $varset{field3} = 'a_f2';
    $varset{field4} = 'a_f3';
    $varset{field5} = 'a_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_b {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to B
    $varset{field2} = 'b_f1'; # for illustrative purposes
    $varset{field3} = 'b_f2';
    $varset{field4} = 'b_f3';
    $varset{field5} = 'b_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_f {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to F
    $varset{field2} = 'f_f1'; # for illustrative purposes
    $varset{field3} = 'f_f2';
    $varset{field4} = 'f_f3';
    $varset{field5} = 'f_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_m {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to M
    $varset{field2} = 'm_f1'; # for illustrative purposes
    $varset{field3} = 'm_f2';
    $varset{field4} = 'm_f3';
    $varset{field5} = 'm_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_s {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to S
    $varset{field2} = 's_f1'; # for illustrative purposes
    $varset{field3} = 's_f2';
    $varset{field4} = 's_f3';
    $varset{field5} = 's_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################################
sub create_dbh {
################################################################################
    my $code = shift;
    
    $dbh{$code} = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    
    # did it work? are we there yet?
    my $me = $dbh{$code}->{Driver}{Name};
    my $sversion = $dbh{$code}->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    print "Name: $dbh->{Name}\n";
    # prepare the SELECT statement handle
    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}

################################################################
sub clone_dbh {
################################################################
    my $code = shift;
    
    $dbh{$code} = $dbh->clone();
    # prepare the SELECT statement handle
    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}

################################################################
sub write_to_db {
################################################################
    my ($code, @values) = shift @_;
    
    my $rv_code = $sth_select_code{$code}->execute($code);
    if($rv_code < 0) {
       print $DBI::errstr;
    }
    my @row = $sth_select_code{$code}->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_code{$code}->execute(@values);
    }
}

################################################################
sub set_columns {
################################################################
    $columns{field1} = 0;
    $columns{field2} = 1;
    $columns{field3} = 2;
    $columns{field4} = 3;
    $columns{field5} = 4;
    
    $columns[0] = 'field1';
    $columns[1] = 'field2';
    $columns[2] = 'field3';
    $columns[3] = 'field4';
    $columns[4] = 'field5';
}

Do I have to pass dbh and statement handles to the create_dbh sub explicitly from within the calling sub_x thread?

Do I have to return the handles or handle_refs from create_dbh back to calling sub_x thread?

I don't know how to get around this, but it seems like a lexical scope or object/memory access issue.

Any more ideas?

skeetastax
  • 1,016
  • 8
  • 18
  • *"I want to be able to run the SQL statements from each thread spawned by Parallel::ForkManager"* In fact it does not spawn threads, it spawns processes, see [Which is faster in perl, Parallel::ForkManager or threads?](https://stackoverflow.com/q/16657630/2173773) – Håkon Hægland Jul 14 '21 at 12:44
  • Have you tried to put the `DBI->connect()` inside each child processes? Such that each process has its separate database connection, see [Parallel::ForkManager, DBI using memory](https://www.perlmonks.org/?node_id=405911) – Håkon Hægland Jul 14 '21 at 12:57
  • Here is another example using [`DBIx::Connector`](https://metacpan.org/dist/DBIx-Connector/view/lib/DBIx/Connector.pm): [Perl Parallel::ForkManager with DBI database handlers](https://stackoverflow.com/q/3127229/2173773) – Håkon Hægland Jul 14 '21 at 13:01
  • Thanks @HåkonHægland. I appreciate your tips. I've been using `Parallel::ForkManager` for a few years because it worked well for me. I tried looking at `threads` and also `MCE` but it was very difficult trying to translate what had done in `P::FM` to those with respect to prevention of memory errors and data clobbering. I solved it below by creating a hash to store the `\$dbh_clone`s and `\$statement_clone`s in. I will look at those examples, though. Always another way to do it. – skeetastax Jul 14 '21 at 13:13

2 Answers2

2

I am not quite certain what you want, but perhaps these pointers will help:

  • Prepared statements are local to the database session in which they were created. So if you want them in all your database sessions, you have to prepare them in each one.

  • You can always run statements in parallel, as long as each statement is running in its own database session.

  • You can also get parellelization of a single query inside the database, so that several database processes work on it together. This happens automatically if PostgreSQL is configured appropriately and the optimizer thinks that a query will benefit. See the documentation for details.

Laurenz Albe
  • 209,280
  • 17
  • 206
  • 263
  • Thanks. The problem is that you can't share one `dbh` between multiple threads. If a second thread tries to use the original `dbh` it will throw an error similar to this: `...failed: handle 2 is owned by thread d97fe8 not current thread 3a01058 (handles can't be shared between threads and your driver may need a CLONE method added) at file.pl line 180, line 1.` – skeetastax Jul 14 '21 at 12:16
  • Then don't do that. – Laurenz Albe Jul 14 '21 at 13:15
  • reading your comments and thinking about them, the problem is not internal DB query parallelization, the problem is that I am spawning multiple threads in Perl using `Parallel::ForkManager`, and *each* thread needs to `QUERY` and `INSERT` data. The nature of `DBI` is that only one thread can access any given `dbh`. I have tried in vain to create a separate `dbh` per `P::FM` thread. So if you can see what I'm doing wrong, I'm all ears... – skeetastax Jul 21 '21 at 13:36
  • As i said, have each thread open its own connection. You should perhaps use a connection pool to keep the number of connections limited. – Laurenz Albe Jul 21 '21 at 13:39
  • Do you mean to say that calling the `create_dbh` sub (in which I create a new connection) from within the `sub_x` child thread does not create a separate `dbh` specific to the `sub_x` thread? Are you saying in essence that I cannot abstract the creation of a `dbh` connection to another `sub`? I just want to understand... – skeetastax Jul 21 '21 at 13:48
  • I didn't read through your wad of code, and my Perl is not good. But if you say that you cannot use a connection in several handles, you have to call `DBI->connect` in each thread that wants a database connection. Makes sense, doesn't it? – Laurenz Albe Jul 21 '21 at 14:00
  • my point is, I am, though each thread is calling a sub to do it... – skeetastax Jul 21 '21 at 14:09
  • It does not matter if it is in a function or not, does it? – Laurenz Albe Jul 21 '21 at 14:48
  • not technically, but I want to put it into a `sub` so I only have to write the code once, then call it from each thread, whereby each thread get's it's own `dbh` to work with, independent of the other threads. I am trying various things, and I think I almost have a working script... – skeetastax Jul 22 '21 at 01:25
  • I just got it working. Will post working solution tonight... – skeetastax Jul 22 '21 at 01:41
0

I got it working using the code below:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#my @codes = ("A");                 # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;

################################################################
# get db connection info
################################################################
my $dsn    = 'DBI:Pg:dbname=mt4_test';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles

################################################################
# prepare array and hash for matching db columns
################################################################
my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE field1 = ?;);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)        # column names
    . ") VALUES ($placeholders)";

################################################################
# hash for storing SQL statement handles for threads
################################################################
my %sth_select_code;
my %sth_insert_code;

################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    if (scalar @codes > 1) {
        $optimization->start($code) and next OPTIMIZATION;
    } else {
        $optimization->start($code);
    }
    
    launch_sub($code);
    
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################
# THE END
################################################################
exit;
################################################################



################################################################
sub launch_sub {
################################################################
    my $code = shift;
    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
}
################################################################
sub sub_a {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to A
    $varset{field2} = 'a_f1'; # for illustrative purposes
    $varset{field3} = 'a_f2';
    $varset{field4} = 'a_f3';
    $varset{field5} = 'a_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_b {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to B
    $varset{field2} = 'b_f1'; # for illustrative purposes
    $varset{field3} = 'b_f2';
    $varset{field4} = 'b_f3';
    $varset{field5} = 'b_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_f {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to F
    $varset{field2} = 'f_f1'; # for illustrative purposes
    $varset{field3} = 'f_f2';
    $varset{field4} = 'f_f3';
    $varset{field5} = 'f_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_m {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to M
    $varset{field2} = 'm_f1'; # for illustrative purposes
    $varset{field3} = 'm_f2';
    $varset{field4} = 'm_f3';
    $varset{field5} = 'm_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_s {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to S
    $varset{field2} = 's_f1'; # for illustrative purposes
    $varset{field3} = 's_f2';
    $varset{field4} = 's_f3';
    $varset{field5} = 's_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub create_dbh {
################################################################
    my $dbh_ref = shift;
    my $sel_ref = shift;
    my $ins_ref = shift;
    
    ${$dbh_ref} = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    
    # did it work? are we there yet?
    my $me = ${$dbh_ref}->{Driver}{Name};
    my $sversion = ${$dbh_ref}->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    print "Name: ${$dbh_ref}->{Name}\n";
    # prepare the SELECT statement handle
    ${$sel_ref} = ${$dbh_ref}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    ${$ins_ref} = ${$dbh_ref}->prepare_cached($sql_insert_statement);
}

################################################################
sub write_to_db {
################################################################
    my ($code, @values) = @_;
    
    my $rv_code = $sth_select_code{$code}->execute($code);
    say "SQL SELECT for $code: rv_code = $rv_code";
    if($rv_code < 0) {
       print $DBI::errstr;
    }
    my @row = $sth_select_code{$code}->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_code{$code}->execute(@values);
        say "SQL INSERT for $code";
    }
}

################################################################
sub disconnect_dbh {
################################################################
    my $dbh_ref = shift;
    my $sel_ref = shift;
    my $ins_ref = shift;
    
    ${$sel_ref}->finish();
    ${$ins_ref}->finish();
    ${$dbh_ref}->disconnect;
    
    say "disconnected dbh_ref: $dbh_ref";
}

################################################################
sub set_columns {
################################################################
    $columns{field1} = 0;
    $columns{field2} = 1;
    $columns{field3} = 2;
    $columns{field4} = 3;
    $columns{field5} = 4;
    
    $columns[0] = 'field1';
    $columns[1] = 'field2';
    $columns[2] = 'field3';
    $columns[3] = 'field4';
    $columns[4] = 'field5';
}
skeetastax
  • 1,016
  • 8
  • 18