
Hi, I'm working in Spark 1.6.3. I have an RDD that contains some Scala BigInt values. How would I convert it to a Spark DataFrame? Is it possible to cast the types before creating the DataFrame?

My RDD:

Array[(BigInt, String, String, BigInt, BigInt, BigInt, BigInt, List[String])] = Array((14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161702,8702170626376335,59,527780275219,List(NavigationLevel, Session)), (14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161356,8702171157207449,72,527780278061,List(StartPlay, Action, Session)))

Printed out:

(14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161356,8702171157207449,72,527780278061,List(StartPlay, Action, Session))
(14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161702,8702170626376335,59,527780275219,List(NavigationLevel, Session))

I've tried to create a schema object:

  val schema = StructType(Array(
    StructField("trackId", LongType, true),
    StructField("location", StringType, true),
    StructField("listId", StringType, true),
    StructField("videoId", LongType, true),
    StructField("id", LongType, true),
    StructField("sequence", LongType, true),
    StructField("time", LongType, true),
    StructField("type", ArrayType(StringType), true)
  ))

If I try `val df = sqlContext.createDataFrame(rdd, schema)`, I get this error:

error: overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[(BigInt, String, String, BigInt, BigInt, BigInt, BigInt, scala.collection.immutable.List[String])], org.apache.spark.sql.types.StructType)

Or, if I try `val df = sc.parallelize(rdd.toSeq).toDF`, I get the following error:

error: value toSeq is not a member of org.apache.spark.rdd.RDD[(BigInt, String, String, BigInt, BigInt, BigInt, BigInt, List[String])]

Any help is appreciated.

ukbaz

1 Answer


An explicit schema can be used only with an RDD[Row]. Here, use reflection instead:

sqlContext.createDataFrame(rdd)

You also have to change BigInt to one of the supported types (BigDecimal?) or use a binary encoder for this field.
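As a rough sketch of the type change described above, the BigInt fields can be mapped to Long before reflection is applied. This assumes a spark-shell session on Spark 1.6.x where `sc` and `sqlContext` already exist, and that every value fits in a Long (the values in the question do). The sample row is copied from the question; the column names are taken from the question's schema.

```scala
// Assumption: running inside spark-shell (Spark 1.6.x), so `sc` and `sqlContext` are predefined.
import sqlContext.implicits._

// One sample record with the same tuple shape as the RDD in the question.
val rdd = sc.parallelize(Seq(
  (BigInt(14183197), "Browse",
   "3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573",
   BigInt(80161702), BigInt(8702170626376335L), BigInt(59), BigInt(527780275219L),
   List("NavigationLevel", "Session"))
))

// Convert each BigInt to Long so that reflection can derive a schema.
// Note: BigInt.toLong silently truncates values outside the Long range.
val converted = rdd.map {
  case (trackId, location, listId, videoId, id, sequence, time, types) =>
    (trackId.toLong, location, listId, videoId.toLong, id.toLong,
     sequence.toLong, time.toLong, types)
}

// Name the columns explicitly; otherwise they default to _1, _2, ...
val df = converted.toDF(
  "trackId", "location", "listId", "videoId", "id", "sequence", "time", "type")

df.printSchema()
```

If some values might exceed the Long range, mapping to `BigDecimal(value)` instead of calling `toLong` would be the safer choice, at the cost of a decimal column type.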

Alper t. Turker
Thanks for your comment. I get a `java.lang.UnsupportedOperationException: Schema for type scala.BigInt is not supported` error – ukbaz Oct 13 '17 at 07:45
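The error in this comment appears because reflection cannot derive a schema for `scala.BigInt`. One possible way around it, sketched below, is to build an RDD[Row] with the BigInt fields converted to Long and pass it together with the schema from the question. This assumes a spark-shell session on Spark 1.6.x where `sc` and `sqlContext` already exist, and that all values fit in a Long; the sample row is copied from the question.

```scala
// Assumption: running inside spark-shell (Spark 1.6.x), so `sc` and `sqlContext` are predefined.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema as defined in the question.
val schema = StructType(Array(
  StructField("trackId", LongType, true),
  StructField("location", StringType, true),
  StructField("listId", StringType, true),
  StructField("videoId", LongType, true),
  StructField("id", LongType, true),
  StructField("sequence", LongType, true),
  StructField("time", LongType, true),
  StructField("type", ArrayType(StringType), true)
))

// One sample record with the same tuple shape as the RDD in the question.
val rdd = sc.parallelize(Seq(
  (BigInt(14183197), "Browse",
   "3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573",
   BigInt(80161356), BigInt(8702171157207449L), BigInt(72), BigInt(527780278061L),
   List("StartPlay", "Action", "Session"))
))

// Build Rows whose field types match the schema (BigInt -> Long).
// Note: BigInt.toLong silently truncates values outside the Long range.
val rowRdd = rdd.map {
  case (trackId, location, listId, videoId, id, sequence, time, types) =>
    Row(trackId.toLong, location, listId, videoId.toLong, id.toLong,
        sequence.toLong, time.toLong, types)
}

// createDataFrame(RDD[Row], StructType) is one of the overloads listed in the question's error.
val df = sqlContext.createDataFrame(rowRdd, schema)

df.show()
```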