0

Basically, I grab all the rows from a schedule table, then process each row individually. If the row is already in the command table, I skip it. Otherwise, I insert it.

I have 2 chained promises inside the callback of Promise.all(rows.map(function(row){ ... })):

return is_schedule_cmd_already_pending(schedule_id).then(function(num){
   return insert_into_pending_cmd(num, schedule_id, device_name, cmd);  
});

I print the sql statement in console.log in is_schedule_cmd_already_pending and insert_into_pending_cmd

The printing order is out of sync.

It should be executed in order like this, for each row, in sync style.

- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending
insert_into_pending_cmd


- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending
insert_into_pending_cmd


- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending
insert_into_pending_cmd
.......
.......

Instead, it is like (i.e. all insert_into_pending_cmd happen at the very end, which is not what I want)

- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending



- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending



- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending

.......
.......
.......

insert_into_pending_cmd
insert_into_pending_cmd
insert_into_pending_cmd

Full code

var config = require("./config.js");
var Promise = require('bluebird');
var mysql = require('promise-mysql');
var ON_DEATH = require('death');


// Connection pool shared by every query in this script; stays null until connect_db() assigns it.
var g_pool = null;

/**
 * Create the MySQL connection pool from `config.db_config` and store it in
 * the module-level `g_pool` so the query helpers can use it.
 */
function connect_db() {
  var pool_options = config.db_config;
  g_pool = mysql.createPool(pool_options);
}


/**
 * Shut down the shared connection pool.
 *
 * Does nothing when the pool was never created (g_pool is still null), so it
 * is safe to call from a shutdown hook regardless of how far startup got.
 * An error reported by the pool is logged instead of being silently dropped.
 *
 * NOTE(review): promise-mysql may also return a promise from end(); confirm
 * that the installed version honors the callback form used here.
 */
function close_db() {
  if (!g_pool) {
    // nothing was opened, nothing to close
    return;
  }

  g_pool.end(function (err) {
    // all connections in the pool have ended (or the shutdown failed)
    if (err) {
      console.error("close db failed:", err);
    }
  });
}



// http://thecodeship.com/web-development/alternative-to-javascript-evil-setinterval/
/**
 * Call `func` repeatedly, `wait` milliseconds apart, using chained
 * setTimeout calls instead of setInterval.
 *
 * @param {Function} func  Callback invoked with `this` set to null and no arguments.
 * @param {number} wait    Delay in milliseconds before the first call and between calls.
 * @param {number} [times] Maximum number of calls; when undefined the calls repeat forever.
 *
 * If `func` throws, no further calls are made and the ORIGINAL error object is
 * rethrown (previously it was converted to a string, which lost the stack
 * trace). The throw happens inside a timer callback, so it cannot be caught
 * by the code that called interval().
 */
function interval(func, wait, times) {
  var remaining = times;

  function tick() {
    if (typeof remaining === "undefined" || remaining-- > 0) {
      // Schedule the next run first so a slow `func` does not delay it.
      setTimeout(tick, wait);
      try {
        func.call(null);
      }
      catch (e) {
        // The next tick is already queued; zeroing the counter makes it a no-op.
        remaining = 0;
        throw e;
      }
    }
  }

  setTimeout(tick, wait);
}


/**
 * Build the current UTC time as "YYYY-MM-DD HH:MM:00".
 *
 * The seconds field is always "00" (the second is deliberately ignored).
 * The value is also printed to the console.
 *
 * @returns {string} The formatted UTC timestamp.
 */
function get_current_utc_time() {
  var now = new Date();

  var date_part = [
    now.getUTCFullYear(),
    add_zero(now.getUTCMonth() + 1), // getUTCMonth counts from 0
    add_zero(now.getUTCDate())       // getUTCDate counts from 1
  ].join("-");

  var time_part = [
    add_zero(now.getUTCHours()),
    add_zero(now.getUTCMinutes()),
    "00"                             // we ignore the second
  ].join(":");

  var time_utc = date_part + " " + time_part;

  console.log("-current utc-");
  console.log(time_utc);

  return time_utc;
}


// http://www.w3schools.com/jsref/jsref_getutchours.asp
/**
 * Left-pad a value below 10 with a single "0".
 *
 * @param {number} i Value to pad.
 * @returns {string|number} "0" + i when i < 10; otherwise i itself, unchanged
 *   (so the result is a string for small values and the original value otherwise).
 */
function add_zero(i) {
  return i < 10 ? "0" + i : i;
}


/**
 * Queue a scheduled command by inserting a row into the Command table,
 * unless the message says it is already pending.
 *
 * @param {Object} msg_obj
 * @param {*} msg_obj.schedule_id   Value stored in Command.ScheduleId.
 * @param {*} msg_obj.device_name   Value stored in Command.RemoteName.
 * @param {*} msg_obj.cmd           Value stored in Command.CommandJSON.
 * @param {boolean} msg_obj.is_pending When truthy nothing is inserted.
 * @returns {Promise<undefined>} Resolves once the insert finished (or at once
 *   when skipped); rejects if the query fails.
 *
 * The values are passed as `?` placeholders instead of being concatenated
 * into the SQL text, so a quote inside the command JSON or device name can
 * neither break the statement nor inject SQL.
 *
 * NOTE(review): assumes cmd is a string; the driver expands a plain object
 * bound to a placeholder differently - confirm against the Schedule schema.
 */
function insert_into_pending_cmd(msg_obj) {
  console.log();
  console.log("-insert_into_pending_cmd-");

  var schedule_id = msg_obj.schedule_id;
  var device_name = msg_obj.device_name;
  var cmd = msg_obj.cmd;

  if (msg_obj.is_pending) {
    return Promise.resolve();
  }

  var sql = "insert into Command set CommandDate = ?, RemoteName = ?, CommandJSON = ?, CommandComplete = 0, ScheduleId = ?";
  var values = [get_current_utc_time(), device_name, cmd, schedule_id];

  return g_pool.query(sql, values).then(function () {
    // Callers only need to know the insert finished, not the driver's result.
  });
}


/**
 * Decide whether a schedule's command still has to be inserted.
 *
 * When `msg_obj.is_run` is falsy the schedule is not due, so the result is
 * marked pending (which makes the insert step skip it) without touching the
 * database. Otherwise the Command table is queried for an incomplete command
 * of this schedule whose CommandDate is less than 600 seconds (and more than
 * 0 seconds) in the past.
 *
 * @param {Object} msg_obj
 * @param {*} msg_obj.schedule_id
 * @param {*} msg_obj.device_name
 * @param {*} msg_obj.cmd
 * @param {boolean} msg_obj.is_run Whether the schedule matched the current time.
 * @returns {Promise<{schedule_id: *, device_name: *, cmd: *, is_pending: boolean}>}
 *
 * The schedule id is bound through a `?` placeholder rather than concatenated
 * into the SQL text, so its content cannot alter the statement.
 *
 * NOTE(review): the "> 0" bound compares the database clock with a
 * CommandDate written from this process's clock; if the two clocks differ a
 * fresh row may not be counted - confirm this is intended.
 */
function is_schedule_cmd_already_pending(msg_obj) {
  console.log();
  console.log("-is_schedule_cmd_already_pending-");

  var schedule_id = msg_obj.schedule_id;
  var device_name = msg_obj.device_name;
  var cmd = msg_obj.cmd;

  // Build the message consumed by the insert step.
  function to_result(is_pending) {
    return {
      schedule_id: schedule_id,
      device_name: device_name,
      cmd: cmd,
      is_pending: is_pending
    };
  }

  if (!msg_obj.is_run) {
    return Promise.resolve(to_result(true));
  }

  var sql = "select count(*) as num from Command where ScheduleId = ?" +
    " and CommandComplete = 0" +
    " and (UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(CommandDate)) < 600" +
    " and (UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(CommandDate)) > 0";

  return g_pool.query(sql, [schedule_id]).then(function (rows) {
    // Any matching row means the command is already queued.
    return to_result(Number(rows[0].num) !== 0);
  });
}




/**
 * Check whether a Schedule row is due right now (UTC day code, hour and
 * minute all equal the row's values) and log the comparison as it goes.
 *
 * @param {Object} row A Schedule row; reads ScheduleId, ScheduleRemoteName,
 *   ScheduleDaycode, ScheduleTime ("HH:MM..." string) and ScheduleCommandJSON.
 * @returns {Promise<{schedule_id: *, device_name: *, cmd: *, is_run: boolean}>}
 *   Resolved promise; is_run is true only when day, hour and minute all match.
 */
function is_matchup_schedule_time(row) {
  // Row fields
  var id = row.ScheduleId;
  var remote = row.ScheduleRemoteName;
  var wanted_day = row.ScheduleDaycode;
  var wanted_time = row.ScheduleTime;
  var command = row.ScheduleCommandJSON;

  // "HH:MM[:SS]" -> hour / minute
  var hh_mm = wanted_time.split(":");
  var wanted_hour = hh_mm[0];
  var wanted_min = hh_mm[1];

  console.log();
  console.log();
  console.log("- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -");
  [id, remote, wanted_day, wanted_time, wanted_hour, wanted_min, command].forEach(function (field) {
    console.log(field);
  });

  // Current UTC day-of-week / hour / minute, padded the same way as the schedule values
  var now = new Date();
  var today = add_zero(now.getUTCDay());
  var this_hour = add_zero(now.getUTCHours());
  var this_min = add_zero(now.getUTCMinutes());

  console.log();
  console.log("- curr_date_code, curr_h, curr_min - ");
  console.log(today);
  console.log(this_hour);
  console.log(this_min);

  // Loose equality on purpose: padded values may be strings or numbers.
  var same_day = wanted_day == today;
  var same_hour = same_day && wanted_hour == this_hour;
  var same_min = same_hour && wanted_min == this_min;

  if (same_day) {
    console.log();
    console.log(".. match up day ..");
  }
  if (same_hour) {
    console.log();
    console.log("~~ match up hour ~~");
  }
  if (same_min) {
    console.log();
    console.log("## match up d-h-m, run ##");
  }

  return Promise.resolve({
    schedule_id: id,
    device_name: remote,
    cmd: command,
    is_run: same_min
  });
}


// NOTE -------------
/**
 * Process the Schedule rows strictly one after another: for each row, check
 * whether it is due, then whether its command is already pending, then insert
 * it if needed. The next row starts only after the previous row's chain has
 * settled (Promise.mapSeries).
 *
 * @param {Array<Object>} rows Rows returned by the Schedule query.
 * @returns {Promise<Array>} Resolves with one entry per row; rejects with the
 *   first error raised by any step.
 */
function process_schedule_rows(rows) {
  // One row's pipeline; an error in any step rejects the whole series unchanged.
  function process_row(row) {
    return is_matchup_schedule_time(row)
      .then(function (match_msg) {
        return is_schedule_cmd_already_pending(match_msg);
      })
      .then(function (pending_msg) {
        return insert_into_pending_cmd(pending_msg);
      });
  }

  return Promise.mapSeries(rows, process_row);
}


/**
 * Run one scheduling pass: load every Schedule row (ordered by ScheduleId)
 * and hand the rows to process_schedule_rows().
 *
 * @returns {Promise} Settles when the pass is finished. It never rejects:
 *   a failure is logged here, because the timer that triggers this function
 *   cannot observe an asynchronous error. Previously the error was rethrown
 *   from the last `.catch` of a chain nobody held, which only produced an
 *   unhandled rejection.
 */
function do_schedule() {
  console.log();
  console.log("---- start do_schedule ----");

  return g_pool.query("select * from Schedule order by ScheduleId asc")
    .then(process_schedule_rows)
    .catch(function (e) {
      // Report and carry on so the next scheduled pass still runs.
      console.error("do_schedule failed:", e);
    });
}


// main func
/**
 * Script entry point: print the configured DB host, open the connection
 * pool, run do_schedule() every 5000 ms without a repeat limit, and register
 * a shutdown hook that closes the pool and exits the process.
 */
function main() {
  var poll_every_ms = 5000;

  // Shutdown hook: close the pool, then exit.
  function on_death(signal, err) {
    console.log();
    console.log("-- script interupted --");
    console.log("close db");

    close_db();

    process.exit();
  }

  console.log("db host:");
  console.log(config.db_host);

  connect_db();

  // `undefined` as the repeat count means "repeat forever".
  interval(do_schedule, poll_every_ms, undefined);

  // Clean up
  ON_DEATH(on_death);
}


// run main func: invoked at load time, so executing this script starts the scheduler
main();
kenpeter
  • 7,404
  • 14
  • 64
  • 95
  • hang on ... the difference between expected and actual output is the fact that `insert_into_pending_cmd` gets called multiple times? – Jaromanda X Sep 09 '16 at 02:30
  • 1
    Is it the output you require in sequence, or does the whole processing of `row[1]` need to wait until `row[0]` is complete before beginning? – Jaromanda X Sep 09 '16 at 02:32
  • It's unclear what your required sequencing is. `Promise.all()` by definition is to be used for promises that are all in-flight at the same time when there is no required ordering among them (e.g. they can be run and finish in any order). So, it seems a bit odd that you're asking for particular sequencing among those rows. You're either expecting `Promise.all()` to do something it doesn't or you just didn't realize what to expect when you use it. – jfriend00 Sep 09 '16 at 02:45
  • @jfriend00 I updated the output sample. Basically, I have rows of result. For each row, I do 1. check if in table 2. if not in table, insert into table. – kenpeter Sep 09 '16 at 03:51
  • @JaromandaX, see my comment above and updated of sample output. – kenpeter Sep 09 '16 at 03:51
  • @kenpeter - it's still not clear if the processing of each row needs to wait until the previous row has completed or not – Jaromanda X Sep 09 '16 at 03:55
  • If you're trying to do one row sequenced after another, you're using entirely the wrong techniques. What you're using are parallel techniques. You would need to use serializing techniques - completely different code. Your current code is designed to run all rows in parallel and in any order. – jfriend00 Sep 09 '16 at 04:05
  • @JaromandaX, I want "previous row completed then go to next row". Another question is why "insert_into_pending_cmd" happening multiple times at the end of the output? – kenpeter Sep 09 '16 at 04:06
  • @jfriend00, so what is the correct way? – kenpeter Sep 09 '16 at 04:07
  • because it's called once for each row – Jaromanda X Sep 09 '16 at 04:07
  • what I find odd is that you are iterating rows, and do nothing with the row itself in the callback ... that in itself looks wrong ... nevermind, just looked in the "full code" – Jaromanda X Sep 09 '16 at 04:09
  • I offered you a couple ways to serialize the row operations in my answer. – jfriend00 Sep 09 '16 at 04:57
  • @jfriend00, thx, I have updated the full code, but hit a strange bug. – kenpeter Sep 09 '16 at 23:28
  • bug: http://stackoverflow.com/questions/39421094/simple-schdule-nodejs-script-insert-records-twice-while-connecting-to-remote-dat – kenpeter Sep 09 '16 at 23:40

1 Answer

1

Your Promise.all() pattern is running all your rows in parallel where completion of the various operations involved in processing them can happen in any order. That's how your code is designed to work.

To sequence them so one row runs at a time and the next row is processed after the prior one is completely done, you need to use a different design pattern. A classic way to serialize promises that are processing an array is using .reduce() like this:

// process each row sequentially
// `p` is the promise for all rows handled so far; each row's work is attached
// with p.then(), so it starts only after the previous row's promise resolved.
// NOTE: schedule_id, device_name and cmd are not defined in this snippet;
// presumably they are taken from `row` - see the full code in the question.
rows.reduce(function(p, row) {
    return p.then(function() {
        return is_schedule_cmd_already_pending(schedule_id).then(function(num) {
            return insert_into_pending_cmd(num, schedule_id, device_name, cmd);
        });
    });
// Promise.resolve() seeds the chain with an already-resolved promise.
}, Promise.resolve()).then(function(data) {
    // everything done here
}).catch(function(err) {
    // error here
});

This creates an extended promise chain where each row is processed as a step in the promise chain and the next link in the chain doesn't run until the prior one is done.


The above scheme works with standard ES6 promises. I personally prefer using the Bluebird promise library which has Promise.mapSeries() which is explicitly designed for this:

const Promise = require('bluebird');

// Same sequential processing using Bluebird's Promise.mapSeries: the callback
// returns a promise per row, and the single .catch() at the end receives an
// error from any row.
// NOTE: schedule_id, device_name and cmd are not defined in this snippet;
// presumably they are taken from `row` - see the full code in the question.
Promise.mapSeries(rows, function(row) {
    return is_schedule_cmd_already_pending(schedule_id).then(function(num) {
        return insert_into_pending_cmd(num, schedule_id, device_name, cmd);
    });
}).then(function(data) {
    // everything done here
}).catch(function(err) {
    // error here
});

FYI, there are lots of issues with error handling in your real code. Promises make async error handling orders of magnitude easier. If you promisify your lower level operations (or use the promise interface to your database) and then write your control flow and logic only in promise-based code, it will be massively easier to write proper error handling. Lines of code like this:

if (err) throw err;

that are inside a plain async callback are NOT going to give you proper error handling. Use promises for everything in your control flow and it will be very easy to propagate and handle async errors. It's actually quite difficult to do this properly with nested plain async callbacks and your code shows several mistakes. Convert all async operations to promises and it will be easy to do it right.

jfriend00
  • 683,504
  • 96
  • 985
  • 979