
I am running a Spark job on a YARN cluster. In total, 250 tasks are created; almost every time, 230 of them complete within 10 minutes, but the remaining 20 take another hour to finish.

The time is being spent in the saveAsTextFile() action.

Also, most of the part files written are empty (roughly 150 or more of them).

Is there any specific reason why it is running so slowly? I do not see any errors in the stderr log.

Shankar

2 Answers


It could be for a number of reasons, and to answer fully we would need to look at the code. Even if the time is spent in saveAsTextFile(), the operation actually causing it may be a different one. My hunch is that before the save operation you are using reduceByKey or a GROUP BY.

Now, those operations can be problematic if you have skewed data, that is, unbalanced data where most of the records belong to just a few keys. For instance, if you are grouping by US state, there are only 50 of them, so only 50 tasks can actually be doing work; even though you have 250 tasks in total, the other 200 won't receive any input. Or let's say you're grouping your users by country, but most of your users are from the US: you'd have one task processing most of the data and finishing much later than the others.

So, what you have to do is look at any operation that performs a grouping/reducing before the save, and look at the data to see if there's any skew.
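
For a quick check, here is a minimal sketch (in Scala; the input path and key extraction are hypothetical placeholders, and `rdd` stands in for whatever RDD you actually pass to saveAsTextFile()) that prints the record count of every partition. A few huge partitions next to many empty ones is the usual signature of skew:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionSizes {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-sizes"))

    // Stand-in for your real pipeline: build the RDD you would pass to saveAsTextFile()
    val rdd = sc.textFile("hdfs:///path/to/input")            // hypothetical input path
      .map(line => (line.split(",")(0), 1))                   // hypothetical key extraction
      .reduceByKey(_ + _)

    // Count the records in every partition without reshuffling the data
    val sizes = rdd
      .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)), preservesPartitioning = true)
      .collect()

    // The largest partitions are the ones holding up the last few tasks
    sizes.sortBy(-_._2).take(10).foreach { case (i, n) => println(s"partition $i -> $n records") }
    println(s"empty partitions: ${sizes.count(_._2 == 0)} of ${sizes.length}")

    sc.stop()
  }
}
```

If skew is the cause, the handful of overloaded partitions will correspond to the 20 straggler tasks you are seeing.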

Roberto Congiu
  • I don't think grouping operations can produce empty partitions. I suspect either filtering out whole partitions or bad `hashCode`. – whaleberg Feb 09 '16 at 22:17
  • Sure they can... try grouping by US state (50 total) with 250 reducers. Spark will estimate the number of partitions based on input size, but without estimating the cardinality it won't know if there are too many partitions. And you don't need the partitions to be empty, just a skewed distribution of values over key hashes. But not enough information was supplied to determine the actual reason. – Roberto Congiu Feb 10 '16 at 00:16

This sort of bad distribution of elements in an RDD can be caused by a bad hash function on the keys. The default partitioning uses a HashPartitioner, which assigns an element to a partition by taking key.hashCode() % numberOfPartitions. If your hash function distributes the keys poorly, then you will end up with highly skewed partitions as well as empty ones.
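
As a rough illustration (not your actual job), you can see the effect with HashPartitioner directly: with only 50 distinct keys and 250 partitions, at most 50 partitions can ever receive data, so at least 200 of the output part files come out empty.

```scala
import org.apache.spark.HashPartitioner

object HashPartitionerDemo {
  def main(args: Array[String]): Unit = {
    // Same partition count as the job in the question
    val partitioner = new HashPartitioner(250)

    // A handful of the 50 US state codes as example keys
    val states = Seq("CA", "NY", "TX", "WA", "IL")
    states.foreach { s =>
      // getPartition is just a non-negative key.hashCode % 250
      println(s"$s -> partition ${partitioner.getPartition(s)}")
    }
    // 50 distinct keys can occupy at most 50 of the 250 partitions,
    // so the remaining part files are written out empty.
  }
}
```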

Check the hashCode() method for whatever the keys of your RDD are and verify that it produces a good distribution of results. See Best implementation for hashCode method for good advice on implementing hashCode(). It's likely that your IDE can generate a good implementation for you.
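
For illustration only (the key class and its fields are hypothetical), here is a sketch of the difference between a hand-written hashCode() that collapses many distinct keys into a few buckets and a Scala case class, which derives equals/hashCode over all fields and usually distributes well under HashPartitioner:

```scala
// Hypothetical key type with a hand-written hashCode that ignores the high-cardinality field:
// millions of distinct keys collapse onto a handful of hash buckets, hence a handful of partitions.
class BadKey(val userId: Long, val country: String) {
  override def hashCode(): Int = country.hashCode
  override def equals(other: Any): Boolean = other match {
    case k: BadKey => k.userId == userId && k.country == country
    case _         => false
  }
}

// A case class derives equals/hashCode from all of its fields,
// which is normally enough for HashPartitioner to spread keys evenly.
case class GoodKey(userId: Long, country: String)
```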

whaleberg