
In a Spark Streaming application, there is an external data source (a relational database) that I need to query every 10 minutes, and I need to make the results available to my stream processing pipeline.

I don't quite understand what the right way of doing this is. Accumulators are append-only (as described in the documentation), but I found this:

  /**
   * Set the accumulator's value; only allowed on master
   */
  def setValue(newValue: R) {
    this.value = newValue
  }

and broadcast variables are write-once: https://spark.apache.org/docs/1.6.2/programming-guide.html#broadcast-variables
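
To illustrate what I mean, here is a minimal self-contained sketch (the names are mine, just for illustration) showing that a broadcast is created once on the driver and only ever read afterwards:

import java.util.Collections;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastIsWriteOnce {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("demo").setMaster("local[2]"));
    // The value is captured at broadcast time...
    Broadcast<Map<String, String>> bc =
        jsc.broadcast(Collections.singletonMap("k", "v"));
    // ...and there is no public setter to replace it afterwards;
    // executors can only read it via bc.value().
    System.out.println(bc.value().get("k"));
    jsc.stop();
  }
}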

The scheduling aspect is also not clear to me.

Is there a way to make an updated result set available to the stream processing logic?

P.S. It seems to be very similar to what I need: How can I update a broadcast variable in spark streaming?

stanislav.chetvertkov
  • "I need to query every 10 minutes and make the results available for my stream processing pipeline" What do you mean here? There's a table that's being appended to and you need to read the most recent rows? A query you need to execute? What other data sources are in your stream processing pipeline? More information on what you're trying to accomplish here would help. – The Archetypal Paul Feb 08 '17 at 11:11
  • There is actually a service with API on top of the database, so I just query the service and it returns an up-to-date version of the data. I use Kafka direct consumer approach and need to 'enrich' the stream with most up-to-date version but querying the API every time(even if inside `foreachPartition` section) is super expensive. – stanislav.chetvertkov Feb 08 '17 at 11:43
  • 1
    Possible duplicate of [Updating a global variable periodically in Spark](http://stackoverflow.com/questions/33748528/updating-a-global-variable-periodically-in-spark) – Aastha Apr 18 '17 at 12:13

1 Answer


I am doing the same in Java and it is working quite well. There is a similar answer here: Updating a global variable periodically in Spark, which is also mentioned in the question.

private Broadcast<Map<String, List<String>>> updatedVar;
private Date mysqlLastUpdatedAt = new Date(0);

public Broadcast<Map<String, List<String>>> updateBroadcastVariable(
    SparkContext sparkContext, DatabaseService databaseService) {
  long diff = Calendar.getInstance().getTime().getTime() - mysqlLastUpdatedAt.getTime();
  // Re-query the database at most once a minute; otherwise reuse the current broadcast.
  if (updatedVar == null || diff > 60000) {
    if (updatedVar != null) {
      // Drop the stale copy from the executors before re-broadcasting.
      updatedVar.unpersist();
    }
    mysqlLastUpdatedAt = new Date(System.currentTimeMillis());
    // refreshData is assumed to return the up-to-date lookup map.
    Map<String, List<String>> dbData =
        databaseService.refreshData(JavaSparkContext.fromSparkContext(sparkContext));
    updatedVar = JavaSparkContext.fromSparkContext(sparkContext).broadcast(dbData);
  }
  return updatedVar;
}
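
For reference, a minimal sketch (not part of the original answer) of how such a method could be driven from a streaming job. `lines` and `databaseService` are assumed to exist already; the point is that `foreachRDD` runs on the driver once per micro-batch, which is what allows the broadcast to be swapped out:

lines.foreachRDD(rdd -> {
  // Driver side: refresh (or reuse) the broadcast for this batch.
  Broadcast<Map<String, List<String>>> bc =
      updateBroadcastVariable(rdd.context(), databaseService);
  rdd.foreachPartition(partition -> {
    // Executor side: the broadcast is read-only here.
    Map<String, List<String>> dbData = bc.value();
    while (partition.hasNext()) {
      String record = partition.next();
      // enrich `record` using dbData.get(...)
    }
  });
});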
Aastha