The Spark docs say that .repartition() returns a new DataFrame, which is by default hash-partitioned. But in the example I am running, shown below, that's not the case.
rdd = sc.parallelize([('a', 22), ('b', 1), ('c', 4), ('b', 1), ('d', 2),
                      ('a', 0), ('d', 3), ('a', 1), ('c', 4), ('b', 7),
                      ('a', 2), ('a', 22), ('b', 1), ('c', 4), ('b', 1),
                      ('d', 2), ('a', 0), ('d', 3), ('a', 1), ('c', 4),
                      ('b', 7), ('a', 2)])
df = rdd.toDF(['key', 'value'])
df = df.repartition(5, 'key')  # 5 partitions on the 'key' column
print("Partitioner: {}".format(df.rdd.partitioner))  # prints 'Partitioner: None'. Why??
Why don't I have a partitioner? Let me print the partitions using the glom() function:
print("Partitions structure: {}".format(df.rdd.glom().collect()))
[
[ #Partition 1
Row(key='a', value=22), Row(key='a', value=0), Row(key='a', value=1),
Row(key='a', value=2), Row(key='a', value=22), Row(key='a', value=0),
Row(key='a', value=1), Row(key='a', value=2)
],
[ #Partition 2
Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7),
Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)
],
[ #Partition 3
Row(key='c', value=4), Row(key='c', value=4), Row(key='c', value=4),
Row(key='c', value=4)
],
[ #Partition 4 (empty)
],
[ #Partition 5
Row(key='d', value=2), Row(key='d', value=3), Row(key='d', value=2),
Row(key='d', value=3)
]
]
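As a cross-check (just a quick sketch; pid is a column name I made up), tagging each row with its partition id via spark_partition_id gives the same grouping:

from pyspark.sql.functions import spark_partition_id
df.withColumn('pid', spark_partition_id()).show()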
We can clearly see that the data is well partitioned by key, with all Rows sharing the same key ending up in the same partition. So why does the partitioner print None?
What can I do to get a partitioner, which could then be used for further optimization?
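For comparison, the only workaround I can think of is going through the plain RDD API, where an explicit partitionBy does set a partitioner. A minimal sketch on the same data (pair_rdd is just my name for the result):

pair_rdd = rdd.partitionBy(5)  # hash-partitions the (key, value) tuples by key
print(pair_rdd.partitioner)    # prints a Partitioner object instead of None

But that means leaving the DataFrame API, and converting back with toDF() appears to drop the partitioner again.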