I'm having a difficult time understanding the concepts behind .withFilenamePolicy of TextIO.write(). The requirements for supplying a FilenamePolicy seem incredibly complex for doing something as simple as specifying a GCS bucket to write streamed files to.
At a high level, I have JSON messages being streamed to a PubSub topic, and I'd like to write those raw messages to files in GCS for permanent storage (I'll also be doing other processing on the messages). I initially started with this Pipeline, thinking it would be pretty simple:
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);
    // topic and gcs_bucket are assumed to be defined elsewhere
    p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
        .apply("Write to GCS", TextIO.write().to(gcs_bucket));
    p.run();
}
I got an error about needing windowed writes, so I applied withWindowedWrites(), and then got another about needing a FilenamePolicy. This is where things get hairy.
I went to the Beam docs and checked out FilenamePolicy. It looks like I would need to extend this class, which in turn requires extending other abstract classes to make this work. Unfortunately the Apache documentation is a bit scant and I can't find any examples for Dataflow 2.0 doing this, except for the WordCount example, which even then implements these details in a helper class.
So I could probably make this work just by copying much of the WordCount example, but I'm trying to better understand the details of this. A few questions I have:
1) Is there any roadmap item to abstract away a lot of this complexity? It seems like I should be able to supply a GCS bucket like I would in a non-windowed write, and then just supply a few basic options like the timing and file naming rule. I do understand that writing streaming windowed data to files is more complex than just opening a file pointer (or the object storage equivalent).
2) It looks like to make this work, I need to create a WindowedContext object, which requires supplying a BoundedWindow (an abstract class), a PaneInfo object, and then some shard info. The information available for these is pretty bare and I'm having a hard time knowing what is actually needed for each of them, especially given my simple use case. Are there any good examples available that implement these? In addition, it looks like I need to set the number of shards as part of TextIO.write, but then also supply the number of shards as part of the FilenamePolicy?
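To make the shard part of question 2 concrete: as far as I can tell, the shard count is set once on the write (withNumShards), and the policy only reads it back from the context when formatting a name. Below is a minimal sketch of that naming step in plain Java with no Beam dependency. `ShardNaming` and `windowedName` are hypothetical names used only for illustration, and the zero-padded format is my own assumption, not Beam's default.

```java
import java.time.Instant;

/** Hypothetical stand-in for the naming step of a windowed filename policy; not Beam's API. */
final class ShardNaming {

    /** Builds "prefix-start-end-SSSSS-of-NNNNN.json" from window bounds and shard info. */
    static String windowedName(String prefix, Instant windowStart, Instant windowEnd,
                               int shardNumber, int numShards) {
        // Shards are numbered from 0, so a valid shard number is always below the total.
        if (shardNumber < 0 || shardNumber >= numShards) {
            throw new IllegalArgumentException("shardNumber must be in [0, numShards)");
        }
        return String.format("%s-%s-%s-%05d-of-%05d.json",
                prefix, windowStart, windowEnd, shardNumber, numShards);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-07-20T12:00:00Z");
        Instant end = start.plusSeconds(60);
        // One name per shard of the same one-minute window.
        for (int shard = 0; shard < 3; shard++) {
            System.out.println(windowedName("test", start, end, shard, 3));
        }
    }
}
```

The point of the sketch is that the shard number and the total shard count are two different values: the first identifies one file, the second is the same for every file in the window.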
Thanks for anything in helping me understand the details behind this, hoping to learn a few things!
Edit 7/20/17: I finally got this pipeline to run by extending FilenamePolicy. My challenge was that I needed to define the window for the streaming data from PubSub. Here is a pretty close representation of the code:
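import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class ReadData {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);
        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))) // one-minute fixed windows
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));
        p.run();
    }
}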
class TestPolicy extends FileBasedSink.FilenamePolicy {
    // Names each file after its window bounds and shard, e.g. test-<start>-<end>-<shard>-of-<total>.json
    @Override
    public ResourceId windowedFilename(
            ResourceId outputDirectory, WindowedContext context, String extension) {
        // FixedWindows produces IntervalWindow instances, so this cast is safe here
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test",
            window.start().toString(),
            window.end().toString(),
            context.getShardNumber(),
            context.getNumShards() // total shard count, not the shard number again
        );
        return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    // This pipeline only does windowed writes, so the unwindowed path is not supported
    @Override
    public ResourceId unwindowedFilename(
            ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}
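Since defining the window was the sticking point, here is a rough sketch of the arithmetic behind a one-minute fixed window, that is, how a message timestamp maps to the start and end that end up in the filename. It is plain Java using java.time rather than the Joda types Beam uses, `FixedWindowSketch` is a hypothetical helper and not part of Beam, and it assumes windows are aligned to the epoch with no offset.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper showing fixed-window arithmetic; not part of Beam. */
final class FixedWindowSketch {

    /** Start (inclusive) of the epoch-aligned window of the given size containing the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    /** End (exclusive) of the same window. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        Instant messageTime = Instant.parse("2017-07-20T12:00:42.500Z");
        System.out.println(windowStart(messageTime, oneMinute));
        System.out.println(windowEnd(messageTime, oneMinute));
    }
}
```

Under those assumptions, every message whose timestamp falls in the same minute lands in the same window, which is why all shards written for that window share the same start and end in their names.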