I'm doing some complex operations in PySpark where the last operation is a flatMap that yields an object of type pyspark.rdd.PipelinedRDD whose elements are simply plain strings:
print(output_data.take(8))
> ['a', 'abc', 'a', 'aefgtr', 'bcde', 'bc', 'bhdsjfk', 'b']
I'm starting my Spark session like this (a local session for testing):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my_app")\
    .config('spark.sql.shuffle.partitions', '2').master("local").getOrCreate()
My input data looks like this:
input_data = (('a', ('abc', [[('abc', 23)], 23, False, 3])),
('a', ('abcde', [[('abcde', 17)], 17, False, 5])),
('a', ('a', [[('a', 66)], 66, False, 1])),
('a', ('aefgtr', [[('aefgtr', 65)], 65, False, 6])),
('b', ('bc', [[('bc', 25)], 25, False, 2])),
('b', ('bcde', [[('bcde', 76)], 76, False, 4])),
('b', ('b', [[('b', 13)], 13, False, 1])),
('b', ('bhdsjfk', [[('bhdsjfk', 36)], 36, False, 7])))
sc = spark.sparkContext
input_data = sc.parallelize(input_data)
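Since I can't post the real pipeline (see the end of the question), here is a hypothetical stand-in that, like my actual code, ends in a flatMap yielding bare Python strings; it reproduces the exact same errors shown below, even though my actual logic is much more involved:
# Hypothetical stand-in for my real logic: any flatMap that
# yields plain Python strings behaves the same way.
output_data = input_data.flatMap(lambda kv: [kv[1][0]])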
I want to turn that output RDD into a DataFrame with one column like this:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("term", StringType())])
df = spark.createDataFrame(output_data, schema=schema)
This doesn't work; I'm getting this error:
TypeError: StructType can not accept object 'a' in type <class 'str'>
So I tried it without a schema, like this:
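# same call as above, just without the schema argument
df = spark.createDataFrame(output_data)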
and got this error:
TypeError: Can not infer schema for type: <class 'str'>
EDIT: The same error happens when trying toDF():
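df = output_data.toDF()
TypeError: Can not infer schema for type: <class 'str'>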
So for some reason I have a pyspark.rdd.PipelinedRDD whose elements are not StringType but standard Python str.
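A quick check of the element type confirms this:
print(type(output_data.first()))
> <class 'str'>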
I'm relatively new to PySpark, so can someone enlighten me as to why this might be happening?
I'm surprised PySpark isn't able to implicitly cast str to StringType.
I can't post the entire code; suffice it to say that I'm doing some complex stuff with strings, including string comparisons and for loops. I'm not explicitly typecasting anything, though.
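For what it's worth, wrapping each string in a 1-tuple before calling createDataFrame does seem to avoid the error (a minimal sketch, not necessarily the proper fix):
# Each 1-tuple is row-like, so it matches the one-field schema;
# a bare str apparently does not count as a row.
df = spark.createDataFrame(output_data.map(lambda s: (s,)), schema=schema)
df.show()
But I'd still like to understand why the bare str isn't accepted or cast implicitly.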