7

I have a scenario where several operations, including a group by, have to be applied to a number of small (~300MB each) files. The operation looks like this:

df.groupBy(....).agg(....)

Now, to process multiple files I can use a wildcard such as "/**/*.csv"; however, that creates a single RDD and partitions it for the operations. Since the operation is a group by, it involves a lot of shuffling, which is unnecessary if the files are mutually exclusive.

What I am looking for is a way to create independent RDDs on the files and operate on them independently.

zero323
Love Hasija

3 Answers

9

It is more an idea than a full solution and I haven't tested it yet.

You can start by extracting your data processing pipeline into a function.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

If your files are small, you can adjust the n parameter to use the smallest number of partitions that fits the data from a single file, and so avoid shuffling. This limits concurrency, but we'll get back to that issue later.

val n: Int = ??? 
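One possible way to fill in n is to derive it from the file size and a target partition size. The sketch below is only an illustration: the 128 MB target and the names PartitionEstimate and partitionsFor are assumptions, not part of Spark, and the on-disk size of a CSV file is only a rough proxy for its in-memory size.

```scala
object PartitionEstimate {
  // Assumed target size per partition (illustrative, not a Spark default used here)
  val TargetPartitionBytes: Long = 128L * 1024 * 1024

  // Smallest partition count such that each partition holds at most targetBytes,
  // with a floor of one partition for empty or tiny files
  def partitionsFor(fileBytes: Long, targetBytes: Long = TargetPartitionBytes): Int = {
    require(targetBytes > 0, "targetBytes must be positive")
    math.max(1L, (fileBytes + targetBytes - 1) / targetBytes).toInt
  }
}
```

With these assumptions, a 300 MB file would be split into three partitions.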

Next you have to obtain a list of input files. This step depends on the data source, but most of the time it is more or less straightforward:

val files: Array[String] = ???
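For files on a local file system, the list could be built with a small recursive helper like the one below. This is a sketch under that assumption only; ListCsvFiles and listCsv are hypothetical names, and for HDFS or an object store you would use the matching file system API instead.

```scala
import java.io.File

object ListCsvFiles {
  // Recursively collect the paths of all *.csv files below dir
  def listCsv(dir: File): Array[String] = {
    // listFiles returns null when dir is not a readable directory
    val entries = Option(dir.listFiles).getOrElse(Array.empty[File])
    val here = entries.filter(f => f.isFile && f.getName.endsWith(".csv")).map(_.getPath)
    val below = entries.filter(_.isDirectory).flatMap(d => listCsv(d).toSeq)
    (here ++ below).sorted
  }
}
```

The result can be assigned directly to files, for example `val files = ListCsvFiles.listCsv(new File(inputDir))`, where inputDir is a placeholder for your input directory.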

Next you can map the above list using the pipeline function:

val rdds = files.map(f => pipeline(f, n))

Since we limit concurrency at the level of a single file, we want to compensate by submitting multiple jobs. Let's add a simple helper which forces evaluation and wraps it in a Future:

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future { // lowercase future { } was removed in Scala 2.12
    df.rdd.foreach(_ => ()) // Force computation
    df
}
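The global execution context sizes its thread pool from the number of available cores on the driver, which may not match the number of jobs you want in flight. If you want to control that explicitly, a dedicated fixed-size pool is one option. The sketch below uses plain functions as stand-ins for the per-file jobs; BoundedJobs, runAll, maxConcurrent and the one-hour timeout are all illustrative assumptions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedJobs {
  // Run every task on a pool of maxConcurrent threads and wait for all results
  def runAll[A](tasks: Seq[() => A], maxConcurrent: Int): Seq[A] = {
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = tasks.map(t => Future(t()))
      // Arbitrary upper bound on the total wait; adjust to your workload
      Await.result(Future.sequence(futures), 1.hour)
    } finally {
      pool.shutdown()
    }
  }
}
```

In the Spark case each task would wrap the forced evaluation of one DataFrame, so maxConcurrent bounds how many jobs are submitted at the same time.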

Finally we can use the above helper on the rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

Depending on your requirements you can add onComplete callbacks or use reactive streams to collect the results.
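As a rough illustration of the callback approach, the sketch below attaches onComplete to the combined future and then blocks for the result. fakeJob is a hypothetical stand-in for pipelineToFuture so that the example runs without Spark, and the one-minute timeout is arbitrary.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object CollectResults {
  // Stand-in for a per-file job; here it just returns the length of the name
  def fakeJob(name: String): Future[Int] = Future { name.length }

  def run(files: Seq[String]): Seq[Int] = {
    // The combined future fails if any of the individual jobs fails
    val all = Future.sequence(files.map(fakeJob))
    all.onComplete {
      case Success(rs) => println(s"all ${rs.size} jobs finished")
      case Failure(e)  => println(s"at least one job failed: ${e.getMessage}")
    }
    // Block the driver until everything is done
    Await.result(all, 1.minute)
  }
}
```

Blocking with Await is the simplest choice for a batch driver; in a long-running application you would more likely keep composing the futures instead.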

zero323
  • Well, as far as I am concerned there is no need for workaround because it completely doesn't matter here. `sqlContext` is used only on a driver so there is no reason for serialization whatsoever. – zero323 Aug 10 '15 at 12:55
  • 1
    @AlexNaspo Not thoroughly but I used a similar approach once or twice. Unless you have far too much memory it makes more sense to execute an actual action instead of depending on caching. If you're interested in the general principle take a look at `org.apache.spark.rdd.AsyncRDDActions` – zero323 Aug 09 '16 at 23:17
1

If you have many files, and each file is small (you say 300MB above which I would count as small for Spark), you could try using SparkContext.wholeTextFiles which will create an RDD where each record is an entire file.
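A minimal sketch of this idea follows. The aggregation itself is written as a plain function over one file's content, so it can be tried without a cluster; countByFirstColumn is a hypothetical stand-in for the real groupBy(...).agg(...) and assumes a header line and no quoted commas. The Spark wiring is shown only as a comment and assumes a SparkContext named sc and a placeholder inputDir.

```scala
object PerFileAggregate {
  // Count rows per value of the first CSV column within a single file's content
  def countByFirstColumn(content: String): Map[String, Int] =
    content.split("\n")
      .drop(1)                          // skip the header line
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split(",", -1)(0))         // key = first column
      .groupBy(identity)
      .map { case (k, rows) => k -> rows.length }

  // With a SparkContext `sc` in scope, the wiring would look roughly like:
  //   sc.wholeTextFiles(inputDir)          // RDD[(fileName, fileContent)]
  //     .mapValues(countByFirstColumn)     // one local aggregation per file
}
```

Because each file arrives as a single record, the aggregation runs locally inside one task and needs no shuffle. The trade-off is that the whole file is held in memory as one string, which may be a concern at around 300MB per file.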

mattinbits
0

This way we can write multiple RDDs in parallel:

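import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// IApplicationEventListener, IprogramLogger and programLoggerFactory are project-specific types not shown here
public class ParallelWriteSevice implements IApplicationEventListener {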

    private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class);

    private static ExecutorService executorService=null;
    private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>();

    public static void submit(Callable<Boolean> callable) {
        if(executorService==null)
        {
            executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this
        }

        futures.add(executorService.submit(callable));
    }

    public static boolean isWriteSucess() {
        boolean writeFailureOccured = false;
        try {
            for (Future<Boolean> future : futures) {
                try {
                    Boolean writeStatus = future.get();
                    if (writeStatus == false) {
                        writeFailureOccured = true;
                    }
                } catch (Exception e) {
                    logger.error("Error - Scheduled write failed " + e.getMessage(), e);
                    writeFailureOccured = true;
                }
            }
        } finally {
            resetFutures();
            if (executorService != null) {
                executorService.shutdown();
            }
            executorService = null;
        }
        return !writeFailureOccured;
    }

    private static void resetFutures() {
        logger.error("resetFutures called");
        // Clear the finished futures so a later batch is not checked against stale results
        futures.clear();
    }

}
Nico Griffioen