That's not strange. Since you didn't provide the schema, Spark has to infer it based on the data. If the input is an RDD, it calls SparkSession._createFromRDD and subsequently SparkSession._inferSchema, which, if samplingRatio is missing, evaluates up to 100 rows:
first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")

if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")
Now the only puzzle left is why it doesn't evaluate exactly one record. After all, in your case first is not empty and doesn't contain None.
That's because first is implemented through take, which doesn't guarantee that the exact number of items will be evaluated. If the first partition doesn't yield the required number of items, it iteratively increases the number of partitions to scan. Please check the take implementation for details.
If you want to avoid this, you should use createDataFrame and provide the schema, either as a DDL string:
spark.createDataFrame(a.map(f), "val: integer")
or an equivalent StructType.
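A minimal sketch of the StructType equivalent (assuming the same single, nullable integer column as in the DDL string above):

from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([StructField("val", IntegerType(), True)])
spark.createDataFrame(a.map(f), schema)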
You won't find similar behavior in the Scala counterpart, because it doesn't use schema inference in toDF. It either retrieves the corresponding schema from the Encoder (which is fetched using Scala reflection), or doesn't allow the conversion at all. The closest similar behavior is schema inference on an input source like CSV or JSON:
spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))
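A rough PySpark parallel, sketched with a hypothetical rdd_json of JSON strings: without a schema the reader scans the data to infer one, while supplying a schema up front skips that pass:

rdd_json = sc.parallelize(['{"foo": "bar"}'])

spark.read.json(rdd_json)                       # triggers a schema inference pass
spark.read.schema("foo STRING").json(rdd_json)  # schema provided, no inference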