I'm doing some complex operations in Pyspark where the last operation is a flatMap that yields an object of type pyspark.rdd.PipelinedRDD whose content is simply a list of strings:

print(output_data.take(8))
> ['a', 'abc', 'a', 'aefgtr', 'bcde', 'bc', 'bhdsjfk', 'b']

I'm starting my Spark-Session like this (local session for testing):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my_app")\
    .config('spark.sql.shuffle.partitions', '2').master("local").getOrCreate()

My input data looks like this:

input_data = (('a', ('abc', [[('abc', 23)], 23, False, 3])),
              ('a', ('abcde', [[('abcde', 17)], 17, False, 5])),
              ('a', ('a', [[('a', 66)], 66, False, 1])),
              ('a', ('aefgtr', [[('aefgtr', 65)], 65, False, 6])),
              ('b', ('bc', [[('bc', 25)], 25, False, 2])),
              ('b', ('bcde', [[('bcde', 76)], 76, False, 4])),
              ('b', ('b', [[('b', 13)], 13, False, 1])),
              ('b', ('bhdsjfk', [[('bhdsjfk', 36)], 36, False, 7])))
sc = spark.sparkContext  # SparkContext behind the session created above
input_data = sc.parallelize(input_data)

I want to turn that output RDD into a DataFrame with one column like this:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("term", StringType())])
df = spark.createDataFrame(output_data, schema=schema)

This doesn't work; I get this error:

TypeError: StructType can not accept object 'a' in type <class 'str'>

So I tried it without schema and got this error:

TypeError: Can not infer schema for type: <class 'str'>

EDIT: The same error happens when trying toDF().

So for some reason I have a pyspark.rdd.PipelinedRDD whose elements are not StringType but standard Python str.

I'm relatively new to Pyspark so can someone enlighten me on why this might be happening?

I'm surprised PySpark isn't able to implicitly cast str to StringType.

I can't post the entire code, but it does some complex string handling, including string comparisons and for-loops. I'm not explicitly typecasting anything, though.

Khris
  • I think this is what you're looking for: https://stackoverflow.com/questions/48111066/how-to-convert-pyspark-rdd-pipelinedrdd-to-data-frame-with-out-using-collect-m/48111699. Also worth reading: https://stackoverflow.com/questions/32788387/pipelinedrdd-object-has-no-attribute-todf-in-pyspark – pault Oct 21 '20 at 12:40
  • I have read those. Sadly, `toDF` doesn't help me; it throws the same error. – Khris Oct 21 '20 at 12:46

1 Answer

One solution is to convert your RDD of strings into an RDD of Row objects, as follows:

from pyspark.sql import Row
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=schema)
# or with a simple list of names as a schema
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=['term'])
# or even use `toDF`:
df = output_data.map(lambda x: Row(x)).toDF(['term'])
# or another variant
df = output_data.map(lambda x: Row(term=x)).toDF()

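Interestingly, as you mention, specifying a StructType schema for an RDD of a raw type like string does not work: a StructType describes a whole row, so each element has to be a record (a Row, tuple, list or dict) with one value per field, and a bare str is not one. Yet if we specify only the type, it works, because Spark then wraps each value into a single-field row itself. The catch is that you cannot choose the column name, which is always value. Another approach is therefore to do just that and rename the value column like this: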

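from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Passing only the type yields a single column named `value`
df = spark.createDataFrame(output_data, StringType())\
          .select(F.col('value').alias('term'))
# or similarly, with the type given as a string
df = spark.createDataFrame(output_data, "string")\
          .select(F.col('value').alias('term'))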
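
A further variant, given only as a sketch: wrap each string in a one-element tuple instead of a Row, so that every RDD element is a record with exactly one field, and name the column through a DDL-style schema string. The helper names `as_record` and `strings_to_df` are made up for illustration, and `spark` is assumed to be an active SparkSession like the one in the question.

```python
def as_record(value):
    """Wrap a bare value in a 1-tuple, i.e. a record with exactly one field."""
    return (value,)


def strings_to_df(spark, rdd, column="term"):
    """Build a one-column DataFrame from an RDD of plain Python strings.

    Assumes `spark` is an active SparkSession and `rdd` contains only str.
    """
    # "term string" is a DDL-style schema string: a single field named
    # `term` of type string, matching the StructType from the question.
    return spark.createDataFrame(rdd.map(as_record), schema=f"{column} string")
```

With the session and RDD from the question, this would be called as `df = strings_to_df(spark, output_data)`. It should behave like the Row-based variants above and avoids the extra select needed to rename the value column.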
Oli