
I'm using Apache Beam 2.28.0 on Google Cloud Dataflow (with the Scio SDK). I have a large, bounded input PCollection and I want to limit / sample it down to a fixed number of elements, but start the downstream processing as soon as possible.

Currently, when my input PCollection has e.g. 20M elements and I limit it to 1M using Sample.any (https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html#any-long-):

input.apply(Sample.<String>any(1000000))

it waits until all 20M elements have been read, which takes a long time.

How can I efficiently limit the number of elements to a fixed size and start downstream processing as soon as the limit is reached, discarding the rest of the input?


1 Answer


OK, so my initial solution is to use a stateful DoFn like the one below (I'm using Scio's Scala SDK, as mentioned in the question):

import java.lang.{Long => JLong}

import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
import org.apache.beam.sdk.values.KV

class MyLimitFn[T](limit: Long) extends DoFn[KV[String, T], KV[String, T]] {
  // State is scoped per key, so every element must carry the same synthetic key
  // for this to act as a global limit.
  @StateId("count") private val countSpec: StateSpec[ValueState[JLong]] = StateSpecs.value[JLong]()

  @ProcessElement
  def processElement(context: DoFn[KV[String, T], KV[String, T]]#ProcessContext, @StateId("count") count: ValueState[JLong]): Unit = {
    // count.read() returns null before the first write; treat that as 0.
    val current = Option(count.read()).map(_.longValue()).getOrElse(0L)
    if (current < limit) {
      count.write(current + 1L)
      context.output(context.element())
    }
  }
}

The downside of this solution is that I have to add the same synthetic key (e.g. an empty string) to every element before applying it, because Beam state is scoped per key. So far, it's much faster than Sample.any().
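For reference, here's a minimal sketch of how the keying step and the DoFn application might look in Scio (the constant empty-string key, the limited value name, and the applyTransform/ParDo wrapping are my illustrative assumptions, not part of the original answer):

import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV

// Assumes `input: SCollection[String]` as in the question.
val limited: SCollection[String] =
  input
    .map(x => KV.of("", x))                                    // same synthetic key on every element
    .applyTransform(ParDo.of(new MyLimitFn[String](1000000)))  // stateful per-key limit
    .map(_.getValue)                                           // strip the synthetic key again

Note that putting everything on a single key also funnels the counting through a single state cell, so that stage effectively runs on one key; that serialization is the price of a global limit with this approach.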

I'm still looking forward to seeing better / more efficient solutions.

I will confirm that this is likely the best approach for your use case. The original Sample.any() implementation is less efficient because it doesn't use stateful processing and is forced to use a CombineFn instead. That is also the reason it requires all elements to be received before outputting data: the fused stage ends so elements can be aggregated for the combine, and in batch pipelines that requires all data to be present. – Daniel Oliveira Jun 09 '21 at 01:15