7

I am developing a Spark processing framework which reads large CSV files, loads them into RDDs, performs some transformations and, at the end, saves some statistics.

The CSV files in question are around 50GB on average. I'm using Spark 2.0.

My question is:

When I load a file using the sparkContext.textFile() function, does the file need to be stored in the driver's memory first and then distributed to the workers (thus requiring a rather large amount of memory on the driver)? Or is the file read "in parallel" by the workers, so that none of them needs to store the whole file and the driver acts only as a "manager"?

Thanks in advance

Ander

2 Answers

16

When you define the read, the file is divided into partitions according to your parallelism settings, and the instructions are sent to the workers. The workers then read the file directly from the filesystem (hence the need for a filesystem that is available to all the nodes, such as HDFS).

As a side note, it would be much better to read the file into a DataFrame using spark.read.csv rather than into an RDD. This takes less memory and allows Spark to optimize your queries.

UPDATE

In the comments, it was asked what would happen if the file system were not distributed and the file were located on only one machine. The answer is that if you have more than one machine, it will most likely fail.

When you call sparkContext.textFile, nothing is actually read; it just tells Spark WHAT you want to read. You then apply some transformations and still nothing is read, because you are only defining a plan. Once you perform an action (e.g. collect), the actual processing begins. Spark divides the job into tasks and sends them to the executors. The executors (which might be on the master node or on worker nodes) then attempt to read portions of the file. The problem is that any executor NOT on the master node will look for the file and fail to find it, causing its tasks to fail. Spark will retry several times (I believe the default is 4) and then fail completely.

Of course, if you have just one node, then all executors will see the file and everything will be fine. In theory, the tasks could also fail on a worker, be rerun on the master and succeed there, but in any case the workers will not do any work unless they can see a copy of the file.

You can solve this by copying the file to the exact same path on all nodes or by using any kind of distributed file system (even NFS shares are fine).

Of course, you can always work on a single node, but then you would not be taking advantage of Spark's scalability.
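
The "define a plan, then run an action" behaviour described above can be illustrated without a cluster. The sketch below is only an analogy: it uses java.util.stream from the JDK, not Spark, and the class name `LazyPlanDemo` and its `plan` method are made up for this example. Like an RDD transformation, `map` on a stream does no work until a terminal operation (the counterpart of a Spark action) is called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream is not Spark, but it is lazy in a similar way.
class LazyPlanDemo {
    // Counts how many lines have actually been processed so far.
    static final AtomicInteger processed = new AtomicInteger();

    // Builds the "plan". Nothing is processed here, much like
    // textFile() followed by map() in Spark.
    static Stream<Integer> plan(List<String> lines) {
        return lines.stream().map(line -> {
            processed.incrementAndGet();
            return line.length();
        });
    }

    public static void main(String[] args) {
        Stream<Integer> lengths = plan(Arrays.asList("a,b", "c,d,e"));
        System.out.println("processed after defining the plan: " + processed.get());

        // The terminal operation plays the role of a Spark action.
        List<Integer> result = lengths.collect(Collectors.toList());
        System.out.println("processed after the action: " + processed.get());
        System.out.println("result: " + result);
    }
}
```

The counter stays at zero until `collect` runs, which is the same reason a missing file only surfaces as a failure once a Spark action is executed.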

Assaf Mendelson
  • Regarding the first part of your answer, let's suppose I do NOT have a distributed file system and the file is located in the file system of **only one machine** (assume it's the master). In that scenario, what would happen? That single machine would have to load it into memory **before** the distribution happens? – Ander Feb 13 '17 at 16:59
  • Thanks, now I get it. – Ander Feb 13 '17 at 18:24
  • @AnderMurilloZohn Even if we copy the data file into all the nodes, I have observed that the resulting rdd after loading and repartitioning resides in only a single node. Is there a way to distribute it across nodes? – Swaroop Vajrapu May 15 '17 at 11:49
  • **@AssafMendelson** can you elaborate `"file would be divided to partitions based on your parallelism scheme"`? How to define a **parallelism scheme**? I can read `csv` as: `val df: DataFrame = spark.read.csv("path/to/file")` (assuming file resides on `HDFS`); but this method takes no parameter to control *parallelism*. I can always invoke `df.repartition(numPartitions)` on the resulting `DataFrame`, but that would be put into effect only after original `DataFrame` is read *serially* (not parallely) *[correct me if i'm wrong]*. – y2k-shubham Apr 12 '18 at 14:26
  • @y2k-shubham When you do val df = XXX you are not actually reading the dataframe. Instead you are telling the dataframe HOW to be created. When you do repartition you are telling it to change the partitioning from the default (which is either the number of blocks or the default number defined in the configuration). Then when you do an action, spark does an optimization and might push down the reading to the actual data reading depending on its internal schemes. At least in theory if you do repartition before doing any action it should do parallel reading (parallelize by line) – Assaf Mendelson Apr 12 '18 at 15:21
  • Okay I get your point. Ideally I would expect `Spark` to do exactly this (`"At least in theory if you do repartition.."`). I will update you once I confirm if this is the actual behaviour. Meanwhile in [this answer](https://stackoverflow.com/a/29012187/3679900), **@0x0FFF** gives a valuable insight: if you're reading a file from `HDFS`, then `Spark` would map each *input split* of the file (as on `HDFS`) to a `partition`. – y2k-shubham Apr 13 '18 at 14:23
  • Well, upon a second thought, reading the *last paragraph* of [the answer](https://stackoverflow.com/a/29012187/3679900) makes me think that `Spark` might not be *clever* enough to be able to exhibit our `in theory..` behaviour – y2k-shubham Apr 13 '18 at 14:33
  • @y2k-shubham the HDFS split is the number of blocks I mentioned. The answer you mentioned looks at RDD as opposed to dataframe which is the reason it does not do so automatically. For dataframe that would depend on the file format (and I believe it is true for CSV, you should simply try it out). – Assaf Mendelson Apr 14 '18 at 17:39
  • Okay. Also, since `repartition` and `coalesce` [are `transformation`s](http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations), I believe our `"in theory.."` behaviour would be exhibited by `Spark` *[unless there's something else holding it back that we aren't aware of]* – y2k-shubham Apr 17 '18 at 05:33
  • @Assaf Mendelson -- what will happen when we read the file locally from unix? will it get distributed among worker nodes efficiently. – vikrant rana May 14 '19 at 08:17
  • @vikrantrana When you read a file locally, each node will attempt to read it from the same path. If this path is local and the file is only on one node, you would get an error (exception) on the other nodes. When writing, each file will be written to its relevant node (i.e. some files on one node and some on others). To avoid this, either use a distributed file system (e.g. HDFS) or mount a shared folder to the same path (or, if this is read only, copy the file yourself). – Assaf Mendelson May 14 '19 at 09:55
  • Ahh, got it. We can pass the file to each executor using the --files option or the addFile method in Spark. – vikrant rana May 14 '19 at 10:05
0

Uncompressed CSV (no gzip) is splittable, so Spark splits it using the default block size of 128 MB.

This means 50 GB => 50 GB / 128 MB = 50 * 1024 / 128 = 400 blocks.

block 0 = range 0 - 134217728

block 1 = range 134217728 - 268435456

etc.
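
The arithmetic above can be checked with a few lines of plain Java. This is a minimal sketch under the same assumptions as the text (a 50 GB file counted in binary units and a fixed 128 MB split size); the class and method names are made up for illustration and are not part of Spark or Hadoop.

```java
// Hypothetical helper that reproduces the block arithmetic above (not Spark code).
class SplitRanges {
    static final long MIB = 1024L * 1024L;

    // Number of splits needed to cover fileSize bytes (ceiling division).
    static long count(long fileSize, long splitSize) {
        return (fileSize + splitSize - 1) / splitSize;
    }

    // Byte range [start, end) covered by the split with the given index.
    static long[] range(long index, long fileSize, long splitSize) {
        long start = index * splitSize;
        long end = Math.min(start + splitSize, fileSize);
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        long fileSize = 50L * 1024L * MIB; // 50 GB, as in the question
        long splitSize = 128L * MIB;       // assumed split size
        System.out.println("blocks: " + count(fileSize, splitSize));
        for (long i = 0; i < 2; i++) {
            long[] r = range(i, fileSize, splitSize);
            System.out.println("block " + i + " = range " + r[0] + " - " + r[1]);
        }
    }
}
```

The ceiling division matters only when the file size is not an exact multiple of the split size, in which case the last block is simply shorter.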

Of course, the last line in each text block (class PartitionedFile) will cross the 128 MB boundary, but Spark keeps reading a few characters past the boundary until it reaches the end of that line. Spark also skips the first line of any block that does not start at pos == 0.

Internally, Spark uses the Hadoop class LineRecordReader (shaded in org.apache.hadoop.shaded.org.apache.hadoop.mapreduce.lib.input.LineRecordReader).

See the source code that positions the reader at the first line of a block:

```java
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
```
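
To see why "skip the first line unless the block starts at 0" combined with "read past the end of the block to finish a line" delivers every line exactly once, here is a small simulation. It is a simplified sketch, not Hadoop's LineRecordReader: the names `SplitLineReader`, `readSplit` and `readAll` are made up for illustration, it works on the characters of an in-memory string rather than the bytes of a file, and it only handles `\n` line endings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of split-aware line reading (not Hadoop code).
class SplitLineReader {

    // Returns the lines that the split [start, end) is responsible for.
    static List<String> readSplit(String data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Not the first split: throw away the first record, because the
            // previous split has already read it as its extra line.
            int nl = data.indexOf('\n', pos);
            pos = (nl == -1) ? data.length() : nl + 1;
        }
        List<String> lines = new ArrayList<>();
        // "pos <= end" lets this split read a line that starts at or before
        // its end offset, even though that line finishes beyond it.
        while (pos <= end && pos < data.length()) {
            int nl = data.indexOf('\n', pos);
            int lineEnd = (nl == -1) ? data.length() : nl;
            lines.add(data.substring(pos, lineEnd));
            pos = (nl == -1) ? data.length() : nl + 1;
        }
        return lines;
    }

    // Cuts data into fixed-size splits and concatenates what each one reads.
    static List<String> readAll(String data, int splitSize) {
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length(); start += splitSize) {
            int end = Math.min(start + splitSize, data.length());
            all.addAll(readSplit(data, start, end));
        }
        return all;
    }

    public static void main(String[] args) {
        String data = "aaa\nbbb\nccc\nddd\n";
        // A boundary at 6 falls inside "bbb"; a boundary at 12 falls
        // exactly on the start of "ddd".
        System.out.println(readSplit(data, 0, 6));
        System.out.println(readSplit(data, 6, 12));
        System.out.println(readSplit(data, 12, 16));
    }
}
```

With splits of 6 characters, the first split returns aaa and bbb, the second returns ccc and ddd, and the third returns nothing, because its only line was already consumed as the extra line of the previous split. No line is lost or duplicated, whatever the split size.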