
I'm (very) new to Spark, so my terminology may be off, but here's what I'm trying to do:

  • for every day I have a set of CSV files representing snapshots of tables in a database on S3 (call them tables A, B, C, stored at e.g. s3://bucket/20171027/a.csv.gz)
  • I want to join these tables (for a single day) using Spark SQL on a key (id), and then save the joined table to S3 as JSON.

I am able to do this sequentially (day by day), but want to take advantage of Spark parallelization.

My current process is roughly:

  • list all the files in S3
  • group them by timestamp
  • create an array of filenames to join
  • result is a map of timestamp => files (e.g. 20171027 => ["s3://foo/20171027/a.csv", "s3://foo/20171027/b.csv"]); a rough sketch of this grouping is shown after this list
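A minimal sketch of that grouping step, assuming the object keys have already been listed (e.g. with the AWS SDK) and follow the s3://bucket/yyyyMMdd/table.csv layout from above; the hard-coded keys and variable names are illustrative only:

// keys would come from an S3 listing; hard-coded here for illustration
val keys: Seq[String] = Seq(
  "s3://foo/20171027/a.csv",
  "s3://foo/20171027/b.csv",
  "s3://foo/20171028/a.csv"
)

// timestamp => files, e.g. 20171027 => Seq("s3://foo/20171027/a.csv", "s3://foo/20171027/b.csv")
val files: Map[String, Seq[String]] = keys.groupBy(_.split("/")(3))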

Then, for each day, I load each file into a DataFrame, do some logic to remove duplicate columns, and call df1.join(df2). Once the joins are finished, I call df.write.json.
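For context, the per-day work might look roughly like this (a sketch only: the header option, the id join key, and the paths are assumptions, and spark is the usual SparkSession):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("daily-join").getOrCreate()

// load two of the day's snapshots into DataFrames
val dfA = spark.read.option("header", "true").csv("s3://foo/20171027/a.csv")
val dfB = spark.read.option("header", "true").csv("s3://foo/20171027/b.csv")

// drop columns from B that also exist in A (other than the join key)
// so the joined result has no duplicate column names
val dupes = dfB.columns.toSet.intersect(dfA.columns.toSet) - "id"
val dfBTrimmed = dupes.foldLeft(dfB)((df, col) => df.drop(col))

// join on the key and write the result out as JSON
val joined = dfA.join(dfBTrimmed, Seq("id"))
joined.write.json("s3://foo/joined/20171027/")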

Each day can be processed independently, but I can't figure out how to have Spark run these join operations concurrently. I tried using sc.parallelize with the timestamps as the sequence, but the SparkContext can't be used to load the DataFrames on the executors, and if I load the DataFrames before calling parallelize, the executors can't read them and throw a NullPointerException. I'm thinking I need to look into using Futures, but I'm wondering if there are other options to accomplish what I'm trying to do, or if I'm making it more complicated than it needs to be.

  • Would you care to explain why your question is a duplicate of the one mentioned by @user8371915? – eliasah Oct 31 '17 at 09:15

1 Answer


The solution I came up with was to use Futures with a thread pool sized to the number of executors. Looping over the days, each day's table joins and the write to S3 run inside their own Future, and the thread pool caps concurrency at the number of executors. At the end, it waits for all futures to complete before finishing.

import java.util.concurrent.Executors

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// thread pool sized to the number of executors, so at most that many
// per-day jobs are submitted to Spark at once
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(numExecutors))
val futures = ArrayBuffer[Future[Unit]]()

// files is the timestamp => filenames map built earlier
for (date <- files.keys) {
  val f = Future {
    // load tables from S3 into data frames
    // join data frames on ID
    // write joined dataframe to S3
  }
  futures += f
}

// block until every per-day job has finished
futures.foreach(f => Await.ready(f, Duration.Inf))
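As a side note, the final loop of Await.ready calls can also be written by combining the futures first, which does the same thing in one step:

// wait for every per-day job to finish before exiting
Await.result(Future.sequence(futures), Duration.Inf)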