
I want to display the number of elements in each partition, so I write the following:

def count_in_a_partition(iterator):
    yield sum(1 for _ in iterator)

If I use it like this:

print("number of element in each partitions: {}".format(
  my_rdd.mapPartitions(count_in_a_partition).collect()
))

I get the following:

19/02/18 21:41:15 INFO DAGScheduler: Job 3 failed: collect at /project/6008168/tamouze/testSparkCedar.py:435, took 30.859710 s
19/02/18 21:41:15 INFO DAGScheduler: ResultStage 3 (collect at /project/6008168/tamouze/testSparkCedar.py:435) failed in 30.848 s due to Stage cancelled because SparkContext was shut down
19/02/18 21:41:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/02/18 21:41:16 INFO MemoryStore: MemoryStore cleared
19/02/18 21:41:16 INFO BlockManager: BlockManager stopped
19/02/18 21:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/02/18 21:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_14 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_14 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_3 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_3 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 INFO SparkContext: Successfully stopped SparkContext
....

Note that my_rdd.take(1) returns:

[(u'id', u'text', array([-0.31921682, ...,0.890875]))]

How can I solve this issue?

  • This is not salvageable in its current state. Can you add an MCVE please? This will help you write it: https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples – eliasah Feb 22 '19 at 15:58

1 Answer


You have to use the glom() function for that. Let's take an example.

Let's create a DataFrame first.

rdd = sc.parallelize([('a', 22), ('b', 1), ('c', 4), ('b', 1), ('d', 2), ('e', 0),
                      ('d', 3), ('a', 1), ('c', 4), ('b', 7), ('a', 2), ('f', 1)])
df = rdd.toDF(['key', 'value'])
df = df.repartition(5, "key")  # make 5 partitions, hash-partitioned by "key"

The number of partitions -

print("Number of partitions: {}".format(df.rdd.getNumPartitions())) 
    Number of partitions: 5

The number of rows/elements in each partition. This can give you an idea of skew -

print('Partitioning distribution: '+ str(df.rdd.glom().map(len).collect()))
    Partitioning distribution: [3, 3, 2, 2, 2]
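
As an aside, if you already have a DataFrame, a similar per-partition count can be computed without dropping to the RDD API. This is only a sketch, assuming pyspark.sql.functions.spark_partition_id is available (Spark 1.6+); the column name pid is just illustrative:

# DataFrame-only sketch of the per-partition row count: tag each row with its
# partition id, then group and count. Only one small row per partition is returned.
from pyspark.sql.functions import spark_partition_id

(df
 .withColumn("pid", spark_partition_id())  # partition id of each row
 .groupBy("pid")                           # group rows by partition
 .count()                                  # rows per partition
 .orderBy("pid")
 .show())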

See how the rows are actually distributed across the partitions. Note that if the dataset is large, this can crash your driver with an out-of-memory (OOM) error, because glom().collect() pulls entire partitions back to the driver.

print("Partitions structure: {}".format(df.rdd.glom().collect()))
    Partitions structure: [
       #Partition 1        [Row(key='a', value=22), Row(key='a', value=1), Row(key='a', value=2)], 
       #Partition 2        [Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)], 
       #Partition 3        [Row(key='c', value=4), Row(key='c', value=4)], 
       #Partition 4        [Row(key='e', value=0), Row(key='f', value=1)], 
       #Partition 5        [Row(key='d', value=2), Row(key='d', value=3)]
                          ]
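
If the dataset is too large to collect whole partitions, a lighter-weight sketch (essentially the question's original approach, extended with mapPartitionsWithIndex; the helper name count_with_index is just illustrative) returns one small (index, count) pair per partition instead of the rows themselves:

# Memory-safe sketch: yield (partition_index, row_count) per partition, so only
# a handful of tuples reach the driver instead of the partition contents.
def count_with_index(index, iterator):
    yield (index, sum(1 for _ in iterator))

print("Rows per partition: {}".format(
    df.rdd.mapPartitionsWithIndex(count_with_index).collect()))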
    The solution that the OP is using is correct and optimal in terms of performance. So _you have to use `glom`_ is simply incorrect; the proposed solution is strictly worse than the original one (as you already noticed yourself, it has to load whole partitions) and doesn't address the actual source of the failure (which, to be fair, is impossible to determine from the information provided in the post). – zero323 Feb 19 '19 at 14:54