
I'm writing a Spark (2.4) ETL job in Scala that reads ;-separated CSV files from S3 using a glob pattern. The data is loaded into a DataFrame and contains a column (let's say it is named custom) holding a JSON-formatted string (with multiple levels of nesting). The goal is to automatically infer the schema of that column so the data can be structured for a Parquet write sink back on S3.

This post (How to query JSON data column using Spark DataFrames?) suggests schema_of_json from Spark 2.4 can infer the schema from a JSON-formatted column or string.
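
For context, here is a minimal sketch of what the linked answer describes, calling schema_of_json on a literal JSON string (the sample value below is purely illustrative):

import org.apache.spark.sql.functions.{lit, schema_of_json}

// Illustrative sample value; schema_of_json expects a foldable string expression.
val sample = """{"lvl1": {"lvl2a": {"lvl3a": {"lvl4a": "random_data"}}}}"""

spark.range(1)
  .select(schema_of_json(lit(sample)).as("inferred"))
  .show(false)
// prints something like: struct<lvl1:struct<lvl2a:struct<lvl3a:struct<lvl4a:string>>>>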

Here is what I tried:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

But the above doesn't work and raises this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

Keep in mind I'm filtering out null values on custom for this DataFrame.


EDIT: whole code below.

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
  * RandomName entry point.
  *
  * @author Random author
  */
object RandomName {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("RandomName")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
      .getOrCreate

    import spark.implicits._

    val randomName: RandomName = new RandomName(spark)

    val df: sql.DataFrame  = randomName.read().filter($"custom".isNotNull)
    val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

    df.withColumn(
      "nestedCustom",
      from_json(col("custom"), jsonSchema, Map[String, String]())
    )

    df.show

    spark.stop
  }
}

class RandomName(private val spark: SparkSession) {

  /**
    * Reads CSV files from S3 and creates a sql.DataFrame.
    *
    * @return a sql.DataFrame
    */
  def read(): sql.DataFrame = {
    val tableSchema = StructType(
      Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true),
        StructField("c", DateType, true),
        StructField("custom", StringType, true)
      ))

    spark.read
      .format("csv")
      .option("sep", ";")
      .option("header", "true")
      .option("inferSchema", "true")
      .schema(tableSchema)
      .load("s3://random-bucket/*")
  }
}

And here is an example of the JSON:

{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "ramdom_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}
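
For reference, a hand-written schema matching this example (a sketch only; it could be passed to from_json directly as a fallback if automatic inference keeps failing):

import org.apache.spark.sql.types._

// Explicit StructType mirroring the nesting of the example JSON above.
val customSchema = StructType(Seq(
  StructField("lvl1", StructType(Seq(
    StructField("lvl2a", StructType(Seq(
      StructField("lvl3a", StructType(Seq(
        StructField("lvl4a", StringType),
        StructField("lvl4b", StringType)
      )))
    ))),
    StructField("lvl2b", StructType(Seq(
      StructField("lvl3a", StructType(Seq(
        StructField("lvl4a", StringType)
      ))),
      StructField("lvl3b", StructType(Seq(
        StructField("lvl4a", StringType),
        StructField("lvl4b", StringType)
      )))
    )))
  )))
))

// df.withColumn("nestedCustom", from_json(col("custom"), customSchema))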
– ngc2359
1 Answer


That's an indicator that custom is not a valid input for schema_of_json:

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

You should go back to your data and make sure that custom is indeed a String.
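
A quick sanity check (a sketch; df is the DataFrame from the question):

// Confirm the declared type of the column before calling schema_of_json on it.
df.printSchema()
println(df.schema("custom").dataType)   // expected: StringType
df.select("custom").show(5, truncate = false)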

  • So `df.select("custom").distinct.show(20, false)` returns only one output (the wanted behavior), which is: `{ "lvl1": {"lvl2a": { "lvl3a": { "lvl4a": "random_data", "lvl4b": "random_data" } }, "lvl2b": { "lvl3a": { "lvl4a": "ramdom_data" }, "lvl3b": { "lvl4a": "random_data", "lvl4b": "random_data" } } } }`. I tried enforcing the type of the column: `val jsonSchema: String = df.select(schema_of_json(col("custom").as[String])).as[String].first`, but I get the same error message. I also tried converting the DataFrame to a Dataset and enforcing the type of `custom` to `String`, but same error message. :/ – ngc2359 Feb 14 '19 at 13:29
  • I found a solution (that works) but it feels hacky: `val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first` (consolidated into the sketch after these comments). Do you think there is a better way to do this by directly addressing the column, as advertised by the function signature? – ngc2359 Feb 14 '19 at 14:03
  • @ngc2359, This seems to work... and yes, feels hacky. I wonder what we are doing wrong? – Jesse May 07 '19 at 14:55
  • You are taking the first row as the schema for the whole column; if the JSON schema is dynamic, this will eventually not work – Gabo Jul 16 '20 at 22:41
  • As of Spark 3.0, this one still remains an open issue. – abiratsis Sep 26 '20 at 13:23
  • What I read from another question is that schema_of_json works with a string, not a column that holds a string. I am also trying to find more info. – FEST Mar 11 '21 at 12:11
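
For readers landing here, the workaround from the comments consolidated into one sketch. It assumes custom is a non-null string column and, per the caveat above, that every row shares the same JSON structure:

import org.apache.spark.sql.functions.{col, from_json, schema_of_json}
import spark.implicits._ // needed for .as[String]

// Pull one sample value out of the column as a plain Scala string
// (nulls are assumed to be filtered out already).
val sampleJson: String = df.select(col("custom")).first.getString(0)

// schema_of_json accepts a string literal, so let it infer from the sample.
val jsonSchema: String = df.select(schema_of_json(sampleJson)).as[String].first

// Apply the inferred schema to every row of the column.
val parsed = df.withColumn(
  "nestedCustom",
  from_json(col("custom"), jsonSchema, Map[String, String]())
)
parsed.printSchema()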