
I would like to iterate over a CSV file and use puppeteer to screenshot a URL for each row in the CSV file.

I have the following code, which works fine, but each request waits for the previous one to complete, so it takes ages to run:

const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');

(async () => {
    const browser = await puppeteer.launch();

    const getFile = async function(rowId, path) {
        const page = await browser.newPage();
        page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
        let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
        const response = await page.goto(url, { waitUntil: 'networkidle2' });
        await page.waitFor(3000);
        const body = await page.$('body');
        await body.screenshot({
            path: path
        });
        page.close();
    };

    let fname = 'ids.csv'
    const csvPipe = fs.createReadStream(fname).pipe(csv());
    csvPipe.on('data', async (row) => {
            let id = row.ad_id;
            console.log(id);
            let path = './images/' + id + '.png';
            csvPipe.pause();
            await getFile(id, path);
            csvPipe.resume();
        }).on('end', () => {
            console.log('CSV file successfully processed');
        });
})();

How can I make the requests run in parallel, in order to speed it up?

If I remove the pause() and resume() lines then I get this error each time the function runs:

(node:18610) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 14)
(node:18610) UnhandledPromiseRejectionWarning: TypeError: Cannot read property 'screenshot' of null
    at getFile (/Users/me/Dropbox/Projects/scrape/index.js:29:12)
    at <anonymous>
    at process._tickCallback (internal/process/next_tick.js:189:7)
Richard
  • Well, this code is already asynchronous. Other things could run while this is running because it pauses for lots of asynchronous operations. But, all your operations are sequenced so nothing can run in parallel. You could skip the `await` on `getFile()` so that each of those could run in parallel. And, you could get rid of the `page.waitFor(3000);`, which looks like a hack. I don't know that specific page, but you should be able to find a more definitive way of knowing when that page is done rather than just blindly waiting 3 seconds. Also, you probably want to `await page.close()` when done with the page. – jfriend00 Jan 30 '20 at 07:54
  • Sorry - I'll amend the question. I mean that I would like the requests to run in parallel. – Richard Jan 30 '20 at 09:35

3 Answers


Here's a scheme that runs a user-controllable number of getFile() operations in parallel. You set the maxInFlight variable to how many pages you want running at once; the right value is probably a matter of your memory usage and of any rate limiting Facebook might apply, so you will have to settle on it by experimentation. I've set it initially to 10 to allow 10 pages to be "in flight" at the same time.

The general idea here is that getFile() increments/decrements inFlightCntr as a measure of how many pages are open at once and then the csvPipe is paused or resumed based on that counter.

const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');

(async () => {
    const browser = await puppeteer.launch();

    const maxInFlight = 10;     // set this value to control how many pages run in parallel
    let inFlightCntr = 0;
    let paused = false;

    async function getFile(rowId, path) {
        let page;
        try {
            ++inFlightCntr;
            page = await browser.newPage();
            await page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
            let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
            const response = await page.goto(url, { waitUntil: 'networkidle2' });
            await page.waitFor(3000);
            const body = await page.$('body');
            await body.screenshot({
                path: path
            });
        } catch(e) {
            console.log(e);
        } finally {
            // close the page whether the screenshot succeeded or not, then release this slot
            if (page) {
                await page.close();
            }
            --inFlightCntr;
        }
    }

    let fname = 'ids.csv'
    const csvPipe = fs.createReadStream(fname).pipe(csv());
    csvPipe.on('data', async (row) => {
            let id = row.ad_id;
            console.log(id);
            let path = './images/' + id + '.png';
            getFile(id, path).finally(() => {
                if (paused && inFlightCntr < maxInFlight) {
                    csvPipe.resume();
                    paused = false;
                }
            });
            if (!paused && inFlightCntr >= maxInFlight) {
                csvPipe.pause();
                paused = true;
            }
        }).on('end', () => {
            console.log('CSV file successfully processed');
        });
})();

The code could be a bit simpler if you just ran the csvPipe to collect all the rows into an array before processing any of them. Then you could use any of a number of promise-concurrency functions to process the array while controlling how many operations run in parallel; the pMap() utility included below is one such function. Here's how that implementation would look:

const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');

(async () => {
    const browser = await puppeteer.launch();

    const maxInFlight = 10;     // set this value to control how many pages run in parallel
    const fname = 'ids.csv'
    const csvPipe = fs.createReadStream(fname).pipe(csv());
    const rowIDs = [];

    async function getFile(rowId, path) {
        let page;
        try {
            page = await browser.newPage();
            await page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
            let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
            const response = await page.goto(url, { waitUntil: 'networkidle2' });
            await page.waitFor(3000);
            const body = await page.$('body');
            await body.screenshot({
                path: path
            });
        } catch(e) {
            console.log(e);
        } finally {
            // page is declared outside the try block so it is still in scope here
            if (page) {
                await page.close();
            }
        }
    }

    csvPipe.on('data', row => {
        rowIDs.push(row.ad_id);
    }).on('end', () => {
        // all rowIDs in the array now
        pMap(rowIDs, (id) => {
            let path = './images/' + id + '.png';
            return getFile(id, path);
        }, maxInFlight).then(() => {
             console.log("all items processed");     // all done now
        }).catch(err => {
             console.log(err);
        });
    });
})();


// utility function for processing an array asynchronously with 
// no more than limit items "in flight" at the same time
function pMap(array, fn, limit) {
    return new Promise(function(resolve, reject) {
        var index = 0, cnt = 0, stop = false, results = new Array(array.length);

        function run() {
            while (!stop && index < array.length && cnt < limit) {
                (function(i) {
                    ++cnt;
                    ++index;
                    fn(array[i]).then(function(data) {
                        results[i] = data;
                        --cnt;
                        // see if we are done or should run more requests
                        if (cnt === 0 && index === array.length) {
                            resolve(results);
                        } else {
                            run();
                        }
                    }, function(err) {
                        // set stop flag so no more requests will be sent
                        stop = true;
                        --cnt;
                        reject(err);
                    });
                })(index);
            }
        }
        run();
    });
}   
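
If you would rather not maintain your own concurrency helper, an off-the-shelf limiter can stand in for pMap(). Here's a minimal sketch using the p-limit package (an assumption on my part, not part of the original answer, and it needs a CommonJS-compatible version of p-limit installed); everything else mirrors the second code block above:

const pLimit = require('p-limit');

// allow at most maxInFlight getFile() calls to be running at any one time
const limit = pLimit(maxInFlight);

csvPipe.on('data', row => {
    rowIDs.push(row.ad_id);
}).on('end', () => {
    // wrap each call in limit() so Promise.all() still sees every result,
    // but only maxInFlight of them run concurrently
    const work = rowIDs.map(id => limit(() => getFile(id, './images/' + id + '.png')));
    Promise.all(work).then(() => {
        console.log("all items processed");
    }).catch(err => {
        console.log(err);
    });
});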
jfriend00

If you are fine with using another library, you can give puppeteer-cluster a try (disclaimer: I'm the author). It solves exactly that problem.

You queue the jobs and let the library take care of the concurrency:

const { Cluster } = require('puppeteer-cluster');

const cluster = await Cluster.launch({
    concurrency: Cluster.CONCURRENCY_PAGE, // you could also use something different (see docs)
    maxConcurrency: 4, // how many pages in parallel your system can handle
});

// setup your task
await cluster.task(async ({ page, data: { id, path } }) => {
    const url = 'https://www.facebook.com/ads/library/?id=' + id;
    await page.goto(url);
    // ... remaining code
});

// read everything at once and queue all jobs, waiting for the stream
// to finish so the queue is fully populated before idle() is called
let fname = 'ids.csv';
await new Promise((resolve, reject) => {
    fs.createReadStream(fname).pipe(csv())
        .on('data', (row) => cluster.queue({ id: row.ad_id, path: './images/' + row.ad_id + '.png' }))
        .on('end', resolve)
        .on('error', reject);
});

// wait until all jobs are done and close the cluster
await cluster.idle();
await cluster.close();

This code sets up a cluster with 4 workers (4 browser pages) and works on the queued jobs ({ id: ..., path: ... }).
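
For completeness, here is one way the pieces might fit together in a single file, with the navigation and screenshot steps from the question filled in. Treat this as a sketch rather than the answer's exact code: the viewport and the fixed 3-second wait are carried over from the question, and the taskerror handler is an extra safeguard from the puppeteer-cluster documentation, not from this answer.

const csv = require('csv-parser');
const fs = require('fs');
const { Cluster } = require('puppeteer-cluster');

(async () => {
    const cluster = await Cluster.launch({
        concurrency: Cluster.CONCURRENCY_PAGE,
        maxConcurrency: 4, // how many pages in parallel your system can handle
    });

    // each queued job gets its own page from the cluster
    await cluster.task(async ({ page, data: { id, path } }) => {
        await page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
        await page.goto('https://www.facebook.com/ads/library/?id=' + id, { waitUntil: 'networkidle2' });
        await page.waitFor(3000); // same fixed wait as the question
        const body = await page.$('body');
        await body.screenshot({ path: path });
    });

    // log failed jobs instead of letting one error stop the whole run
    cluster.on('taskerror', (err, data) => {
        console.log('Error processing', data, err.message);
    });

    // queue one job per CSV row and wait for the stream to finish
    await new Promise((resolve, reject) => {
        fs.createReadStream('ids.csv').pipe(csv())
            .on('data', (row) => cluster.queue({ id: row.ad_id, path: './images/' + row.ad_id + '.png' }))
            .on('end', resolve)
            .on('error', reject);
    });

    await cluster.idle();
    await cluster.close();
    console.log('CSV file successfully processed');
})();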

Thomas Dondorf

You can achieve this with Promise.all().

Step 1: create your pages up front so they are ready to be used:

const pages = await Promise.all([browser.newPage(), browser.newPage()])

Step 2: parse your CSV file and split the rows into blocks according to the number of pages you created in step 1. Don't load any pages at this point; just parse the CSV and collect the results. Build a results array in which each entry carries the URL and the output path, e.g. const rows = [{ url: url1, path: path1 }, ...]. Then transform that array into blocks matching the number of initialized pages you have (a short illustration follows the snippet). Something like this:

const rowPacks = rows.reduce((acc, cur) => {
  // start a new pack when there is none yet or when the last one is already full
  if(!acc.length || acc[acc.length - 1].length >= pages.length){
    acc.push([cur]);
    return acc;
  }

  acc[acc.length - 1].push(cur);
  return acc;
}, []);
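
As a quick sanity check, here is what that chunking produces for some hypothetical stand-in values (two pages and five rows; the URLs and paths are placeholders, not taken from the question):

const pages = ['pageA', 'pageB'];   // stand-ins for two puppeteer pages
const rows = ['u1', 'u2', 'u3', 'u4', 'u5'].map(u => ({ url: u, path: './images/' + u + '.png' }));

const rowPacks = rows.reduce((acc, cur) => {
  if(!acc.length || acc[acc.length - 1].length >= pages.length){
    acc.push([cur]);
    return acc;
  }
  acc[acc.length - 1].push(cur);
  return acc;
}, []);

console.log(rowPacks.map(pack => pack.map(r => r.url)));
// => [ [ 'u1', 'u2' ], [ 'u3', 'u4' ], [ 'u5' ] ]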


Step 3: use the generated pages to work through the packs, something like this:

const pageDataLoad = async (page, row) => {
    await page.goto(row.url, { waitUntil: 'networkidle2' });
    await page.waitFor(3000);
    const body = await page.$('body');
    await body.screenshot({
        path: row.path
    });
}

while(rowPacks.length){
  // take the next pack off the front of the list
  const packToUse = rowPacks.shift();

  // each row in the pack gets its own pre-created page
  const passedRowsToPages = packToUse.map((row, pageIndex) => pageDataLoad(pages[pageIndex], row));

  await Promise.all(passedRowsToPages);
}

Just play around with the timeouts and the number of page instances so that you don't hammer the target URLs with too many simultaneous requests and don't run into memory problems.

Grynets