
I have a PySpark RDD and am trying to convert it into a DataFrame using a custom sampling ratio, but I sometimes get the error below saying an empty RDD cannot be used to create a DataFrame:

ValueError: Can not reduce() empty RDD

Below is my code. As I said, it doesn't error out every time; it only fails some of the time.

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)


myrdd = sc.parallelize([
    (1, 638.55),
    (2, 638.55),
    (3, 638.55),
    (4, 638.55),
    (5, 638.55)
])

for i in range(100):
    print(i)
    df2 = sqlContext.createDataFrame(myrdd, samplingRatio=0.4)

When I give a sampling ratio of 1, it doesn't fail. I don't know why the behavior isn't consistent. Or am I missing something about how the sampling ratio works?

newbie
  • Hi, could you please check whether my answer is what you needed to know. If something is missing or it doesn't answer your question, please leave a comment so I can fix it. It is quite discouraging when there is no response to an answer. It took me quite some time to investigate and reproduce your error and provide an explanation. – cozek Dec 04 '19 at 06:40

1 Answer


As you might already know, most Spark operations are carried out via internal RDD calls. If you look at the stack trace of the error (I reproduced it), you will come across these particular lines:

/usr/local/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
    380             if samplingRatio < 0.99:
    381                 rdd = rdd.sample(False, float(samplingRatio))
--> 382             schema = rdd.map(lambda row: _infer_schema(row, names)).reduce(_merge_type)
    383         return schema
    384 

From the snippet above, you can see that internally it calls rdd = rdd.sample(False, float(samplingRatio)). samplingRatio is the probability of each element being selected, not the fraction of items to select from the population. Because your RDD is quite small and the probability is only 0.4, sometimes none of the items are selected; the sampled RDD is then empty and schema inference fails with the error you see. You can see this in action using the snippet below:

myrdd = sc.parallelize([
    (1, 638.55),
    (2, 638.55),
    (3, 638.55),
    (4, 638.55),
    (5, 638.55)
])
# Sample repeatedly with probability 0.4 per element; with only 5 elements,
# an empty sample shows up fairly quickly.
for i in range(100):
    rdd2 = myrdd.sample(False, 0.4)
    sampledRdd = rdd2.collect()
    if sampledRdd == []:
        print('got empty')
        break

Hence, for consistent behavior, you either need a large enough population in your initial myrdd or you need to handle the exception.
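
For example, one way to handle it (just a sketch; the try/except fallback is my own suggestion, not something from your original code) is to catch the ValueError and re-run schema inference over the full RDD:

for i in range(100):
    try:
        df2 = sqlContext.createDataFrame(myrdd, samplingRatio=0.4)
    except ValueError:
        # The sample came back empty, so fall back to inferring the schema
        # from every row (samplingRatio=1 skips the sampling step).
        df2 = sqlContext.createDataFrame(myrdd, samplingRatio=1.0)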

You can read more about how the sampling is done in this StackOverflow answer.
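
Alternatively, if you don't actually need schema inference, you can pass an explicit schema to createDataFrame, which skips the sampling step entirely. A minimal sketch (the column names id and value are placeholders I picked for your two tuple fields):

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("value", DoubleType(), True)
])

# No sampling or inference happens when the schema is supplied up front.
df2 = sqlContext.createDataFrame(myrdd, schema=schema)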

cozek