I have an RDD of the form RDD[(string, List(Tuple))], like below:

[(u'C1589::HG02922', [(83779208, 2), (677873089, 0), ...]

When I run the code below to convert it to a DataFrame, spark.createDataFrame(rdd) works fine but rdd.toDF() fails.

vector_df1 = spark.createDataFrame(vector_rdd) # Works fine.
vector_df1.show()
+--------------+--------------------+
|            _1|                  _2|
+--------------+--------------------+
|C1589::HG02922|[[83779208,2], [6...|
|       HG00367|[[83779208,0], [6...|
| C477::HG00731|[[83779208,0], [6...|
|       HG00626|[[83779208,0], [6...|
|       HG00622|[[83779208,0], [6...|
                   ...
vector_df2 = vector_rdd.toDF() # Raises the error.

The error thrown is:

Traceback (most recent call last):
  File "/tmp/7ff0f62d-d849-4884-960f-bb89b5f3dd80/ml_on_vds.py", line 47, in <module>
    vector_df2 = vector_rdd.toDF().show()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 57, in toDF
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1124, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1094, in _build_args
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 289, in get_command_part
AttributeError: 'PipelinedRDD' object has no attribute '_get_object_id'
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [7ff0f62d-d849-4884-960f-bb89b5f3dd80] entered state [ERROR] while waiting for [DONE].

Has anyone encountered a similar issue before? .toDF() is just a simple wrapper for createDataFrame(), so I don't understand why it would fail. I have verified at runtime that I am using Spark 2.0.2.

# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, hash
from pyspark.sql.types import *
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from hail import *

# SparkSession
spark = (SparkSession.builder.appName("PopulationGenomics")
        .config("spark.sql.files.openCostInBytes", "1099511627776")
        .config("spark.sql.files.maxPartitionBytes", "1099511627776")
        .config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec")
        .getOrCreate())

Per request, some more of the code which generates the error:

vector_rdd = (indexed_df.rdd.map(lambda r: (r[0], (r[3], r[2])))
              .groupByKey()
              .mapValues(lambda l: Vectors.sparse((max_index + 1), list(l))))
vector_df = spark.createDataFrame(vector_rdd, ['s', 'features']) # Works
vector_df1 = vector_rdd.toDF() # Fails (the traceback points at toDF)
vector_df1.show()

indexed_df is a DataFrame of the schema:

StructType(List(StructField(s,StringType,true),StructField(variant_hash,IntegerType,false),StructField(call,IntegerType,true),StructField(index,DoubleType,true)))

And it looks like...

+--------------+------------+----+-----+
|             s|variant_hash|call|index|
+--------------+------------+----+-----+
|C1046::HG02024|   -60010252|   0|225.0|
|C1046::HG02025|   -60010252|   1|225.0|
|C1046::HG02026|   -60010252|   0|225.0|
|C1047::HG00731|   -60010252|   0|225.0|
|C1047::HG00732|   -60010252|   1|225.0|
|C1047::HG00733|   -60010252|   0|225.0|
|C1048::HG02024|   -60010252|   0|225.0|
|C1048::HG02025|   -60010252|   1|225.0|
|C1048::HG02026|   -60010252|   0|225.0|
|C1049::HG00731|   -60010252|   0|225.0|
|C1049::HG00732|   -60010252|   1|225.0|
|C1049::HG00733|   -60010252|   0|225.0|
|C1050::HG03006|   -60010252|   0|225.0|
|C1051::HG03642|   -60010252|   0|225.0|
|C1589::HG02922|   -60010252|   2|225.0|
|C1589::HG03006|   -60010252|   0|225.0|
|C1589::HG03052|   -60010252|   2|225.0|
|C1589::HG03642|   -60010252|   0|225.0|
|C1589::NA12878|   -60010252|   1|225.0|
|C1589::NA19017|   -60010252|   1|225.0|
+--------------+------------+----+-----+
mongolol

1 Answer

The toDF method is attached to RDDs when a SparkSession is created (or an SQLContext in 1.x versions), so create the session first:

spark = SparkSession(sc)
hasattr(rdd, "toDF")
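To see why the session matters: in PySpark, toDF is not defined on the RDD class itself. Constructing a SparkSession attaches it, and the attached function delegates to that session's createDataFrame. Below is a simplified pure-Python model of that mechanism, not PySpark's actual code; FakeRDD, FakeSession and monkey_patch_rdd are hypothetical stand-ins. It suggests that if something else constructs a session-like object after yours, toDF may be routed through that object while spark.createDataFrame still goes through your own session.

```python
# Simplified, pure-Python model of the patching mechanism.
# FakeRDD, FakeSession and monkey_patch_rdd are hypothetical stand-ins,
# not PySpark classes.

class FakeRDD:
    """Stand-in for an RDD; note that it defines no toDF of its own."""
    def __init__(self, rows):
        self.rows = rows


def monkey_patch_rdd(session):
    """Attach toDF to FakeRDD, closing over the given session."""
    def toDF(self, schema=None):
        # Delegates to whichever session was captured at patch time.
        return session.createDataFrame(self, schema)
    FakeRDD.toDF = toDF


class FakeSession:
    """Stand-in for a session; constructing one re-patches FakeRDD."""
    def __init__(self, name):
        self.name = name
        monkey_patch_rdd(self)

    def createDataFrame(self, rdd, schema=None):
        return (self.name, rdd.rows, schema)


rdd = FakeRDD([("HG00367", 0)])
has_todf_before = hasattr(rdd, "toDF")  # no session yet, so no toDF
first = FakeSession("first")
second = FakeSession("second")          # replaces the toDF installed by "first"
result = rdd.toDF(["s", "call"])        # handled by "second", not "first"
```

In this model, hasattr(rdd, "toDF") only becomes true once a session exists, and the most recently constructed session is the one that serves toDF.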

If you are using Scala, you need to import spark.implicits._, where spark is the SparkSession object that you created.

Hope this helps!

koiralo
  • I've added how I'm initializing my SparkSession to the bottom of my script. Shouldn't this give me access to the `toDF()` method? – mongolol May 05 '17 at 18:22
  • If you are on scala you need to import spark.implicits._ – koiralo May 06 '17 at 11:59
  • I'm using Python. I've included my imports. – mongolol May 07 '17 at 03:12
  • Have you tried `vector_rdd.map(lambda x: (x, )).toDF()`? – koiralo May 07 '17 at 03:20
  • Or you can also try `from pyspark.sql import Row; row = Row("columnName"); vector_rdd.map(row).toDF()` – koiralo May 07 '17 at 03:21
  • Tried both the suggested methods, received the same error regarding `_get_object_id`. `hasattr(vector_rdd, "toDF")` is correctly returning true. It looks like the problem is with the `toDF()` method itself, just not sure why it's happening... – mongolol May 08 '17 at 17:08
  • stopping my initial SparkSession and starting another resolved the problem. I'll have to look at how my includes impacted Spark - pretty odd error overall. – mongolol May 09 '17 at 03:19
  • That seems strange!! – koiralo May 09 '17 at 03:23
  • @koiralo you could edit the Scala solution and add the following in order to be clearer: "If you are in Scala you need to import spark.implicits._ where spark is the SparkSession". I had trouble understanding that that was the case. – Tom Piaggio Dec 09 '20 at 15:42
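
For reference, the fix reported in the comments above (stopping the initial SparkSession and starting another) could look roughly like the sketch below. restart_session is a hypothetical helper, not part of PySpark; it relies only on stop() and getOrCreate(), which exist on SparkSession and SparkSession.builder. Whether this resolves the error in other environments is an assumption based on that one report.

```python
def restart_session(old_session, builder):
    """Stop old_session, then build and return a replacement session.

    old_session: any object with a stop() method (e.g. a SparkSession).
    builder: any object with getOrCreate() (e.g. SparkSession.builder).
    """
    old_session.stop()
    return builder.getOrCreate()


# Usage with PySpark (assumes pyspark is installed and `spark` already exists):
#   from pyspark.sql import SparkSession
#   spark = restart_session(spark, SparkSession.builder.appName("PopulationGenomics"))
#   # Rebuild vector_rdd from the new session before calling vector_rdd.toDF().
```

Stopping a session also stops its SparkContext, so RDDs and DataFrames created from the old session would need to be rebuilt afterwards.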