5

Pardon my simple question but I'm relatively new to Spark/Hadoop.

I'm trying to load a bunch of small CSV files into Apache Spark. They're currently stored in S3, but I can download them locally if that simplifies things. My goal is to do this as efficiently as possible. It seems like it would be a shame to have some single-threaded master downloading and parsing a bunch of CSV files while my dozens of Spark workers sit idly. I'm hoping there's an idiomatic way to distribute this work.

The CSV files are arranged in a directory structure that looks like:

2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...

I have two years of data, with directories for each day, and a few hundred CSVs inside of each. All of those CSVs should have an identical schema, but it's of course possible that one CSV is awry and I'd hate for the whole job to crash if there are a couple problematic files. Those files can be skipped as long as I'm notified in a log somewhere that that happened.

It seems that every Spark project I have in mind is in this same form and I don't know how to solve it. (e.g. trying to read in a bunch of tab-delimited weather data, or reading in a bunch of log files to look at those.)

What I've Tried

I've tried both SparkR and the Scala libraries. I don't really care which language I need to use; I'm more interested in the correct idioms/tools to use.

Pure Scala

My original thought was to enumerate and parallelize the list of all year/mm-dd combinations so that I could have my Spark workers all processing each day independently (download and parse all CSV files, then stack them on top of eachother (unionAll()) to reduce them). Unfortunately, downloading and parsing the CSV files using the spark-csv library can only be done in the "parent"/master job, and not from each child as Spark doesn't allow job nesting. So that won't work as long as I want to use the Spark libraries to do the importing/parsing.

Mixed-Language

You can, of course, use the language's native CSV parsing to read in each file then "upload" them to Spark. In R, this is a combination of some package to get the file out of S3 followed by a read.csv, and finishing off with a createDataFrame() to get the data into Spark. Unfortunately, this is really slow and also seems backwards to the way I want Spark to work. If all my data is piping through R before it can get into Spark, why bother with Spark?

Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp

I've started looking into these tailored tools and quickly got overwhelmed. My understanding is that many/all of these tools could be used to get my CSV files from S3 into HDFS.

Of course it would be faster to read my CSV files in from HDFS than S3, so that solves some portion of the problem. But I still have tens of thousands of CSVs that I need to parse and am unaware of a distributed way to do that in Spark.

Community
  • 1
  • 1
Jeff Allen
  • 17,277
  • 8
  • 49
  • 70
  • Note: for log files s3distcp ( http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html ) has a `--groupBy` option that allows you to concatenate files together while copying. That would be nice for log files, but perhaps not for a CSV where you're now creating an all-or-nothing scenario when you go to parse you master CSV. Also header lines. Would be preferable to parse and aggregate each CSV file individually. – Jeff Allen Aug 03 '15 at 20:26

2 Answers2

2

So right now (Spark 1.4) SparkR has support for json or parquet file structures. Csv files can be parsed, but then the spark context needs to be started with an extra jar (which needs to be downloaded and placed in the appropriate folder, never done this myself but my collegues have).

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

There is more information in the docs. I expect that a newer spark release would have more support for this.

If you don't do this you'll need to either resort to a different file structure or use python to convert all your files from .csv into .parquet. Here is a snippet from a recent python talk that does this.

data = sc.textFile(s3_paths, 1200).cache()

def caster(x):
    return Row(colname1 = x[0], colname2 = x[1])

df_rdd = data\
    .map(lambda x: x.split(','))\
    .map(caster)

ddf = sqlContext.inferSchema(df_rdd).cache()

ddf.write.save('s3n://<bucket>/<filename>.parquet')

Also, how big is your dataset? You may not even need spark for analysis. Note that also as of right now;

  • SparkR has only DataFrame support.
  • no distributed machine learning yet.
  • for visualisation you will need to convert a distributed dataframe back into a normal one if you want to use libraries like ggplot2.
  • if your dataset is no larger than a few gigabytes, then the extra bother of learning spark might not be worthwhile yet
  • it's modest now, but you can expect more from the future
cantdutchthis
  • 31,949
  • 17
  • 74
  • 114
  • Thanks. But wouldn't the first approach still do all the downloading/parsing on the master? I got CSV reading over `read.df` in SparkR working, but it just seemed like an inefficient/serial way to process tens of thousands of CSV files. The CSV -> parquet approach might be the one I have to take, but it seems like a shame to have to batch pre-process that. To be clear, is that doing the S3 downloading in parallel from the workers? Or would `sc.textFile` get each one serially? – Jeff Allen Aug 03 '15 at 20:41
  • Oh, and my data is certainly not big enough to merit Spark. I'm just trying to be a good sport with the "big data" trends. :) – Jeff Allen Aug 03 '15 at 20:42
  • Note that `sc.textFile` is python code and that I am telling spark to use `1200` partitions. This will 'force' parallism later on, and I believe at reading the data as well. Not too sure on that. Note that `sc.textFile` method accepts wildcards. [docs](http://spark.apache.org/docs/latest/programming-guide.html) – cantdutchthis Aug 03 '15 at 20:45
  • Sorry to be dense, but "force parallelism" after the data has been downloaded serially from the master? Or force it during the download phase? – Jeff Allen Aug 03 '15 at 20:47
  • I think both. [mailing on subject](http://mail-archives.us.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAOhmDzckPf0x9yQgiDSP6KWnUJ76OWPfs74x5jipQoHPSpCnCg@mail.gmail.com%3E). I recall playing with the num_partitions and seeing it cause varying read times. It wouldn't hurt to try and confirm. – cantdutchthis Aug 03 '15 at 20:49
  • I found that both `sc.textFile` and the `spark-csv` do seem to parallelize the operation over their glob'd/enumerated input. So it seems that the answer to my question is just to pass in all your files to either a text or CSV reader and let Spark handle the rest. Easy enough! – Jeff Allen Aug 04 '15 at 19:40
1

I've run into this problem before (but w/ reading a large qty of Parquet files) and my recommendation would be to avoid dataframes and to use RDDs.

The general idiom used was:

  1. Read in a list of the files w/ each file being a line (In the driver). The expected output here is a list of strings
  2. Parallelize the list of strings and map over them with a customer csv reader. with the return being a list of case classes.

You can also use flatMap if at the end of the day you want a data structure like List[weather_data] that could be rewritten to parquet or a database.