
I'm working with a dataflow pipeline that a coworker recently migrated to version 2.2.0. The relevant step in the pipeline, which is throwing an error, is the following:

String domainOutputBucket = "gs://output/partner/20180311/raw/DomainBatch20180311_";

output.get(domainsOut)
    .setCoder(StringUtf8Coder.of())
    .apply("WriteDomain" + description, TextIO.write()
        .to(domainOutputBucket).withSuffix(".csv")                    // <-- line 109
        .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
        .withNumShards(numChunksCustom));

When I run the pipeline, however, it fails with the following error and stack trace:

Exception in thread "main" java.nio.file.InvalidPathException: Illegal char <:> at index 2: gs://output/partner/20180311/raw/DomainBatch20180311_
 at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182)
 at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153)
 at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
 at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
 at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
 at java.nio.file.Paths.get(Paths.java:84)
 at org.apache.beam.sdk.io.LocalFileSystem.matchNewResource(LocalFileSystem.java:196)
 at org.apache.beam.sdk.io.LocalFileSystem.matchNewResource(LocalFileSystem.java:78)
 at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:544)
 at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:213)
 at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:679)
 at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:997)
 at com.package.output.Partner.partnerPipeline(Partner.java:109)
 at com.package.output.Output.Export(Output.java:285)
 at com.package.output.Output.main(Output.java:254)

Based on this information, does anyone see what the issue is with the code above? If I find the answer on my own before anyone comments, I'll update this question for future devs.

Max
  • Please check out the comments on this question: https://stackoverflow.com/questions/49244752/google-cloud-dataflow-specifying-templocation-via-command-line-argument – Lara Schmidt Mar 14 '18 at 22:04
  • @LaraSchmidt I'm somewhat confused on where to include 'gcpTempLocation'. Is that a GCS function, or a variable I am supposed to pass in as a key in a key-value pair to the pipeline? – Max Mar 14 '18 at 22:39
  • Sorry, I meant the comments after that. You need to ensure that the storage class is on your class path. Otherwise the system does not know how to interpret GCP filenames and attempts to find it as a local file system. – Lara Schmidt Mar 14 '18 at 23:37
  • @LaraSchmidt can you add your response as an answer instead of a comment please? – Lefteris S Mar 17 '18 at 10:19

1 Answer


Take a look at the comments on Google Cloud Dataflow: Specifying TempLocation via Command Line Argument (the ones after comment 6).

You need to ensure that the GCS storage class is on your classpath. Otherwise the system does not know how to interpret gs:// filenames and falls back to the local file system, which is why your stack trace shows `LocalFileSystem` handing the path to `WindowsPathParser`, where the `:` in `gs://` is an illegal character.
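For example, if you build with Maven, Beam's gs:// filesystem support ships in a separate artifact from the core SDK. A sketch of the dependency you'd add (artifact coordinates assumed from Beam 2.2.0's module layout; verify against your SDK version):

```xml
<!-- Hypothetical pom.xml fragment: pulls Beam's GCS FileSystem
     implementation onto the classpath so gs:// paths resolve at
     pipeline construction time instead of hitting LocalFileSystem.
     Coordinates assumed for Beam 2.2.0 -- check your version. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
  <version>2.2.0</version>
</dependency>
```

With that artifact present, Beam's `FileSystems` registry discovers the gs:// scheme via `ServiceLoader` and `TextIO.write().to(...)` no longer falls through to the local filesystem.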

Lara Schmidt