2

(new information below) I am trying to set up a lambda function that reacts to uploaded tgz files by uncompressing them and writing the results back to S3. The unzip and untar work fine, but uploading to S3 fails:

/Users/russell/lambda/gzip/node_modules/aws-sdk/lib/s3/managed_upload.js:350
    var buf = self.body.read(self.partSize - self.partBuffer.length) ||
                        ^
TypeError: undefined is not a function
    at ManagedUpload.fillStream (/Users/russell/lambda/gzip/node_modules/aws-sdk/lib/s3/managed_upload.js:350:25)
    at Entry.<anonymous> (/Users/russell/lambda/gzip/node_modules/aws-sdk/lib/s3/managed_upload.js:167:28)
    at Entry.emit (events.js:104:17)
    at Entry._read (/Users/russell/lambda/gzip/node_modules/tar/lib/entry.js:123:12)
    at Entry.end (/Users/russell/lambda/gzip/node_modules/tar/lib/entry.js:82:8)
    at Parse._process (/Users/russell/lambda/gzip/node_modules/tar/lib/parse.js:107:13)
    at BlockStream.<anonymous> (/Users/russell/lambda/gzip/node_modules/tar/lib/parse.js:47:8)
    at BlockStream.emit (events.js:107:17)
    at BlockStream._emitChunk (/Users/russell/lambda/gzip/node_modules/tar/node_modules/block-stream/block-stream.js:145:10)
    at BlockStream.write (/Users/russell/lambda/gzip/node_modules/tar/node_modules/block-stream/block-stream.js:45:10)

This error occurs when I write to S3, but if instead I write the files locally to disk it works, so the pipeline is correct.

Here is code that demonstrates the problem:

var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
var zlib = require('zlib');
var tar = require('tar');
var fstream = require('fstream');

fstream.Reader({'path': 'testdata.tar.gz'})
    .pipe(zlib.Unzip())
    .pipe(tar.Parse())
    .on('entry', function(entry) {
        var filename = entry.path;
        console.log('got ' + entry.type + ' ' + filename);
        if (entry.type == 'File') {
            if (1) { // switch between working and nonworking cases
                s3.upload({Bucket: 'my_bucket', Key: 'gunzip-test/' + filename, Body: entry}, {},
                          function(err, data) {
                              if (err) 
                                  console.log('ERROR!');
                              else
                                  console.log('OK');
                          });
            }
            else {
                entry.pipe(fstream.Writer({ 'path': '/tmp/mytest/' + filename }));
            }
        }
    });

If the code is set to write to S3, it fails with the above error; if it writes the extracted files locally, it succeeds. `entry` is a stream and, according to the docs, should be accepted as the Body parameter of upload. I put a print statement in ManagedUpload, where the failure occurs, and confirmed that self.body is a stream:

var stream = require('stream');
console.log('is it a stream? ' + ((self.body instanceof stream) ? 'yes' : 'no'));
console.log('self.body.read is ' + self.body.read);

returns

$ got File gunzip.js
is it a stream? yes
self.body.read is undefined

I'm fairly new to AWS and Node.js, so there could be a basic problem here, but I've spent a day on it and haven't found it. The same upload call worked when I used unzip instead of gzip (see the linked question "using lambda functions to unzip archives in S3 is really sloooooow"). Can anyone point out what I am doing wrong in this code?

Thanks


I think I understand this a little better. I broke the pipeline up into pieces and looked at each one. The problem is that tar.Parse uses fstream and not stream. If I look at the return of the .pipe(tar.Parse()) statement it is a stream, but it is not a stream.Readable or a stream.Writable. fstream does not define a read() method (its reader is based on Stream, it is not a stream.Readable), so tar.Parse, which is based on Stream, does not have one either.
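
The distinction described above can be reproduced with Node's core `stream` module alone. This is a minimal sketch, not the fstream or tar code itself: `legacy` is a plain old-style `Stream` used as a stand-in for a tar entry (an assumption), and `describe` is a hypothetical helper written for this illustration.

```javascript
var Stream = require('stream');

// Old-style stream: an EventEmitter with pipe(), but no read().
// Used here as a stand-in for a tar entry (assumption).
var legacy = new Stream();

// Modern stream for comparison; PassThrough inherits from stream.Readable.
var modern = new Stream.PassThrough();

// Hypothetical helper that reports what kind of stream an object is.
function describe(s) {
    return {
        isStream: s instanceof Stream,
        isReadable: s instanceof Stream.Readable,
        hasRead: typeof s.read === 'function'
    };
}

console.log(describe(legacy)); // isStream: true, isReadable: false, hasRead: false
console.log(describe(modern)); // isStream: true, isReadable: true, hasRead: true
```

Both objects pass the `instanceof stream` check from the question, which would explain why that check printed "yes" while `self.body.read` was still undefined.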

So a refinement of the question is, is this a bug in fstream, or is fstream not intended to be a stream? I think it is a bug - from the README:

"Like FS streams, but with stat on them, and supporting directories and symbolic links, as well as normal files. Also, you can use this to set the stats on a file, even if you don't change its contents, or to create a symlink, etc."

russell

2 Answers

15

In my case running the stream through stream.PassThrough helped.

var PassThrough = require('stream').PassThrough;

var stream = getStreamSomeHow();
var passthrough = new PassThrough();

stream.pipe(passthrough);

s3.upload({...,Body:passthrough})
JakubKnejzlik
  • Hey, this worked for me too! Any idea why this is needed though? – Shrikrishna Holla Jun 09 '17 at 05:16
  • I'm not sure, but if I'm not mistaken, the S3 upload expected some methods that failed when called directly on the stream. Piping it through the PassThrough means its methods, which have a different implementation, are called instead, and (luckily) it works :) – JakubKnejzlik Jun 13 '17 at 16:37
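
One plausible reading of why this helps, sketched with core modules only: an old-style stream has `pipe()` but no `read()`, while a PassThrough is a `stream.Readable` and so has the `read()` method that the stack trace in the question shows the upload code calling. `toReadable` is a hypothetical helper name, and `legacy` is a stand-in for the tar entry (an assumption); this sketch does not exercise the S3 SDK.

```javascript
var Stream = require('stream');

// Hypothetical helper: wrap any pipe()-capable source in a PassThrough
// so that consumers get a stream.Readable with a read() method.
function toReadable(source) {
    var passthrough = new Stream.PassThrough();
    source.pipe(passthrough);
    return passthrough;
}

// Stand-in for the tar entry: an old-style stream that only emits events.
var legacy = new Stream();
var wrapped = toReadable(legacy);

console.log(typeof legacy.read);  // undefined
console.log(typeof wrapped.read); // function

// Data emitted by the old-style source flows through the wrapper.
wrapped.on('data', function (chunk) {
    console.log('chunk: ' + chunk.toString());
});
legacy.emit('data', Buffer.from('hello'));
legacy.emit('end');
```

Under these assumptions, the call from the question would become `Body: toReadable(entry)`, which is the same thing the answer does inline.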
-1

Your body variable is a Stream object, in which case you will need to use .toString()

var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
var zlib = require('zlib');
var tar = require('tar');
var fstream = require('fstream');

fstream.Reader({'path': 'testdata.tar.gz'})
    .pipe(zlib.Unzip())
    .pipe(tar.Parse())
    .on('entry', function(entry) {
        var filename = entry.path;
        console.log('got ' + entry.type + ' ' + filename);
        if (entry.type == 'File') {
            if (1) { // switch between working and nonworking cases
                s3.upload({Bucket: 'my_bucket', Key: 'gunzip-test/' + filename, Body: entry.toString()}, {},
                          function(err, data) {
                              if (err) 
                                  console.log('ERROR!');
                              else
                                  console.log('OK');
                          });
            }
            else {
                entry.pipe(fstream.Writer({ 'path': '/tmp/mytest/' + filename }));
            }
        }
    });
Shimon Tolts
  • The api doc for putObject() says that Body is "Body — (Buffer, Typed Array, Blob, String, ReadableStream) Object data." Doesn't this mean that it accepts any of those data types as source? I'm assuming changing this to string would be expensive and unnecessary, that streaming would be faster. Is this wrong? – russell Feb 24 '15 at 14:44
  • Also, in the (working) code for the linked question, where I used unzip instead of tar and zlib, the files were uploaded the same way, and I'm pretty sure that passed a stream too. – russell Feb 24 '15 at 14:55
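
To illustrate the doubt raised in these comments: on a plain stream object, `toString()` is the default `Object.prototype.toString`, which does not return the streamed data. This sketch uses a core old-style `Stream` as a stand-in; whether tar's entry objects behave identically is an assumption.

```javascript
var Stream = require('stream');

// Stand-in for a stream-valued Body (assumption: no custom toString()).
var source = new Stream();

// toString() describes the object; it does not collect the stream's data.
var asString = source.toString();
console.log(asString); // [object Object]
```

If the entry behaves the same way, the uploaded object would contain that literal text rather than the file contents.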