Posting what I figured out in case someone else comes across this.
There were three issues with how I was attempting to use writeDynamic() before.
- Previously I had been using Beam version 2.3.0, which does indeed describe FileNaming as a class internal to FileIO.Write. Beam 2.4.0 defines FileNaming as a public static interface, making it available externally.
- Fully resolving/importing defaultNaming. Rather than calling defaultNaming directly, as it is called in the example documentation, it must be invoked as FileIO.Write.defaultNaming, since FileIO is the class I actually imported.
- Adding withDestinationCoder was also required to perform the dynamic write.
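The final solution ended up looking like this:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.values.PCollection;

    static void applyWindowedWrite(PCollection<String> stream) {
      stream.apply(FileIO.<String, String>writeDynamic()
          .by(Event::getKey)                          // destination key for each element
          .via(TextIO.sink())                         // write each element as a line of text
          .to("gs://some_bucket/events/")
          .withDestinationCoder(StringUtf8Coder.of()) // coder for the String destination key
          .withNumShards(1)
          .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
    }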
Here Event::getKey refers to a static method of an Event class defined in the same package, with the signature public static String getKey(String event).
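For anyone wondering what such a key function might look like, here is a minimal sketch. Only the signature comes from my actual code; the body is a hypothetical illustration that assumes each event is a JSON string carrying a "type" field, and the "unknown" fallback key is likewise just an assumption for the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: only the getKey signature is taken from the answer above.
final class Event {
    // Assumption: events are JSON strings containing a string field named "type".
    private static final Pattern TYPE_FIELD =
        Pattern.compile("\"type\"\\s*:\\s*\"([^\"]*)\"");

    private Event() {}

    // Returns the value of the "type" field, or "unknown" if the field is absent.
    public static String getKey(String event) {
        Matcher m = TYPE_FIELD.matcher(event);
        return m.find() ? m.group(1) : "unknown";
    }
}
```

Since the key is passed to defaultNaming as the file name prefix, it is probably wise to keep the returned values short and free of characters that are awkward in file names. A real pipeline would likely use a proper JSON parser rather than a regular expression.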
This performs a windowed write that produces one file per key for each window pane, since .withNumShards(1) fixes the number of shards at one. It assumes the windowing has already been applied in a previous step. A GroupByKey is not required before writing, because the write performs its own grouping whenever the number of shards is set explicitly. See the FileIO documentation for more details under "Writing files -> How many shards are generated per pane".