0

Say I have a 256 KB file stored on the HDFS file system of one node, as two blocks of 128 KB each. Assume I have a two-node cluster with one core per node. My understanding is that during a transformation Spark will read the complete file into memory on one node and then transfer one block's worth of data to the other node, so that both nodes/cores can process it in parallel. Is that correct?

What if both nodes had two cores each instead of one? In that case, could the two cores on a single node do the computation? Is that right?

         val text = sc.textFile("mytextfile.txt") 
         val counts = text.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_) 
         counts.collect
thebluephantom
emilly
  • The general principles are clear, the edge cases less so. It all depends on how the data is distributed across nodes and on the number of cores. There can be a lot of wastage. Here is an excellent read https://stackoverflow.com/questions/46638901/how-spark-read-a-large-file-petabyte-when-file-can-not-be-fit-in-sparks-main I will answer with what I think is correct in a minute. You have HDFS and S3 and such these days, so I take your question as meaning HDFS. – thebluephantom May 15 '19 at 18:01
  • Difficult to answer, as it is likely that the blocks would be on 2 data nodes / workers in non-S3 mode, i.e. HDFS. Then there are issues of rack and data locality, so the first case is an edge case that is highly unlikely. – thebluephantom May 15 '19 at 18:12
  • See https://stackoverflow.com/questions/54355677/number-of-executor-cores-and-benefits-or-otherwise-spark a question I also asked - as you are getting into the nitty gritty. That explains the multi cores, not common. – thebluephantom May 15 '19 at 18:12
  • In fact these comments should suffice. – thebluephantom May 15 '19 at 18:12
  • In principle the first question is what could be expected, theoretically. – thebluephantom May 15 '19 at 18:26
  • I assume you meant MB instead of KB. Hadoop would never store a 256KB file on multiple blocks unless you specifically configured your cluster to use tiny blocks. – tk421 May 15 '19 at 18:38
  • https://stackoverflow.com/questions/44222307/spark-rdd-default-number-of-partitions this can also play a role – thebluephantom May 15 '19 at 19:22
  • 2 x 128 = 256, but all very unlikely – thebluephantom May 15 '19 at 19:22
  • Of any value to you? – thebluephantom May 16 '19 at 14:53

2 Answers

1

Your question is a little hypothetical, as it is unlikely you would have a Hadoop cluster with HDFS that has only one Data Node and two Worker Nodes, one of them being both Worker and Data Node. That is to say, the whole idea of Spark (and MR) with HDFS is to bring the processing to the data. In the standard Hadoop setup, the Worker Nodes are in fact the Data Nodes. This is the original intent.

Some variations to answer your question:

  • Assuming the case described above, each Worker Node would process one partition and then apply the subsequent transformations to the newly generated RDDs until finished. You may of course repartition the data; what happens then depends on the number of partitions and the number of Executors per Worker Node.

  • In a nutshell: if you initially have N blocks / partitions and fewer than N Executors (E) allocated on a Hadoop cluster with HDFS, then some blocks will be transferred (this is not a shuffle as talked about elsewhere) from Workers where no Executor was allocated to your Spark App to the Workers that were assigned. Otherwise, the block is simply processed on the Data / Worker Node that holds it. Each block / partition is processed in some way and shuffled, and then the next partition or set of partitions is read in and processed, depending on how fast your transformation(s) run.

  • AWS S3 and Microsoft's and Google's equivalent cloud storage set aside the principle of data locality used in the case above. Compute is divorced from storage, on the assumption that the network is not the bottleneck, which was exactly the classic Hadoop reason for bringing the processing to the data. In that case it works similarly to the above, i.e. the S3 data is transferred to the Workers.

All of this assumes an Action has been invoked.

I leave aside the principles of Rack Awareness, etc., as it all becomes quite complicated, but the Resource Managers understand these things and decide accordingly.
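
To make the "N blocks / partitions" point a bit more concrete, here is a rough sketch of how the initial number of partitions for sc.textFile can be estimated. It is a simplified model of the split-size rule used by Hadoop's old-API FileInputFormat (splitSize = max(minSize, min(goalSize, blockSize))), assuming a single splittable, non-empty file and ignoring the 10% slop on the last split. The object and method names (SplitSketch, splitSize, numSplits) are hypothetical and only for illustration; this is not Spark's or Hadoop's actual code.

```scala
object SplitSketch {
  // Simplified model of the old-API FileInputFormat rule:
  //   goalSize  = totalBytes / requested minimum number of partitions
  //   splitSize = max(minSize, min(goalSize, blockSize))
  // minSize defaults to 1 byte here, which is an assumption of this sketch.
  def splitSize(totalBytes: Long, blockBytes: Long, minPartitions: Int, minSize: Long = 1L): Long = {
    val goalSize = totalBytes / math.max(minPartitions, 1)
    math.max(minSize, math.min(goalSize, blockBytes))
  }

  // Estimated number of input splits (= initial partitions) for one file.
  // Ignores the slop factor that lets the last split be slightly larger.
  def numSplits(totalBytes: Long, blockBytes: Long, minPartitions: Int): Long = {
    require(totalBytes > 0, "sketch only covers non-empty files")
    val size = splitSize(totalBytes, blockBytes, minPartitions)
    (totalBytes + size - 1) / size // ceiling division
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // 256 MB file, 128 MB blocks, at least 2 partitions requested
    println(numSplits(256 * mb, 128 * mb, minPartitions = 2))
    // 256 KB file, 128 MB blocks, at least 2 partitions requested
    println(numSplits(256 * 1024L, 128 * mb, minPartitions = 2))
  }
}
```

Under this model the partition count depends on the requested minimum number of partitions as well as on the HDFS block size, which is why the block layout alone does not fully determine how the work is divided.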

thebluephantom
0

In the first case, Spark will usually load one partition on the first node and then, if it cannot find a free core there, load the second partition on the second node after waiting for spark.locality.wait (default 3 seconds).

In the second case, both partitions will be loaded on the same node, unless it does not have both cores free.

Many circumstances can cause this to change if you play with the default configurations.
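
As an illustration of the defaults mentioned above, the following is a minimal sketch of setting spark.locality.wait explicitly and printing where Spark would prefer to run each partition. The app name and object name are hypothetical, the file path is the one from the question, and the master URL is assumed to be supplied by spark-submit. The output depends entirely on your cluster, so none is shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalityWaitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-wait-sketch")  // hypothetical app name
      .set("spark.locality.wait", "3s")    // 3s is the documented default; change it to experiment

    // Assumes the master is provided externally, e.g. by spark-submit.
    val sc = new SparkContext(conf)
    try {
      val text = sc.textFile("mytextfile.txt")
      println(s"partitions: ${text.getNumPartitions}")

      // For each partition, print the hosts Spark considers data-local.
      text.partitions.foreach { p =>
        val hosts = text.preferredLocations(p).mkString(", ")
        println(s"partition ${p.index} prefers: $hosts")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Lowering the wait makes the scheduler give up on a data-local slot sooner and run the task on another node, at the cost of reading the block over the network.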

Anil