
I want to implement the following scenario using a streaming pipeline in Apache Beam (running it on Google Dataflow):

  1. Read messages from Pub/Sub (JSON strings)
  2. Deserialize JSONs
  3. Use a custom field (say timeStamp) as the timestamp value for each element
  4. Apply fixed windowing of 60 seconds
  5. Extract a key from elements and group by key
  6. << perform further processing >>

I've tried to solve this problem using both Java (Scala) and Python, but neither solution worked.

  1. Python solution
# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
        | beam.Map(add_timestamping)
        | beam.WindowInto(window.FixedWindows(60))
        | beam.Map(lambda elem: elem) # extracting the key somehow, not relevant here
        | beam.GroupByKey()
        # (...)
        | beam.io.WriteToPubSub("output_topic")
        )
p.run()

The add_timestamping function, as per the documentation:

def add_timestamping(elem):
    import json
    import apache_beam as beam
    msg = json.loads(elem)
    # timeStamp is in milliseconds; Beam expects seconds since the epoch
    unix_timestamp = msg['timeStamp'] / 1000
    return beam.window.TimestampedValue(msg, unix_timestamp)

Output of Python solution:

  1. When using DirectRunner, windows are emitted and the windowing itself is more-or-less appropriate, depending on the delay.
  2. When using DataFlowRunner, ALL messages are dropped, with the droppedDueToLateness counter appearing in the Dataflow UI.

  2. Java / Scala solution (I've used Scio, but this happens in the plain Beam SDK for Java too)
sc.pubsubSubscription[String]("my_sub")
    .applyTransform(ParDo.of(new CustomTs()))
    .withFixedWindows(Duration.standardSeconds(60))
    .map(x => x) // extracting the key somehow, not relevant here
    .groupByKey
    // (...)
    .saveAsPubsub("output_topic")

Adding a custom timestamp, as per the documentation:

import io.circe.parser._
class CustomTs extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
    val json = parse(element).right.get
    val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
    out.outputWithTimestamp(element, new Instant(timestampMillis))
  }
}

Output of Java / Scala solution:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.IllegalArgumentException:
 Cannot output with timestamp 2019-03-02T00:51:39.124Z. 
 Output timestamps must be no earlier than the timestamp of the current input
 (2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).

I cannot use DoFn.getAllowedTimestampSkew here, as it's already deprecated and I don't know what ranges of historical data will be sent.


Having the ability to process historical data is crucial for my project (this data will be sent to Pub/Sub from some store). The pipeline must work on both current and historical data.

My question is: How to process the data using custom timestamps with the ability to operate on windows defined using Beam API?

Marcin Zablocki

2 Answers


If you have the ability to extract the timestamp at the point of insertion into Pub/Sub, you will be able to make use of user-specified timestamps as metadata. How to do this is documented for the 1.9 SDK:

https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids

"You can use user-specified timestamps for precise control over how elements read from Cloud Pub/Sub are assigned to windows in a Dataflow pipeline. "

As 1.9 is deprecated, in 2.11 you will need PubsubIO.Read#withTimestampAttribute: https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
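
A minimal sketch of that 2.11 read path, assuming the publisher attaches the event time as a message attribute (the attribute name "timestamp" and the project/subscription path below are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class ReadWithTimestampAttribute {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Element timestamps (and the watermark) come from the "timestamp"
    // attribute set by the publisher, not from the publish time, so
    // historical messages are no longer dropped as late.
    PCollection<String> messages = p.apply("Read",
        PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my_sub")
            .withTimestampAttribute("timestamp"));

    messages.apply("Window",
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(60))));
    // << key extraction, GroupByKey, further processing >>

    p.run().waitUntilFinish();
  }
}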

Reza Rokni

One solution is to customize the timestamp of the Pub/Sub messages through withAllowedTimestampSkew of WithTimestamps:

static SerializableFunction<Event, Instant> timestampFn =
        (Event input) -> new Instant(Objects.requireNonNull(input).getTs());

...

PCollection<String> rawMessage = pipeline.apply("Read PubSub Message",
        PubsubIO.readStrings().fromTopic(topic));

PCollection<Event> events = rawMessage.apply("Convert to UserEvent",
                MapElements.into(TypeDescriptor.of(Event.class))
                        .via(Event::fromJSON))
        .apply("With Timestamp", WithTimestamps.of(timestampFn)
                .withAllowedTimestampSkew(Duration.standardMinutes(2)));

PCollection<Event> minute_window_events = events.apply("Minute Window",
        Window.<Event>into(FixedWindows.of(Duration.standardMinutes(3)))
                .triggering(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardMinutes(1)));
...

However, this method withAllowedTimestampSkew is deprecated, per the docs:

@deprecated This method permits elements to be emitted behind the watermark. These elements are considered late, and if behind the allowed lateness (Window#withAllowedLateness(Duration)) of a downstream PCollection they may be silently dropped. See https://github.com/apache/beam/issues/18065 for details on a replacement.



Another solution is withTimestampAttribute. You can set an attribute on Pub/Sub messages that contains the timestamp.

When publishing a message to PubSub

p.apply(PubsubIO.writeStrings().withTimestampAttribute("timestamp").to(topic));

And when Subscribing:

p.apply(PubsubIO.readStrings().fromTopic(topic).withTimestampAttribute("timestamp"))
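
Note that PubsubIO expects the attribute value to be either milliseconds since the Unix epoch or an RFC 3339 timestamp string. If the historical data is published from outside Beam (e.g. the store mentioned in the question), the attribute can be set with the Pub/Sub client library. A sketch, with hypothetical project/topic names:

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class BackfillPublisher {
  public static void main(String[] args) throws Exception {
    Publisher publisher =
        Publisher.newBuilder(TopicName.of("my-project", "my_topic")).build();
    try {
      String json = "{\"timeStamp\": 1551487899124}"; // historical event
      PubsubMessage message = PubsubMessage.newBuilder()
          .setData(ByteString.copyFromUtf8(json))
          // Event time as millis since the epoch; PubsubIO reads this
          // attribute for element timestamps and watermark estimation.
          .putAttributes("timestamp", "1551487899124")
          .build();
      publisher.publish(message).get(); // block until the publish completes
    } finally {
      publisher.shutdown();
    }
  }
}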
zangw