I am running PySpark in a Jupyter Notebook. I am trying to study whether the estimate produced by the "approx_count_distinct" function can be manipulated, and for that I have created a Spark dataframe with 100,000 random numbers:
import random
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql.functions import approx_count_distinct

C = 100000
data = random.sample(range(1, 10000000000), C)
schema = StructType([StructField('test', LongType(), True)])
# Spark dataframe with C distinct numbers
S = spark.createDataFrame([[x] for x in data], schema)
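For reference, the estimate on the full dataframe can be read the same way it is read inside the loop further down (just a quick sanity check, not part of the timing problem):
baseline = S.select(approx_count_distinct("test")).collect()[0][0]
print(baseline)  # close to C, within the default relative error of approx_count_distinct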
Then I created 3 more empty dataframes for the calculations that follow:
# Create an empty RDD used to build the empty dataframes
emptyRDD = spark.sparkContext.emptyRDD()
HLL = spark.createDataFrame(data = emptyRDD, schema = schema)
Y = spark.createDataFrame(data = emptyRDD, schema = schema)
V = spark.createDataFrame(data = emptyRDD, schema = schema)
The problem is that when I run the following loop, it takes forever and does not seem to end (I let it run for approximately 15 hours without a result):
data_itr = S.rdd.toLocalIterator()
for row in data_itr:
    # First, compute the approx_count_distinct estimate of the HLL dataframe
    # (0 in the first iteration because it is empty)
    HLL_before = HLL.select(approx_count_distinct("test")).collect()[0][0]
    # Then insert the new element into the HLL dataframe
    newRow = spark.createDataFrame([row["test"]], LongType())
    HLL = HLL.union(newRow)
    # Finally, compute the estimate again after the insertion
    HLL_after = HLL.select(approx_count_distinct("test")).collect()[0][0]
    if HLL_after == HLL_before:
        Y = Y.union(newRow)
I need to go through the rows one by one because I need to know whether the "approx_count_distinct" estimate changes after the insertion of each individual element.
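To make the intent concrete, this is essentially the per-element check I am trying to perform (a sketch only; estimate_changes is just a name I made up for this question):
def estimate_changes(hll_df, x):
    # Estimate before inserting x
    before = hll_df.select(approx_count_distinct("test")).collect()[0][0]
    # Insert x and estimate again
    updated = hll_df.union(spark.createDataFrame([[x]], schema))
    after = updated.select(approx_count_distinct("test")).collect()[0][0]
    return updated, after == before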
The code works for 10,000 rows, but when I increase it to 100,000 it does not finish. Does anyone know what could be going on? Why is the run time growing so fast?
Thank you!