3

I have a Spark program that starts to create the network of France (cities, local authorities...) in a dataset for a given year. It is then used for other operations : local accounting, searching among enterprises, etc.

The dataset is in term of business rules rather hard to create : many filters, checking of many kinds, and I don't know in advance how the caller who ask for it will use it. But most of the times, he asks for a dataset for the year 2019, because he only needs "All the cities existing in France today.".

My program below succeed in returning results for 2019. Next caller also calls for cities of 2019 : Spark restarts against the whole work he did before...

What is the principle of optimization here ?

Shall I store in my program, at the same level where I store the spark session I use for requests and building, something like a Map<Integer, Dataset> where the key is the year, and the dataset the one that at least one caller has asked for this year ?

thebluephantom
  • 16,458
  • 8
  • 40
  • 83
Marc Le Bihan
  • 2,308
  • 2
  • 23
  • 41
  • IGNITE? Good point. But you get skipped stages in some cases. See https://stackoverflow.com/questions/58378727/in-which-situations-are-the-stages-of-dag-skipped/58391941#58391941. That mitigates if running from shell or in the spark-submit. – thebluephantom Oct 22 '19 at 07:08
  • You could manually run your spark program once a day (this data does not seem to change every day) or twice a day and in the end write to a sql db. Have a seperate Java web project that reads and caches in redis or in RAM if total data is not more than 2-3 GB (as RAM will be fastest - a static hash map or other static list). And refresh the data every x hours from the data base. and ahe another API to know the modified date of the data so the apps that use your API know if the backend process is running as often as expected – tgkprog Oct 22 '19 at 07:26

3 Answers3

1

You would have to save the dataset to hdfs or any other store being used and load it whenever required instead of recomputing the entire dataset again . This is more about how you would design your application . Probably these datasets should be precomputed for certain recent years as part of data preparation and be ready to use always. This is assuming the next time when it runs it is triggered as a new job ex: job running once a day

Shridhar
  • 56
  • 5
  • I see. Store on a flat table the result of the dataset and when asked again for : reconstitute the dataset from this table. Like doing a second level of `persist()`, in fact. But, yes. This might work. – Marc Le Bihan Oct 22 '19 at 16:07
  • A simple `ds.write().parquet(store);` one way, `Dataset ds = session.read().parquet(store);` the other way succeed in storing/loading previous Datasets, provided I choose a store name for the file that references the arguments used for the call : `cities_2019`, for example. Thanks ! – Marc Le Bihan Oct 28 '19 at 03:01
0

Assuming a spark-shell or spark-compiled program that runs in same Session picking up requests:

  1. Use IGNITE, or
  2. Rely on 'skipped stages' effect (using .cache for DFs as well).

Latter, by example, against RDDs but DF have these underlying:

val d = sc.parallelize(0 until 100000).map(i => (i%10000, i)).cache // or not cached, does not matter for RDD, for DF, DS it does

val c=d.rightOuterJoin(d.reduceByKey(_+_))
val f=d.leftOuterJoin(d.reduceByKey(_+_))

c.count
c.collect // skipped, shuffled 
f.count
f.collect // skipped, shuffled

val g = f.filter(e => e._1%2==0) 
val h = f.filter(e => e._1==657)
val j = f.filter(e => e._1==1657)

g.collect 
h.collect 
j.collect  // these skipped as well

Trivial example, but you see Spark shuffling meaning some aspects need not be done again, but it depends on your use cases and how you read the data initially is my take.

Note skipped stages from Spark UI, thus not always as bad as one may think. In some cases your "caching" is achieved this way.

For actions that need different processing, then at least the underlying (intermediate) sources need .cache or .persist.

enter image description here

If new spark-submit used:

  1. use IGNITE, or
  2. Re-use checkpoint directory although very convoluted, see Spark Checkpointing Non-Streaming - Checkpoint files can be used in subsequent job run or driver program, although convoluted, and only really applicable if multiple Actions possible on that shuffled RDD that is pre-read, else effect not so great. Or
  3. Use a good initial query and do a bucketBy save and re-read. See https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4861715144695760/2994977456373837/5701837197372837/latest.html. Particularly handy in case of sorting.
thebluephantom
  • 16,458
  • 8
  • 40
  • 83
0

Redis is the best choice to use with spark. Store the results into the Redis and for the next request just take from Redis.

Sandhya
  • 98
  • 10
  • The next request will have a result that won't be a `dataset` anymore. When a caller ask for enterprises of some kind existing in France in 2019, an underlying call prepares my dataset of cities linked to local authorities and so on. What is returned from the "cache" must be a dataset, not a response because the caller can be `Spark` itself. – Marc Le Bihan Oct 22 '19 at 16:05
  • Here the result I mean response/dataset what ever you want to respond back. – Sandhya Oct 23 '19 at 05:06