
I am currently using the code below to write Parquet via Avro. This code writes to the local file system, but I want to write to S3.

try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);

    ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
        .withSchema(avroSchema)
        .withConf(new org.apache.hadoop.conf.Configuration())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE) // probably not good for prod (overwrites files)
        .build();

    for (Map<String, Object> row : message.getTransformedMessage()) {
      final GenericRecord record = new GenericData.Record(avroSchema);
      row.forEach(record::put);
      writer.write(record);
    }
    // todo: Write to S3. We should probably write via the AWS objects. This does not show that.
    // https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decimal-types-an
    writer.close();
    System.out.println("Total Time: " + sw);

} catch (Exception e) {
    // do something here. Retryable? Non-retryable? Wrap this exception in one of these?
    transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
}

This writes to a file fine, but how do I get it to stream into the AmazonS3 API? I have found some code on the web using the hadoop-aws jar, but that requires some Windows .exe files to work and, of course, we want to avoid that. Currently I am using only:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

So the question is, is there a way to intercept the output stream on the AvroParquetWriter so I can stream it to S3? The main reason I want to do this is for retries. S3 automagically retries up to 3 times. This would help us out a lot.

markthegrea
  • In [this](https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decimal-types-an) way maybe? Albeit it uses the libs/exes you would like to avoid :( :) – m4gic Mar 19 '20 at 08:10
  • Yes, that is close, but it has a dependency on Hadoop running on the server. – markthegrea Mar 19 '20 at 15:41
  • I am afraid this is not really feasible. If you have a look at the class [source](https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java) you can see that the superclass is org.apache.parquet.hadoop.ParquetWriter, so I would say (without digging too deep) that the existence of a configured hadoop is a requirement for this. It seems Spark can be a [better candidate for this](https://sparkbyexamples.com/spark/spark-read-write-parquet-file-from-amazon-s3/). – m4gic Mar 19 '20 at 19:50
  • Thanks for the response! I am really trying to overcome two things: the external dependency on hadoop and the S3 dependencies. If parquet is just a file format, why is it so hard to write out?! – markthegrea Mar 19 '20 at 20:15
  • You can find some answers to a similar question [here](https://stackoverflow.com/questions/29279865/parquet-without-hadoop) - have a look at the linked Jira issues. – m4gic Mar 20 '20 at 08:07

2 Answers


This does depend on the hadoop-aws jar, so if you're not willing to use that, I'm not sure I can help you. I am, however, running on a Mac and do not have any Windows .exe files, so I'm not sure where those are coming from for you. The AvroParquetWriter already depends on Hadoop, so even if this extra dependency is unacceptable to you, it may not be a big deal to others:

You can use an AvroParquetWriter to stream directly to S3 by passing it a Hadoop Path that is created with a URI parameter and setting the proper configs.

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)

val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)

val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()

I used the following dependencies (sbt format):

"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"
john_s
  • This is the correct answer but needs to be fleshed out. Please include the jars you used. We did it this way and discovered that only Windows needs the extra .exe files and such. Linux (Fargate on AWS) did not. These jars are updated and supported by AWS and the documentation is great. Nice work, noobie! – markthegrea Apr 15 '20 at 20:44
  • As far as I can tell, you don't need to include the `avro` dependency explicitly. It comes with `parquet-avro`. – s.alem Sep 10 '20 at 21:16
  • I ended up also running on Windows, and needed a winutils.exe file in my HADOOP_HOME environment variable path. – john_s Sep 29 '20 at 18:03
  • This worked for me, might be obvious to others, but I had to call `writer.close()` to actually complete the upload. – Müller Aug 26 '21 at 14:31

Hopefully I am not misunderstanding the question, but it seems what you are doing here is converting Avro to Parquet, and you'd like to upload the Parquet file to S3.

After you close your ParquetWriter, you should call a method that looks like this (granted, this doesn't intercept the stream writing from Avro to Parquet; it just uploads the Parquet file that is no longer being written to):

        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY")))
                .build();
        // S3Path is just a simple holder for the bucket name and object key
        S3Path outputPath = new S3Path();
        outputPath.setBucket("YOUR_BUCKET");
        outputPath.setKey("YOUR_FOLDER_PATH");
        try (InputStream parquetStream = new FileInputStream(new File(parquetFile))) {
            s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
        } catch (IOException e) {
            e.printStackTrace();
        }

using the AWS SDK:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.749</version>
</dependency>

Of course, the method would reside in a different utils class, and the constructor of that class should initialize the AmazonS3 s3Client with the credentials, so all you'd need to do is invoke it and access its s3Client member to put objects.
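
For example, a minimal sketch of such a utils class, assuming a hypothetical S3Uploader name and placeholder region/credentials (the File overload of putObject is used here so the SDK knows the content length up front):

import java.io.File;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// Hypothetical utils class: build the client once, then reuse it to upload finished Parquet files.
public class S3Uploader {

    private final AmazonS3 s3Client;

    public S3Uploader(String accessKey, String secretKey) {
        this.s3Client = AmazonS3ClientBuilder.standard()
                .withRegion("us-east-1") // placeholder region
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
                .build();
    }

    // Upload an already-closed Parquet file; the File overload lets the SDK set the content length.
    public void upload(String bucket, String key, String parquetFile) {
        s3Client.putObject(bucket, key, new File(parquetFile));
    }
}

Usage would then be along the lines of new S3Uploader("ACCESS_KEY", "SECRET_KEY").upload("YOUR_BUCKET", "YOUR_FOLDER_PATH/data.parquet", parquetFile).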

hope this helps

  • Good try, but this just shows how to upload a file to S3. Hadoop makes this difficult as the file is saved to disk first. I want to go DIRECTLY to S3 from ParquetWriter. – markthegrea Mar 24 '20 at 19:01
  • @markthegrea Hadoop uses HDFS, which models POSIX file system behavior, while S3 is an object store, not a file system - so to write directly to S3 you'd have to configure your cluster's HDFS configuration to synchronize with S3 - but again, your Hadoop job would still leverage the Hadoop API's HDFS implementation and you'd be writing on cluster disk space. If you use AWS EMR, this sync is set up already, and you can push objects from HDFS into S3. What would your use case be to do such an operation where writing to HDFS is being avoided? – Thecodeanator Mar 24 '20 at 19:40
  • We are merely converting data to parquet. As is seen above, the parquetWriter only writes to the local file system. We need to write to s3 without hadoop being involved. – markthegrea Mar 24 '20 at 20:26
  • @markthegrea You cannot write directly to S3 without some wrapper that modifies the Hadoop API's HDFS implementation to work with an object store (S3) by creating a pseudo filesystem tree based on S3 paths (which m4gic provided a link for). The idea of Hadoop is to use HDFS because during map reduce, you'd be able to locally retrieve stored blocks and a shuffle is internal, where YARN can manage it. I guess what I am trying to understand is why go through this effort to use a wrapper that will probably be harder to maintain in the long run anyway? Unless disk space is the concern? – Thecodeanator Mar 24 '20 at 21:13