
I have a folder with 150 GB of txt files (around 700 files, each about 200 MB on average).

I'm using Scala to process the files and calculate some aggregate statistics in the end. I see two possible approaches:

  • manually loop through all the files, do the calculations per file and merge the results in the end
  • read the whole folder into one RDD, do all the operations on this single RDD and let Spark handle all the parallelization

I'm leaning towards the second approach as it seems cleaner (no need for parallelization-specific code), but I'm wondering whether my scenario will fit the constraints imposed by my hardware and data. I have one workstation with 16 threads and 64 GB of RAM available (so the parallelization will be strictly local, between different processor cores). I might scale the infrastructure out with more machines later on, but for now I would just like to focus on tuning the settings for this one-workstation scenario.

The code I'm using:

  • reads TSV files and extracts meaningful data into (String, String, String) triplets
  • afterwards, some filtering, mapping and grouping is performed
  • finally, the data is reduced and some aggregates are calculated
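Roughly, a stripped-down sketch of what the code does (the paths, field positions, filter and the final aggregate here are placeholders, not the real ones):

    import org.apache.spark.{SparkConf, SparkContext}

    object AggregateStats {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("AggregateStats").setMaster("local[16]")
        val sc = new SparkContext(conf)

        // whole folder as one RDD; Spark handles the parallelization
        val triplets = sc.textFile("/path/to/folder/*.txt")
          .map(_.split("\t"))
          .filter(_.length >= 3)                        // drop malformed rows
          .map(f => (f(0), f(1), f(2)))                 // (String, String, String) triplets

        // placeholder filter/map/group/reduce stage
        val aggregates = triplets
          .filter { case (a, _, _) => a.nonEmpty }
          .map { case (a, b, _) => ((a, b), 1L) }
          .reduceByKey(_ + _)                           // counts per (a, b) key

        aggregates.saveAsTextFile("/path/to/output")    // no caching, just write the result
        sc.stop()
      }
    }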

I've been able to run this code with a single file (~200 MB of data). However, I get a java.lang.OutOfMemoryError: GC overhead limit exceeded and/or a Java heap space error when adding more data (the application breaks with 6 GB of data, but I would like to use it with 150 GB of data).

I guess I would have to tune some parameters to make this work. I would appreciate any tips on how to approach this problem (and how to debug memory demands). I've tried increasing spark.executor.memory and using a smaller number of cores (the rationale being that each core needs some heap space), but this didn't solve my problems.
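For reference, a stripped-down sketch of how I've been passing these settings (the values are just examples I've tried, and which of them actually governs the heap in local mode is part of what I'm unsure about):

    import org.apache.spark.{SparkConf, SparkContext}

    // example values only; not a recommendation
    val conf = new SparkConf()
      .setAppName("AggregateStats")
      .setMaster("local[8]")               // fewer cores than the 16 available threads
      .set("spark.executor.memory", "8g")  // the setting I've been increasing
      .set("spark.driver.memory", "48g")   // attempted driver heap (may need to be passed at launch
                                           // instead, e.g. spark-submit --driver-memory 48g)
    val sc = new SparkContext(conf)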

I don't need the solution to be very fast (it can easily run for a few hours, even days, if needed). I'm also not caching any data, just saving it to the file system in the end. If you think it would be more feasible to just go with the manual parallelization approach, I could do that as well.

Igor
  • If you are running Spark in standalone mode, it cannot work. You need to run your application on a resource manager like `YARN`, for example, which runs on a Hadoop cluster. – eliasah Jul 04 '14 at 09:08
  • Does it make sense to run YARN on a single machine? Doesn't the standalone mode (when properly configured) work the same as a cluster manager if no distributed cluster is present? – Igor Jul 04 '14 at 09:47
  • How will you fit 150 GB in your 64 GB of RAM, though, if you are not planning to use a distributed cluster? – eliasah Jul 04 '14 at 09:49
  • I was thinking of something in the way of taking a chunk of data, processing it, storing partial results on disk (if needed), continuing with the next chunk until all are done, and finally merging the partial results in the end. – Igor Jul 04 '14 at 11:54
  • Well, this depends on whether there is continuity between your data or not. Another possibility might be to use a distributed search engine like Solr or Elasticsearch to index the data; then you might be able to run statistics functions on it. Everything depends on the schema of the data and how you are actually using it. – eliasah Jul 04 '14 at 12:03
  • @Igor by massively increasing the number of partitions you use, you can get the effect you are after, i.e. processing a bit at a time. This answer has a list of all the things you can try: http://stackoverflow.com/a/22742982/1586965 – samthebest Jul 06 '14 at 09:52
  • possible duplicate of [spark java.lang.OutOfMemoryError: Java heap space](http://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space) – samthebest Jul 06 '14 at 09:53
  • Not solving your problem, but adding additional information: typically, when loading data from disk into a Spark RDD, the data consumes much more space in RAM than on disk. This is partially due to the overhead of turning byte arrays into Java String objects. – David Jul 08 '14 at 00:49
  • I would start by understanding what you want to do and how, in very simple terms. Why do you need the data to be loaded in memory if performance is not an issue? Are your algorithms iterative in nature? If not, maybe Hadoop MR would be a better choice. In my experience, stressing Spark in terms of memory comes with some trouble, e.g. Spark will partially evict cached RDDs one at a time and run a GC, which kills performance (older version, so it could be fixed now). I have also found it useful to use Hadoop MR to pre-process data before using Spark. Hope this helps – Ioannis Deligiannis Aug 19 '14 at 09:53

3 Answers


My team and I successfully processed over 1 TB of CSV data on 5 machines with 32 GB of RAM each. It depends heavily on what kind of processing you're doing and how.

  1. Repartitioning an RDD requires additional computation and has memory overhead on top of your heap. Instead, try loading the files with more parallelism by decreasing the split size via TextInputFormat.SPLIT_MINSIZE and TextInputFormat.SPLIT_MAXSIZE (if you're using TextInputFormat) to raise the level of parallelism; see the sketch after this list.

  2. Try using mapPartitions instead of map so you can handle the computation inside a partition. If the computation uses a temporary variable or instance and you're still facing out-of-memory errors, try lowering the amount of data per partition (by increasing the number of partitions).

  3. Increase the driver memory and executor memory limits using "spark.executor.memory" and "spark.driver.memory" in the Spark configuration before creating the SparkContext.
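A rough sketch of how these three points can be wired together (the paths and sizes are made-up examples, and I use the new-API TextInputFormat here so that the max split size is respected when the files are split):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    // (3) memory limits go into the configuration before the SparkContext is created
    val conf = new SparkConf()
      .setAppName("AggregateStats")
      .set("spark.executor.memory", "8g")
      .set("spark.driver.memory", "48g")   // for a local run this is usually passed at launch,
                                           // e.g. spark-submit --driver-memory 48g
    val sc = new SparkContext(conf)

    // (1) shrink the input splits (these keys are what SPLIT_MINSIZE / SPLIT_MAXSIZE stand for),
    //     so each ~200 MB file is read as several partitions and parallelism goes up
    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (16 * 1024 * 1024).toString)
    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (32 * 1024 * 1024).toString)

    val lines = sc
      .newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/path/to/folder/*.txt")
      .map(_._2.toString)

    // (2) mapPartitions: process one partition at a time, so any temporary buffers are created
    //     once per partition instead of once per record
    val triplets = lines.mapPartitions { iter =>
      iter.map(_.split("\t"))
          .filter(_.length >= 3)
          .map(f => (f(0), f(1), f(2)))
    }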

Note that Spark is a general-purpose cluster computing system, so it's inefficient (IMHO) to use Spark on a single machine.

Averman
  • Do you have example code for using limited memory to read a large file? Especially how you use ```TextInputFormat.SPLIT_MAXSIZE``` and ```mapPartitions```? I'm using ```conf.set("TextInputFormat.SPLIT_MAXSIZE", "512M")```, but no luck. – Kane Apr 27 '15 at 09:41

To add another perspective based on code (as opposed to configuration): sometimes it's best to figure out at what stage your Spark application is exceeding memory, and to see whether you can make changes to fix the problem. When I was learning Spark, I had a Python Spark application that crashed with OOM errors. The reason was that I was collecting all the results back in the master rather than letting the tasks save the output.

E.g., this:

    for item in processed_data.collect():
        print(item)

failed with OOM errors. On the other hand, this:

    processed_data.saveAsTextFile(output_dir)

worked fine.
shaneb

Yes, the PySpark RDD/DataFrame collect() function is used to retrieve all the elements of the dataset (from all nodes) to the driver node. We should use collect() only on smaller datasets, usually after filter(), group(), count(), etc. Retrieving a larger dataset this way results in out-of-memory errors.
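In the Scala used in the question, the same principle looks roughly like this (the filter, threshold and paths are hypothetical):

    // Only collect() once the data has been shrunk; otherwise write it out
    // and keep it off the driver.
    val processed = sc.textFile("/path/to/folder/*.txt")
      .filter(_.contains("ERROR"))                  // placeholder filter that reduces the data

    if (processed.count() < 10000)                  // count() only sends one number to the driver
      processed.collect().foreach(println)          // small enough to hold in driver memory
    else
      processed.saveAsTextFile("/path/to/output")   // let the tasks write the output instead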

lavi