
Spark is great at parsing JSON into a nested StructType on the initial read from disk, but what if I already have a String column containing JSON in a Dataset, and I want to map it into a Dataset with a StructType column, with schema inference that takes the whole dataset into account, while fully exploiting parallel execution and avoiding reducing actions?

I know of functions schema_of_json and from_json, which are apparently intended to be used together to accomplish this, or something similar, but I'm having trouble finding actual working code examples, especially in Java.
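
For context, the non-inferring route I'm trying to avoid would look something like the sketch below (column and field names are made up): declare the StructType by hand and pass it to from_json. What I want is for that StructType to be inferred from the data instead.

    // Works, but the StructType has to be spelled out by hand
    // (uses org.apache.spark.sql.types.StructType / DataTypes).
    StructType schema = new StructType()
            .add("name", DataTypes.StringType)
            .add("count", DataTypes.LongType);

    Dataset<Row> parsed = df.withColumn("parsed", from_json(col("json"), schema));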

I'll accept any answer that supplies a Java example and satisfies the goals of full schema inference and fully non-reduced parallel operation, or, failing that, the closest workaround.

I'm currently using Spark 2.4.0.

I've studied the following related question:

Implicit schema discovery on a JSON-formatted Spark DataFrame column

This question is similar to mine, though for Scala. There is no accepted answer. The OP announces in a comment that they have found a "hacky" solution to getting schema_of_json to work. The problem with the solution, beyond "hackiness", is that it infers the schema from only the first row of the dataframe, so the types are likely to be too tightly constrained:

val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first
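
For completeness, a rough Java translation of that first-row approach (an untested sketch; it assumes the same "custom" column name and static imports of org.apache.spark.sql.functions) would be something like:

    // Grab the JSON string from the first row only...
    String firstJson = df.select(col("custom")).first().getString(0);

    // ...infer a schema from that single sample...
    String jsonSchema = df.select(schema_of_json(lit(firstJson)))
            .as(Encoders.STRING())
            .first();

    // ...and apply it to every row. The schema reflects only the first row.
    Dataset<Row> parsed = df.withColumn("parsed",
            from_json(col("custom"), jsonSchema, new HashMap<String, String>()));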

EDIT: I tried the solution from the first example linked in the comments below. Here's the implementation:

    // Needs: import static org.apache.spark.sql.functions.*;
    //        import org.apache.spark.sql.*; import java.util.HashMap;
    SparkSession spark = SparkSession
            .builder()
            .appName("example")
            .master("local[*]")
            .getOrCreate();

    // Each line of the source text file is one JSON document, in a column named "value".
    Dataset<Row> df = spark.read().text(conf.getSourcePath());

    df.cache();

    // Attempt to infer a schema from the whole "value" column...
    String schema = df.select(schema_of_json(col("value")))
            .as(Encoders.STRING())
            .first();

    // ...then parse every row with that schema and append the result as Parquet.
    df.withColumn("parsedJson", from_json(col("value"), schema, new HashMap<String, String>()))
            .drop("value")
            .write()
            .mode("append")
            .parquet(conf.getDestinationPath());

From this code I obtained an error:

AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [schemaofjson(value#0) AS schemaOfjson(value)#20]
+- Relation[value#0] text

This error led me to the following Spark pull request: https://github.com/apache/spark/pull/22775

which seems to indicate that schema_of_json was never meant to be applied to a whole table in order to infer a schema over the entire thing, but rather to infer a schema from a single, literal JSON sample passed in directly via lit("some json"). If that's the case, I don't know that Spark offers any mechanism for full schema inference from JSON over an entire table, unless someone here can correct my reading of this pull request or offer an alternative approach?
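
For illustration, my (possibly wrong) reading of the PR is that the only supported pattern is passing a sample document as a literal, along these lines (the sample JSON here is made up):

    // schema_of_json appears to require a foldable string literal, not a column reference.
    String schema = df.select(schema_of_json(lit("{\"name\": \"x\", \"count\": 1}")))
            .as(Encoders.STRING())
            .first();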

nclark
  • Have you seen [Working](https://stackoverflow.com/q/34069282/10958683) [Examples](https://github.com/apache/spark/blob/79551f558dafed41177b605b0436e9340edf5712/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala#L401) in Scala? These are almost identical to the Java counterparts. As for _avoiding reducing actions_ - schema inference is almost by definition a "reducing action". – user10938362 May 23 '19 at 13:55
  • @user10958683 Thanks for the links. I had seen the first of those two, but didn't focus as much on it because it was linked from the one I have referenced above, which I found first. It looks more promising as it appears to use `schema_of_json` properly. I had no trouble converting it to Java. Will try it. – nclark May 23 '19 at 14:23
  • @user10958683 yes, schema inference is obviously reductive in nature. I was thinking about the coupled nature of spark.read.json as allowing for the schema to be incrementally inferred more efficiently behind the scenes. Not sure how that would be implemented on the cluster. Obviously the driver has to gather results from all the workers... – nclark May 23 '19 at 14:26
  • @user10958683 the first of your examples above leads to the error I've now documented in the question. The second is not applicable, as it uses `schema_of_json` with a literal JSON string instead of a column name, so cannot be applied to an entire table. – nclark May 23 '19 at 16:21

1 Answer


There's actually a very simple solution to this using DataFrameReader.json(Dataset<String>); I don't know why it didn't come up in my searches:

    // ds is a Dataset<String> whose elements are the JSON documents,
    // e.g. a single JSON column selected out of a wider Dataset<Row>.
    Dataset<String> ds = ...;

    // DataFrameReader.json(Dataset<String>) infers the schema across the
    // whole dataset, just like the initial read-from-disk case.
    spark.read()
        .json(ds)
        .write()
        .mode("append")
        .parquet(conf.getDestinationPath());

If you have multiple columns in the source Dataset, you can obviously select just the one to operate on first, as in the sketch below. And the element type has to be String (not Row, for example).
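
For example, a sketch of selecting a single JSON column first (the column name "value" is just a placeholder):

    // Select the single JSON string column and convert Dataset<Row> to Dataset<String>...
    Dataset<String> ds = df.select(col("value")).as(Encoders.STRING());

    // ...then let DataFrameReader.json infer the schema over the whole dataset.
    Dataset<Row> parsed = spark.read().json(ds);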

nclark