3

As I understand it, a Processing Time Temporal Join joins a stream with an external database table and always joins against the latest value in that table that matches the join condition. It is also meant for cases where it is not feasible to materialize the external table as a dynamic table within Flink.

Similarly, a Lookup Join joins a stream with an external database table, and always looks up a value in the external table based on the join condition.

Does a Lookup Join materialize the external database table in Flink? What's the difference between the two?

David Anderson
Dilibaba

1 Answer

6

A processing time temporal join is a join between two streams, while a lookup join is a join between a stream and an external database.

While Flink supports two types of event time temporal joins, one with the FOR SYSTEM_TIME AS OF syntax, and the other using temporal table functions, only the latter approach based on table functions is supported for processing time temporal joins.
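For comparison, here is a minimal sketch of what the FOR SYSTEM_TIME AS OF form of an event time temporal join might look like. It reuses the column names from the example in this answer; RatesVersioned and o_rowtime are hypothetical names introduced only for illustration, and the query assumes RatesVersioned is a versioned table (a primary key on r_currency plus a watermark on its event time column).

```sql
-- Sketch only: RatesVersioned and o_rowtime are assumed names, not from the original answer.
-- RatesVersioned must be a versioned table: PRIMARY KEY (r_currency) NOT ENFORCED
-- and a WATERMARK declared on its event time column.
-- o_rowtime is assumed to be the event time attribute of Orders.
SELECT
  o.o_amount, r.r_rate
FROM
  Orders AS o
  JOIN RatesVersioned FOR SYSTEM_TIME AS OF o.o_rowtime AS r
  ON r.r_currency = o.o_currency;
```

Each order is joined with the rate version that was valid at the order's event time, which is what makes this variant deterministic.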

A processing time temporal join works with two streams representing append-only dynamic tables -- e.g.,

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency

When this temporal join is executed with a processing time attribute (as shown above), each incoming Order will be joined with the latest value from the Rates table/stream. The Orders table/stream will not be materialized at all, and the Rates table/stream will only retain the most recently consumed version of the Rate for each currency.

Unlike event time temporal joins, processing time temporal joins do not provide deterministic results.

By contrast, lookup joins execute queries against a lookup source, such as a JDBC database. By default, nothing is materialized in Flink, but some lookup sources (such as JDBC) offer optional caching.

These lookup joins also do not guarantee deterministic results, and instead execute the join with whatever data is available at the time the join is executed, with that data coming either from the cache or from a query.
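As a rough illustration, a lookup join against a JDBC table could be written as follows. The Customers table, its columns, the connection URL, and the o_customer_id and o_proctime columns are all placeholders assumed for this sketch. The cache options use the names from the JDBC connector in Flink 1.14/1.15; other versions may name them differently, so check the connector docs for your release.

```sql
-- Hypothetical lookup table backed by JDBC; URL, table, and column names are placeholders.
-- The two lookup.cache.* options are optional and enable caching (option names as of
-- Flink 1.14/1.15; they may differ in other versions).
CREATE TEMPORARY TABLE Customers (
  id INT,
  country STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '10min'
);

-- Orders is assumed to declare a processing time attribute, e.g. o_proctime AS PROCTIME().
SELECT
  o.o_amount, c.country
FROM
  Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.o_proctime AS c
  ON o.o_customer_id = c.id;
```

Note that this uses the FOR SYSTEM_TIME AS OF syntax with a processing time attribute. Because Customers is a lookup source, the planner executes it as a lookup join rather than as a temporal join between two streams.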

Both temporal joins and lookup joins will NOT update their results. You just get a best-effort result based on what was known to the runtime at the time the join was executed.

Why does Flink bother offering processing time temporal joins? Why not just use a lookup join instead? Two reasons:

  • A lookup join is more expensive -- it's necessary to query the external database, and wait for a response. Yes, caching is a possibility, but then you're likely to be joining with obsolete data. With a processing time temporal join you are assured of using reasonably fresh data that is automatically updated.

  • Lookup joins require the implementation of a special connector. Temporal joins use the standard streaming connectors, so they are more universally available.

David Anderson
  • The docs say a processing time temporal join is useful when "the external table is not feasible to materialize the table as a dynamic table within Flink". Since the external table is not a dynamic table, why not just use a lookup join instead? – Dilibaba Feb 22 '22 at 02:10
  • Take a look at the update I just added to the end of my answer. As for the docs, I'm pretty sure the first part of that section in the docs is there by mistake -- it's a copy/paste of some sentences from the lookup join docs and doesn't entirely accurately reflect how processing time temporal joins work. – David Anderson Feb 22 '22 at 09:42
  • Great answer, thanks a lot! You mention that processing time temporal joins work with append-only streams. Do you know if upsert streams are also supported (for the build side of the join)? – Robert Metzger Jun 28 '22 at 12:30
  • It seems like it should be possible to use an update stream on the build side of the join, but I don't know if that actually works. – David Anderson Jun 28 '22 at 13:14