
I have a Spark job in which I process a file and then do the following steps.

1. Load the file into a DataFrame
2. Push the DataFrame to Elasticsearch
3. Run some aggregations on the DataFrame and save the results to Cassandra

I have written a Spark job for this, in which I have the following function calls:

writeToES(df)
writeToCassandra(df)
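
Roughly, these two functions just write the DataFrame out through the Elasticsearch and Cassandra Spark connectors. A sketch of what they might look like, assuming the elasticsearch-hadoop (es-spark) and spark-cassandra-connector data sources; index, keyspace, table and column names below are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

static void writeToES(Dataset<Row> df) {
    df.write()
      .format("org.elasticsearch.spark.sql")    // es-spark DataFrame source
      .option("es.resource", "my_index")        // target index (placeholder)
      .mode(SaveMode.Append)
      .save();
}

static void writeToCassandra(Dataset<Row> df) {
    df.groupBy("some_key").count()              // placeholder aggregation
      .write()
      .format("org.apache.spark.sql.cassandra") // Cassandra DataFrame source
      .option("keyspace", "my_keyspace")        // placeholders
      .option("table", "my_table")
      .mode(SaveMode.Append)
      .save();
}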

Right now these two operations run one after the other. However, they could run in parallel.

How can I do this in a single Spark job?

I could create two separate Spark jobs, one for writing to ES and one for Cassandra, but they would use multiple ports, which I want to avoid.

hard coder
  • Which ports do you refer to? If you mean the ports for the Spark web UI, they are assigned incrementally (i.e. if the first one uses 4040, the second will use 4041). In any case, you should cache the DataFrame before doing these operations to avoid loading it twice. – Victor Apr 04 '18 at 09:16
  • Set the Spark scheduler to `FAIR`; it's `FIFO` by default, I think. Cache the DF. Try to work something out with Scala `Future`. – philantrovert Apr 04 '18 at 09:18
  • Yes, the same ports. I am running 10 jobs separately, so 20 ports would be needed, which Spark would fail to allocate. – hard coder Apr 04 '18 at 09:19

1 Answer


You cannot run these two actions within the same Spark job. What you are most likely looking for is running these two jobs in parallel within the same Spark application.

As the documentation says, you can run multiple jobs in parallel in the same application if those jobs are submitted from different threads:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

In other words, this should run both actions in parallel (using the CompletableFuture API here, but you can use any asynchronous execution or multithreading mechanism):

CompletableFuture.runAsync(() -> writeToES(df));
CompletableFuture.runAsync(() -> writeToCassandra(df));
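
For example, a minimal sketch that launches both writes and then blocks the driver until both have completed (variable names are illustrative):

import java.util.concurrent.CompletableFuture;

CompletableFuture<Void> esWrite = CompletableFuture.runAsync(() -> writeToES(df));
CompletableFuture<Void> cassandraWrite = CompletableFuture.runAsync(() -> writeToCassandra(df));

// Wait for both jobs to finish before stopping the application
CompletableFuture.allOf(esWrite, cassandraWrite).join();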

You can then join on one or both of these futures to wait for completion, as sketched above. As noted in the documentation, you also need to pay attention to the configured scheduler mode; using the FAIR scheduler allows you to run the above in parallel:

conf.set("spark.scheduler.mode", "FAIR")
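
Equivalently, the scheduler mode can be set when building the session. And since both actions reuse the same DataFrame, caching it (as suggested in the comments) avoids re-reading the source file for each write. A rough sketch, with an illustrative app name and input path:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("es-and-cassandra-writer")     // illustrative name
    .config("spark.scheduler.mode", "FAIR") // let jobs from different threads share resources fairly
    .getOrCreate();

Dataset<Row> df = spark.read().json("input/path"); // placeholder input
df.cache(); // avoid recomputing the file load for each of the two writes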
ernest_k
  • That's the solution. You should point out that this solution shares resources, so there is no actual gain in time, if that is what the OP is looking for. – eliasah Apr 04 '18 at 11:52
  • I have tried this solution, but I think it is not working. The scheduler is set to FAIR, but on the Spark UI there are 2 active jobs and only one is running at a time; for the other, no tasks have started yet. So they are not running in parallel. – hard coder Apr 05 '18 at 05:18
  • One trick I used in Scala with parallel collections (not sure if it is correct) is to define a Map like `val jobs = Map(1 -> Job1, 2 -> Job2)` and then call `jobs.par.map(x => execute(x._2))` – bmcristi Apr 08 '20 at 15:51