
I have an RDD[Map[String, Any]] and I am trying to convert it into a DataFrame. I do not have a schema that I can specify for the DataFrame.

I tried rdd.toDF, but that didn't help; it threw the following error:

Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)

Sample Input

val data: RDD[Map[String, Any]] = appContext.sc.parallelize(List(
      Map("A" -> "B"),              // Value could be a String
      Map("C" -> 123),              // Value could be numeric (Long, Double, Int, etc.)
      Map("D" -> Map("E" -> "F")),  // Could be another Map
      Map("G" -> List("H", "I")),   // List of values
      Map("J" -> List(              // List of Maps
        Map("K" -> "L"),
        Map("M" -> "N")
      ))
    ))

I was able to turn it into a DataFrame with the following (JsonUtils is a wrapper around Jackson), but it is giving me performance issues.

def convert(data: RDD[Map[String, Any]]): DataFrame = {
  sparkSession.read.json(data.map(each => JsonUtils.toJson(each)))
}
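
For reference, a variant I am considering (an untested sketch on my side; convertViaPartitions is a made-up name and jackson-module-scala is assumed to be on the classpath) serializes per partition, so the Jackson ObjectMapper is created once per partition instead of paying the serialization setup cost per record:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Hypothetical variant of convert(): build one ObjectMapper per partition
// rather than serializing each record through JsonUtils.toJson.
def convertViaPartitions(data: RDD[Map[String, Any]]): DataFrame = {
  val json = data.mapPartitions { iter =>
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    iter.map(m => mapper.writeValueAsString(m))
  }
  sparkSession.read.json(json)
}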

Is there another way to achieve this with better performance? Any suggestions are much appreciated!

Update: I am not using the DataFrame for any processing as such. I just want to write the output in 3 different formats, and converting to a DataFrame was the best method I could find to get consistent output. Any suggestion for achieving this without actually converting to a DataFrame would also be really helpful.

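// Note: df.write.avro assumes the spark-avro package's implicit is in scope
// (e.g. import com.databricks.spark.avro._); plain Spark 2.4+ would use
// df.write.format("avro").save(...)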
df.write.avro("/path/to/avroFile")
df.write.parquet("/path/to/parquetFile")
df.write.json("/path/to/jsonFile")
  • can you update your question with some sample input and output data? – Jayadeep Jayaraman Nov 14 '19 at 03:22
  • Added sample inputs – tpuli Nov 14 '19 at 06:58
  • Converting and using a single column type (not `Any`) would give the best performance. I guess that is not an option here? Maybe this would give an idea of what you could test otherwise: https://stackoverflow.com/questions/41504976/dataframe-to-dataset-which-has-type-any – Shaido Nov 14 '19 at 07:03
  • Can I ask why you want to use dataframes? – Oli Nov 14 '19 at 10:33
  • I am only converting it into JSON so that I can write it out in 3 different output formats, i.e. AVRO, PARQUET and JSON. I am not really using the DataFrame for any processing as such. 1. df.write.avro("/path/toFile") 2. df.write.parquet("/path/toFile") 3. df.write.json("/path/toFile") Updated the question to reflect the use case as well – tpuli Nov 14 '19 at 16:15

1 Answer


You won't be able to convert an RDD containing Any to a DataFrame directly; Spark has no Encoder for Any, which is what the ClassNotFoundException: scala.Any in your stack trace is telling you.

However, you could split your initial RDD by value type, for example:

one RDD with only Map[String, String], another one with Map[String, Int], etc.

After you have those RDDs, you can convert each of them into a DataFrame using the toDF method and finally join them on the key, so in the end you'll have a DataFrame like:

+-----+-------------+----------+-----------------+
| Key | StringValue | IntValue |    MapValue     |
+-----+-------------+----------+-----------------+
| A   | SomeString  |      123 | Map("A" -> "B") |
| B   | SomeString  |      456 | Map("B" -> "C") |
+-----+-------------+----------+-----------------+
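
A rough sketch of that idea (split and the column names here are made up, and only two of the value types are handled, purely as an illustration):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical split: route each (key, value) pair into a typed RDD,
// build a DataFrame per type, then join the pieces back on the key.
def split(data: RDD[Map[String, Any]])(implicit spark: SparkSession): DataFrame = {
  import spark.implicits._

  val pairs = data.flatMap(_.toSeq)

  val strings = pairs.collect { case (k, v: String) => (k, v) }.toDF("Key", "StringValue")
  val ints    = pairs.collect { case (k, v: Int)    => (k, v) }.toDF("Key", "IntValue")

  strings.join(ints, Seq("Key"), "outer")
}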