
I am following along with the answer to this post and the documentation in order to perform a dynamic, windowed write on my data at the end of a pipeline. Here is what I have so far:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withNaming(key -> defaultNaming(key, ".json")));
}

But NetBeans flags a compile error on the last line:

FileNaming is not public in Write; cannot be accessed outside package

How do I make defaultNaming available to my pipeline so that I can use it for dynamic writes? Or, if that isn't possible, what should I be doing instead?

ljhennessy
  • This does not look like a Beam/Dataflow issue but rather a Java one. [This](https://stackoverflow.com/a/8487678/9251751) answer to an older question explains why Java throws this type of error, and [this one](https://stackoverflow.com/a/8386798/9251751) offers a possible solution. Does either of them help? – Lefteris S May 08 '18 at 14:18
  • I agree it is a Java path issue. However, I'm looking for some help, in the Beam context, with using that `defaultNaming` method. The post I refer to uses this method much like I am using it, and yet apparently it does not throw the same error. I am wondering why not. – ljhennessy May 08 '18 at 16:04

1 Answer


Posting what I figured out in case someone else comes across this.

There were three issues with how I was attempting to use writeDynamic() before.

  1. Previously I had been using Beam version 2.3.0, in which FileNaming is indeed internal to FileIO.Write. Beam 2.4.0 defines FileNaming as a public static interface, making it available externally.
  2. Fully resolving defaultNaming. Rather than calling defaultNaming directly, as it is called in the example documentation, it must be invoked as FileIO.Write.defaultNaming, since FileIO is the class I actually imported (see the sketch after this list for a static-import alternative).
  3. Adding withDestinationCoder was also required to perform the dynamic write.
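As a sketch of the static-import alternative mentioned in item 2 (assuming Beam 2.4.0 or later, where FileIO.Write and defaultNaming are public), the unqualified, documentation-style call can be made to compile like this; the jsonNaming helper name is purely illustrative:

import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

import org.apache.beam.sdk.io.FileIO;

// With the static import above, the unqualified call from the docs compiles.
static FileIO.Write.FileNaming jsonNaming(String key) {
    return defaultNaming(key, ".json");
}

This can then be passed as .withNaming(key -> jsonNaming(key)); fully qualifying the call, as in the final code below, works just as well.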

The final solution ended up looking like this:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(FileIO.<String, String>writeDynamic()
                .by(Event::getKey)                 // destination key for each element
                .via(TextIO.sink())                // write each element as a line of text
                .to("gs://some_bucket/events/")
                .withDestinationCoder(StringUtf8Coder.of())
                .withNumShards(1)                  // one shard (file) per window/pane
                .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}

Here, Event::getKey is a static method defined within the same package with the signature public static String getKey(String event).
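For reference, a minimal sketch of what such a helper could look like; the keying logic here (bucketing events by their first character) is purely a placeholder assumption, not part of the original pipeline:

public class Event {
    // Hypothetical helper matching the Event::getKey method reference above.
    // The real keying logic depends on the event payload; this placeholder
    // simply buckets events by their first character.
    public static String getKey(String event) {
        return (event == null || event.isEmpty()) ? "empty" : event.substring(0, 1);
    }
}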

This performs a windowed write that produces one file per window (as set by the .withNumShards(1) method). It assumes the window has been defined in a previous step. A GroupByKey is not required before writing, since grouping happens as part of the write whenever the number of shards is set explicitly. See the FileIO documentation for more details under "Writing files -> How many shards are generated per pane".
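For completeness, a minimal sketch of that earlier windowing step (the one-minute fixed window and the `events` variable name are assumptions for illustration):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Window the incoming elements before calling applyWindowedWrite, so that
// FileIO can emit one file per window (per pane) as described above.
PCollection<String> windowed = events.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
applyWindowedWrite(windowed);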

ljhennessy