
I have a simple job that moves data from Pub/Sub to GCS. The Pub/Sub topic is a shared topic with many different message types of varying size.

I want the result in GCS to be vertically partitioned as follows:

Schema/version/year/month/day/

Under that parent key should be a group of files for that day, and the files should be a reasonable size, i.e. 10-200 MB.

I'm using Scio and I am able to do a groupBy operation to make an SCollection[(String, Iterable[Event])] where the key is based on the partitioning scheme above.
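For illustration, roughly what that grouping step looks like; the Event case class and its fields here are placeholders for my real message types:

import com.spotify.scio.values.SCollection
import org.joda.time.{DateTimeZone, Instant}

// Sketch only: Event and its fields stand in for my real message types.
case class Event(schema: String, version: Int, timestamp: Instant, payload: String)

// Build the Schema/version/year/month/day/ key used for partitioning.
def partitionKey(e: Event): String = {
  val day = e.timestamp.toDateTime(DateTimeZone.UTC)
  f"${e.schema}/${e.version}/${day.getYear}%04d/${day.getMonthOfYear}%02d/${day.getDayOfMonth}%02d/"
}

// events is my windowed SCollection[Event] read from Pub/Sub (window options shown further down).
val grouped: SCollection[(String, Iterable[Event])] = events.groupBy(partitionKey)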

I am unable to use the default text sink since it does not support vertical partitioning; it can only write the entire PCollection to one location. Instead, following the advice in these answers:

How do I write to multiple files in Apache Beam?

Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn

I have created a simple function that writes my group to GCS.

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

import com.google.cloud.storage
import com.google.cloud.storage.{BlobInfo, StorageOptions}

object GcsWriter {

  private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService

  val EXTENSION = ".jsonl.gz"

  //todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
  //maybe beam is aimed at a different use case
  //this is an output 'transform' that writes text files
  //org.apache.beam.sdk.io.TextIO.write().to("output")

  // gzip-compress a byte array in memory
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val byteOutputStream = new ByteArrayOutputStream()
    val compressedStream = new GZIPOutputStream(byteOutputStream)
    compressedStream.write(bytes)
    compressedStream.close()
    byteOutputStream.toByteArray
  }

  // join the items into newline-delimited text, gzip it and upload it as a single object
  def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
    val bytes = items.mkString(start = "", sep = "\n", end = "\n").getBytes("UTF-8")
    val compressed = gzip(bytes)
    val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
    gcs.create(blobInfo, compressed)
  }

}
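For reference, I call it from a simple map over the grouped collection, roughly like this (the bucket name and toJson serializer are placeholders):

// One object per (key, pane): serialize each event and hand the lines to GcsWriter.
grouped.map { case (key, events) =>
  GcsWriter.writeAsTextToGcs(
    bucketName = "my-bucket",        // placeholder
    key        = key,
    items      = events.map(toJson)  // toJson is my own serializer, not shown
  )
}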

This works and writes the files how I like. I use the following triggering rules with fixed windows:

val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
val WINDOW_ELEMENT_MAX_COUNT = 5000
val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) // delay after receiving late data before re-firing
val ALLOWED_LATENESS: Duration = Duration.standardHours(1)

val WINDOW_OPTIONS = WindowOptions(
  trigger = AfterFirst.of(
    ListBuffer(
      AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
      AfterWatermark.pastEndOfWindow().withLateFirings(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
  allowedLateness = ALLOWED_LATENESS,
  accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
)

Basically, a compound trigger that fires either at the end of the window according to the watermark or when x elements are received.

The problem is that the source data can have messages of varying size, so if I choose a fixed number of elements to trigger on I will either:

1) choose too big a number, and for the larger event groups it will blow up the Java heap on the worker, or

2) choose a smaller number, and then I end up with some tiny files for the quiet events, where I would want to accumulate more events per file.

I don't see a custom trigger where I can pass a lambda that outputs a metric for each element, or something like that. Is there a way I can implement my own trigger that fires on the number of bytes in the window?
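To illustrate, what I am imagining is something along these lines; the AfterAccumulatedBytes trigger below is purely hypothetical and, as far as I can tell, nothing like it exists in Beam:

// Purely hypothetical API -- fire when the sum of a per-element byte metric
// crosses a threshold, instead of a fixed element count.
trigger = AfterFirst.of(
  ListBuffer(
    AfterAccumulatedBytes.atLeast((e: Event) => e.payload.getBytes("UTF-8").length, 100 * 1024 * 1024),
    AfterWatermark.pastEndOfWindow()))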

Some other questions:

Am I correct in assuming the Iterator for the elements in each group is held in memory, not streamed from storage? If not, I could stream from the iterator to GCS in a more memory-efficient way.
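By streaming from the iterator I mean something like the sketch below, which would live alongside writeAsTextToGcs and reuse the gcs client and EXTENSION defined above (untested, and only useful if the iterable is in fact consumed lazily):

import java.nio.channels.Channels
import java.util.zip.GZIPOutputStream

import com.google.cloud.storage.BlobInfo

// Write lines through a resumable WriteChannel instead of building the whole byte array in memory.
def streamAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
  val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
  val channel  = gcs.writer(blobInfo)
  val out      = new GZIPOutputStream(Channels.newOutputStream(channel))
  try {
    items.foreach { line =>
      out.write(line.getBytes("UTF-8"))
      out.write('\n')
    }
  } finally {
    out.close() // also closes the underlying channel and finalizes the object
  }
}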

For my GCS writer I am simply doing it in a map or a ParDo. It does not implement the file output sink or look anything like TextIO. Are there going to be issues with this simple implementation? The docs say that if a transform throws an exception it is simply retried (indefinitely for streaming apps).

Luke De Feo
  • TextIO supports writing different values to different filepatterns - see the DynamicDestinations API (write().to(DynamicDestinations)) available I think since Beam 2.1. Does it do what you want? – jkff Sep 26 '17 at 16:25
  • Hi, I checked and it's not in the current 2.1.0, but I do see something on master on GitHub. Perhaps it will be in the next release. Either way, hopefully it will replace the need for my dodgy output writer function :) – Luke De Feo Sep 27 '17 at 09:48
  • @jkff do you have any comment on the byte-based triggering? – Luke De Feo Sep 27 '17 at 09:48
  • I don't think triggering is the right tool for the job here. Triggering controls when a GroupByKey emits the values for a window of a particular key, and your use case does not involve grouping by a key. You should probably use a simple DoFn and add some batching based on number of bytes, e.g. see how it's done in ElasticsearchIO.write https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L773 – jkff Sep 27 '17 at 17:31
  • Apologies if my question wasn't clear, but I am doing a group by in order to vertically partition the data before I write to GCS. The key is the schema/year/month/day. I want to be able to control the physical size of the groupings, though, with triggers – Luke De Feo Oct 01 '17 at 10:09
  • I believe my suggestion to mimic ElasticsearchIO.write() still applies and is still a far better tool for the job than triggers. – jkff Oct 02 '17 at 18:20
  • Hi, the Elasticsearch writer is batching the group into chunks that are smaller than the grouping. The issue is that my groupings are pretty large and, unless I limit them somehow, they will blow up the heap on the workers, which is why I need the element-count / byte-based trigger. Can you elaborate on why triggers are not suitable for this? – Luke De Feo Oct 11 '17 at 10:10
  • @LukeDeFeo were you able to find a way to create a window by byte count? – Sumit Jun 08 '20 at 04:06
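A minimal sketch of the batching approach suggested in the comments above: instead of a byte-based trigger, split each grouped iterable into byte-bounded chunks inside the write step and emit one object per chunk. The bucket name, TARGET_BYTES threshold and toJson serializer are placeholders, and this still assumes the grouped iterable can be consumed without materializing everything at once:

val TARGET_BYTES = 100L * 1024 * 1024

grouped.map { case (key, events) =>
  var buffer        = Vector.empty[String]
  var bufferedBytes = 0L

  // Flush the current chunk as one GCS object.
  def flush(): Unit = if (buffer.nonEmpty) {
    GcsWriter.writeAsTextToGcs("my-bucket", key, buffer) // placeholder bucket
    buffer        = Vector.empty
    bufferedBytes = 0L
  }

  events.foreach { e =>
    val line = toJson(e) // placeholder serializer
    buffer :+= line
    bufferedBytes += line.getBytes("UTF-8").length
    if (bufferedBytes >= TARGET_BYTES) flush()
  }
  flush()
}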
