
I am using SCIO from Spotify to write a job for Dataflow. Following two examples (e.g. 1 and e.g. 2), I am trying to write a PubSub stream to GCS, but the code below fails with the following error.

Error

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

Code

object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
    // set up example wiring
    val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
    val dataflowUtils = new DataflowExampleUtils(opts)
    dataflowUtils.setup()

    val sc = ScioContext(opts)

    sc.pubsubTopic(opts.getPubsubTopic)
      .timestampBy {
        _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
      }
      .withFixedWindows(Duration.standardHours(1))
      .groupBy(_ => Unit)
      .toWindowed
      .toSCollection
      .saveAsTextFile(args("output"))

    val result = sc.close()

    // CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}

I may be mixing up the windowing concept with a bounded PCollection. Is there a way to achieve this, or do I need to apply some transform to make it work? Any help would be appreciated.

rav
DAR

1 Answer


I believe SCIO's saveAsTextFile underneath uses Dataflow's Write transformation, which supports bounded PCollections only. Dataflow doesn't provide a direct API to write an unbounded PCollection to Google Cloud Storage yet, although this is something we are investigating.

To persist an unbounded PCollection somewhere, consider, for example, BigQuery, Datastore, or Bigtable. In SCIO's API, you could use, for example, saveAsBigQuery.
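As a rough sketch of the saveAsBigQuery route, assuming the same SCIO version as in the question (where sc.pubsubTopic returns a collection of strings) and assuming ContextAndArgs and the TableRow helper from com.spotify.scio.bigquery are available in that version: the --input and --output arguments and the single "message" column are hypothetical names chosen for illustration, and the job would still need to be launched in streaming mode.

```scala
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.spotify.scio._
import com.spotify.scio.bigquery._

import scala.collection.JavaConverters._

object StreamingPubSubToBigQuery {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Hypothetical arguments: --input=<Pub/Sub topic> --output=<project:dataset.table>
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Hypothetical single-column schema holding the raw message payload
    val schema = new TableSchema().setFields(
      List(new TableFieldSchema().setName("message").setType("STRING")).asJava)

    sc.pubsubTopic(args("input"))              // unbounded collection, as in the question
      .map(msg => TableRow("message" -> msg))  // one BigQuery row per Pub/Sub message
      .saveAsBigQuery(args("output"), schema)  // BigQuery accepts unbounded input

    sc.close()
  }
}
```

No windowing or grouping is applied here, since each message is written as its own row; any aggregation per window would go between the map and the save step.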

Davor Bonaci