The pandas API on Spark (pyspark.pandas) has three kinds of default index. I am not able to reproduce their documented behaviour:
Setting up to test:
import pyspark.pandas as ps
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
# 20,000,000 rows so that the CSV is larger than 128 MB; otherwise it is read into just 1 partition
pd.DataFrame({
    'id': np.arange(20000000),
    'b': np.random.choice(['a', 'b', 'c', 'd'], size=(20000000,), p=[0.25, 0.25, 0.25, 0.25]),
}).to_csv('df_s.csv', index=None)
- Sequence type
- The data is collected on a single node (so the dataframe should have just 1 partition?)
- The default index is [0,1,2,3,...] (monotonically increasing by 1 and in order)
tests:
ps.set_option('compute.default_index_type','sequence')
dfsp = ps.read_csv('df_s.csv')
dfsp.head()
output:
id b
0 0 a
1 1 c
2 2 c
3 3 b
4 4 d
#Expected
dfsp.to_spark().rdd.getNumPartitions()
output:
8
#Unexpected
Question: Why is the number of partitions not 1? When the default index type is set to 'sequence', all the data is supposed to be collected on a single node.
- Distributed-sequence
- It computes and generates the index in a distributed manner but it needs another extra Spark Job to generate the global sequence internally. It also does not guarantee the natural order of the results. In general, it becomes a continuously increasing number.
tests:
ps.set_option('compute.default_index_type','distributed-sequence')
dfsp = ps.read_csv('df_s.csv')
dfsp.head()
output:
id b
0 0 a
1 1 c
2 2 c
3 3 b
4 4 d
#Expected
dfsp.to_spark().rdd.getNumPartitions()
output:
8
#Unexpected
Questions: The dataframe being distributed across all 8 cores is the expected behaviour, but the index values should not be in order, and they are. This looks just like the behaviour of the sequence type default index.
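To make my reading of the description concrete, here is a minimal pure-Python sketch of how I assume a global sequence could be generated in a distributed manner: one pass counts the rows per partition (the "extra job"), and a second pass lets each partition number its own rows from its offset. The function name `distributed_sequence_index` and the sample partitions are hypothetical; this is not the actual pandas-on-Spark implementation.

```python
from itertools import accumulate


def distributed_sequence_index(partitions):
    """Assign a global 0..n-1 index to rows held in separate partitions."""
    # Pass 1 (the assumed "extra job"): count the rows in each partition.
    counts = [len(part) for part in partitions]
    # Offset of a partition = total number of rows in all earlier partitions.
    offsets = [0] + list(accumulate(counts))[:-1]
    # Pass 2: each partition numbers its own rows, starting at its offset.
    return [
        [(offset + i, row) for i, row in enumerate(part)]
        for offset, part in zip(offsets, partitions)
    ]


# Hypothetical data: three partitions of unequal size.
parts = [["a", "c", "c"], ["b", "d"], ["a", "b", "c", "d"]]
indexed = distributed_sequence_index(parts)
flat_index = [idx for part in indexed for idx, _ in part]
print(flat_index)  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
```

If the real implementation works anything like this, the data would stay in its 8 partitions and the first partition would still start at 0, so head() alone could not tell this index type apart from 'sequence'.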
- Distributed
- “distributed” index has almost no performance penalty and always creates monotonically increasing numbers. If the index is just needed as unique numbers for each row, or the order of rows, this index type would be the best choice. However, the numbers have an indeterministic gap.
tests:
ps.set_option('compute.default_index_type','distributed')
dfsp = ps.read_csv('df_s.csv')
print(dfsp.to_spark().rdd.getNumPartitions())
output:
8
dfsp.head()
output:
id b
0 0 c
1 1 c
2 2 b
3 3 c
4 4 c
Questions: This also behaves just like the sequence type. The index generated is an ordered sequence 0, 1, 2, ... with no gaps. It should be monotonically increasing numbers with an indeterministic gap.
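One possible explanation that I have not confirmed: the documentation says the 'distributed' index is built on Spark's monotonically_increasing_id, which is documented to put the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below only simulates that bit layout in plain Python; `monotonic_id` and the 3-by-3 partition layout are hypothetical names and values used for illustration.

```python
def monotonic_id(partition_id, row_in_partition):
    # Simulated layout of monotonically_increasing_id:
    # partition ID in the upper 31 bits, record number in the lower 33 bits.
    return (partition_id << 33) + row_in_partition


# Hypothetical layout: 3 partitions with 3 rows each.
ids = [monotonic_id(p, r) for p in range(3) for r in range(3)]
print(ids[:3])   # [0, 1, 2]
print(ids[3:6])  # [8589934592, 8589934593, 8589934594]
```

Under that assumption, the rows of partition 0 would get 0, 1, 2, ... and the gap would only appear at a partition boundary, so the first five rows from head() would look identical to a sequence index. Inspecting rows from a later partition, rather than head(), would be the way to check this.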
Can somebody please help me understand what I am getting wrong, and what the exact expected behaviour is for each of the three default index types?