
I have a script that pulls 25,000 records from AWS Athena, which is basically a PrestoDB relational SQL database. Let's say I'm generating a request for each one of these records, which means I have to make 25,000 requests to Athena, and then, when the data comes back, 25,000 requests to my Redis Cluster.

What would be the ideal number of requests to make at one time from Node to Athena?

The reason I ask is that I tried to do this by creating an array of 25,000 promises and then calling Promise.all(promiseArray) on it, but the app just hung forever.

So instead I decided to fire off one request at a time, using recursion to splice out the first record and pass the remaining records back to the calling function after the promise resolved.

The problem with this is that it takes forever. I took about an hour-long break, came back, and there were still 23,000 records remaining.

I tried to Google how many requests Node and Athena can handle at once, but came up with nothing. I'm hoping someone might know something about this and be able to share it with me.

Thank you.

Here is my code just for reference:

As a sidenote, what I would like to do differently is, instead of sending one request at a time, to send 4, 5, 6, 7 or 8 at a time depending on how fast they execute (there is a rough sketch of what I mean after my code below).

Also, how would a Node cluster affect the performance of something like this?

exports.storeDomainTrends = () => {
    return new Promise((resolve, reject) => {
        athenaClient.execute(`SELECT DISTINCT the_column from "the_db"."the_table"`,
        (err, data) => {
            var getAndStoreDomainData = (records) => {
                if (records.length){
                    return new Promise((resolve, reject) => {
                        // take the first record off the front of the array
                        var record = records.splice(0, 1)[0]
                        athenaClient.execute(`
                        SELECT 
                        field,
                        field,
                        field,
                        SUM(field) as field
                        FROM "the_db"."the_table"
                        WHERE the_field IN ('Month') AND the_field = '` + record.domain_name + `'
                        GROUP BY the_field, the_field, the_field
                        `, (err, domainTrend) => {

                            if (err) {
                                console.log(err)
                                return reject(err)
                            }

                            redisClient.set(('Some String' + domainTrend[0].domain_name), JSON.stringify(domainTrend))
                            resolve(domainTrend);
                        })
                    })
                    .then(res => {
                        // recurse on whatever is left
                        getAndStoreDomainData(records);
                    })
                }
            }

            getAndStoreDomainData(data);

        })
    })
}
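
To make the sidenote above concrete, this is roughly the batched version I have in mind. It is an untested sketch; BATCH_SIZE and getAndStoreDomainDataInBatches are just placeholder names, and the per-domain query is the same one as above:

const BATCH_SIZE = 5; // placeholder; could be 4, 5, 6, 7 or 8

var getAndStoreDomainDataInBatches = (records) => {
    if (!records.length) return Promise.resolve();

    // take the next BATCH_SIZE records off the front of the array
    var batch = records.splice(0, BATCH_SIZE);

    // fire the whole batch off together, then recurse on whatever is left
    return Promise.all(batch.map((record) => {
        return new Promise((resolve, reject) => {
            athenaClient.execute(`...same per-domain query as above...`, (err, domainTrend) => {
                if (err) return reject(err);
                redisClient.set(('Some String' + domainTrend[0].domain_name), JSON.stringify(domainTrend));
                resolve(domainTrend);
            });
        });
    }))
    .then(() => getAndStoreDomainDataInBatches(records));
};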

Miguel Coder
  • Have a look at https://stackoverflow.com/questions/47967232/how-to-make-a-promise-resolve-with-a-specific-condition/47967532#47967532; you probably want to chunk your requests and process one chunk at a time. The chunk size depends on your system, so we can't estimate that for you; you probably need to test it yourself. – Jonas Wilms Jan 11 '18 at 18:13
  • It depends on a lot of factors, some of which are out of your control. For example, AWS Athena has request limits. https://docs.aws.amazon.com/athena/latest/ug/service-limits.html – Kevin B Jan 11 '18 at 18:14
  • @Kevin, thanks, I think that solves my problem as to why my requests were stalling. If you would like to submit that as your answer I will gladly accept it. – Miguel Coder Jan 11 '18 at 18:33
  • You could use Promise.all in batches of 1000 and throttle requests made per time period (like max 20 per second) or throttle active requests (max 100). The limit is not only what your hardware handles and what the software on your hardware allows, but also what the receiver of your requests can handle or allows. [Here](https://stackoverflow.com/a/48001650/1641941) is an example of throttling promises. – HMR Jan 12 '18 at 05:11

2 Answers


Using the lib, your code could look something like this:

const Fail = function(reason){this.reason=reason;};
const isFail = x=>(x&&x.constructor)===Fail;
const distinctDomains = () =>
  new Promise(
    (resolve,reject)=>
      athenaClient.execute(
        `SELECT DISTINCT domain_name from "endpoint_dm"."bd_mb3_global_endpoints"`,
        (err,data)=>
          (err)
            ? reject(err)
            : resolve(data)
      )
  );
const domainDetails = domain_name =>
  new Promise(
    (resolve,reject)=>
      athenaClient.execute(
        `SELECT 
        timeframe_end_date,
        agg_type,
        domain_name,
        SUM(endpoint_count) as endpoint_count
        FROM "endpoint_dm"."bd_mb3_global_endpoints"
        WHERE agg_type IN ('Month') AND domain_name = '${domain_name}'
        GROUP BY timeframe_end_date, agg_type, domain_name`,
        (err, domainTrend) =>
            (err)
              ? reject(err)
              : resolve(domainTrend)
        )
  );
const redisSet = keyValue =>
  new Promise(
    (resolve,reject)=>
      redisClient.set(
        keyValue,
        (err,res)=>
          (err)
            ? reject(err)
            : resolve(res)
      )
  );
const process = batchSize => limitFn => resolveValue => domains => 
  Promise.all(
    domains.slice(0,batchSize)
    .map(//map domains to promises
      domain=>
        //maximum 5 active connections
        limitFn(domainName=>domainDetails(domainName))(domain.domain_name)
        .then(
          domainTrend=>
            //the redis client documentation makes no sense whatsoever
            //https://redis.io/commands/set
            //no mention of a callback
            //https://github.com/NodeRedis/node_redis
            //mentions a callback, since we need the return value
            //and best to do it async we will use callback to promise
            redisSet([
              `Endpoint Profiles - Checkin Trend by Domain - Monthly - ${domainTrend[0].domain_name}`,
              JSON.stringify(domainTrend)
            ])
        )
        .then(
          redisReply=>{
            //here is where things get unpredictable, set is documented as 
            //  a synchronous function returning "OK" or a function that
            //  takes a callback but no mention of what that callback receives
            //  as response, you should try with one or two records to
            //  finish this on reverse engineering because documentation
            //  fails 100% here and can not be relied upon.
            console.log("bad documentation of redis client... reply is:",redisReply);
            return (redisReply==="OK")
              ? domain
              : Promise.reject(`Redis reply not OK:${redisReply}`)
          }
        )
        .catch(//catch failed, save error and domain of failed item
          e=>
            new Fail([e,domain])
        )
    )
  ).then(
    results=>{
      console.log(`got ${batchSize} results`);
      const left = domains.slice(batchSize);
      if(left.length===0){//nothing left
        return resolveValue.concat(results);
      }
      //recursively call process until done
      return process(batchSize)(limitFn)(resolveValue.concat(results))(left)
    }
  );
const max5 = lib.throttle(5);//max 5 active connections to athena
distinctDomains()//you may want to limit the results to 50 for testing
//you may want to limit batch size to 10 for testing
.then(process(1000)(max5)([]))//we have 25000 domains here
.then(
  results=>{//have 25000 results
    const successes = results.filter(x=>!isFail(x));
    //array of failed items, a failed item has a .reason property
    //  that is an array of 2 items: [the error, domain]
    const failed = results.filter(isFail);
  }
)

You should figure out what the redis client actually does; I tried to work it out from the documentation but might as well have asked my goldfish. Once you've reverse engineered the client's behavior, it is best to try a small batch size first to see if there are any errors. You have to import lib to use it; you can find it here.
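
If you'd rather not pull in the lib, a minimal concurrency limiter with the same calling convention used above (throttle(max)(fn)(arg) returning a promise) could look roughly like this. This is just my sketch of the idea, not the lib's actual implementation:

//stand-in for lib.throttle: wraps fn so that at most `max` wrapped calls
//  are in flight at the same time; the rest wait in a queue
const throttle = max => {
  let active = 0;
  const queue = [];
  const next = () => {
    if (active >= max || queue.length === 0) return;
    active++;
    const { fn, arg, resolve, reject } = queue.shift();
    Promise.resolve()
      .then(() => fn(arg))
      .then(resolve, reject)        //settle the caller's promise
      .then(() => { active--; next(); }); //free the slot and start the next one
  };
  return fn => arg =>
    new Promise((resolve, reject) => {
      queue.push({ fn, arg, resolve, reject });
      next();
    });
};
const max5 = throttle(5);//same usage as lib.throttle(5) above

The queue only starts another call when one of the five active ones settles, which keeps the number of in-flight Athena queries bounded.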

HMR

I was able to take what Kevin B said and find a much quicker way to query the data. What I did was change the query so that I could get the trend for all domains from Athena at once. I ordered it by domain_name and then consumed the result as a Node stream so that I could separate each domain out into its own JSON as the data came in.

Anyway, this is what I ended up with.

exports.storeDomainTrends = () => {
    return new Promise((resolve, reject) => {
        var streamObj = athenaClient.execute(`
            SELECT field,
                   field,
                   field,
                   SUM(field) AS field
            FROM "db"."table"
            WHERE field IN ('Month')
            GROUP BY field, field, field
            ORDER BY field desc`).toStream();

        var data = [];

        streamObj.on('data', (record) => {
            // buffer rows while they belong to the same domain as the current group
            if (!data.length || record.field === data[0].field){
                data.push(record)
            } else if (data[0].field !== record.field){
                // domain changed: write out the previous domain's rows and start a new group
                redisClient.set(('Key'), JSON.stringify(data))
                data = [record]
            }
        })

        streamObj.on('end', resolve);

        streamObj.on('error', reject);

    })
    .then()
}
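
Edit: as HMR points out in the comments below, the grouping above only writes a domain out when the next domain shows up, so the last domain never gets saved. Flushing whatever is still buffered inside the `end` handler takes care of that; something like:

    streamObj.on('end', () => {
        // write out the final domain's rows before resolving
        if (data.length){
            redisClient.set(('Key'), JSON.stringify(data))
        }
        resolve();
    });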

Miguel Coder
  • If each domain has multiple `timeframe_end_date` then there is a problem with your grouping. You always save the previous one, so the last domain is not saved in redis. You can save the last one in the `end` handler. – HMR Jan 14 '18 at 05:37
  • Thanks for pointing that out! I will update that accordingly! – Miguel Coder Jan 14 '18 at 21:50