
I'm working with a Spark DataFrame that could be loading data from one of three different schema versions:

// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Addition "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }

I can handle the added "C" by checking whether the schema contains a field "C" and, if not, adding a new column to the DataFrame. However, I can't work out how to create a field for the sub-object "A.D".

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "2", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
    if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
        df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
    }
    // Not sure what to put here: fieldNames() does not contain the nested "A.D".

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to C for " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to A.D for " + version);
    }
}
Michael Lloyd Lee mlk

2 Answers


JSON sources are not very well suited for data with an evolving schema (consider Avro or Parquet instead), but a simple solution is to use the same schema for all sources and make the new fields optional/nullable:

import org.apache.spark.sql.types.{StructType, StructField, LongType}

val schema = StructType(Seq(
  StructField("A", StructType(Seq(
    StructField("B", LongType, true), 
    StructField("D", LongType, true)
  )), true),
  StructField("C", LongType, true)))

You can pass a schema like this to the DataFrameReader:

val rddV1 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 } }"))
val df1 = sqlContext.read.schema(schema).json(rddV1)

val rddV2 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 }, \"C\": 2 }"))
val df2 = sqlContext.read.schema(schema).json(rddV2)

val rddV3 = sc.parallelize(Seq("{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }"))
val df3 = sqlContext.read.schema(schema).json(rddV3)

and you'll get a consistent structure regardless of the variant:

require(df1.schema == df2.schema && df2.schema == df3.schema)

with missing columns automatically set to null:

df1.printSchema
// root
//  |-- A: struct (nullable = true)
//  |    |-- B: long (nullable = true)
//  |    |-- D: long (nullable = true)
//  |-- C: long (nullable = true)

df1.show
// +--------+----+
// |       A|   C|
// +--------+----+
// |[1,null]|null|
// +--------+----+

df2.show
// +--------+---+
// |       A|  C|
// +--------+---+
// |[1,null]|  2|
// +--------+---+

df3.show
// +-----+---+
// |    A|  C|
// +-----+---+
// |[1,3]|  2|
// +-----+---+

Note:

This solution is data-source dependent. It may or may not work with other sources, and could even result in malformed records.

zero323
  • Why do you say that JSON is not best suited for evolving schemas? – Michael Lloyd Lee mlk Nov 24 '15 at 09:57
  • @mlk our main issue with evolving schemas and JSON is that various JSON clients may do unexpected things, like presenting a section that you expect to be an array as an empty string instead (i.e. ""). This can really mess up your schema management... I imagine zero323 has similar concerns – Ewan Leith Nov 24 '15 at 10:18
  • OK thanks Ewan. Any easy way to go from an Avro Schema to a `StructType`? (see the first sketch after this thread) – Michael Lloyd Lee mlk Nov 24 '15 at 12:44
  • On top of Ewan's comment, JSON is neither self-describing nor does it support schemas. You can of course use or create custom hypermedia formats for JSON documents, but they are not part of the semantics. It becomes particularly annoying when we deal with JSONL: without going through the whole file it is not possible to infer the schema. Moreover, if data is malformed, we only hit it at runtime. – zero323 Nov 24 '15 at 14:56
  • As we speak, this won't work with Spark 2.1.0 if you read two differently partitioned Parquet folders: column order will be messed up even if you specify the schema before reading. – Michel Lemay Jun 08 '17 at 11:56
  • @MichelLemay This answer was never intended to be used with Parquet. For that you should rather enable `spark.sql.parquet.mergeSchema` (sketched after this thread). – zero323 Jun 08 '17 at 12:19
  • You are right that this is somewhat off-topic. I thought it might be useful anyway, since I stumbled upon this post while looking for a solution to my issue. Speaking of mergeSchema, it will do fine when you read a single dataframe but won't help when you need to union multiple of them. – Michel Lemay Jun 12 '17 at 13:48
  • @MichelLemay Don't get me wrong - this is a very good point. Regarding the general problem there is always the hard way - find the diff between the schemas and fill the missing columns with nulls (see the union sketch after this thread). It gets tricky with complex schemas, but you can creatively use some JSON diffing tools for that. – zero323 Jun 12 '17 at 13:55
  • @MichelLemay The `parquet` method can take multiple paths, so you should be able to use schema merging for this, although I haven't tried it in practice. – zero323 Jun 12 '17 at 13:58
  • For reference, the way I deal with evolving schemas is this: I exposed and used the private StructType.mergeSchema to manually merge schemas from different sources (read from ParquetFileFormat.mergeSchemasInParallel with a subset of the files), then I read the dataframes giving an explicit schema, and finally I reorder columns using the select(cols: _*) trick. Only then do I perform the union. – Michel Lemay Jun 13 '17 at 18:46

zero323 has answered the question, but in Scala. This is the same thing in Java:

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "2", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    StructType schema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("A", DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("B", DataTypes.LongType, true),
                    DataTypes.createStructField("D", DataTypes.LongType, true))), true),
            DataTypes.createStructField("C", DataTypes.LongType, true)));

    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().schema(schema).json(sc.parallelize(Arrays.asList(data)));

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to C for " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to A.D for " + version);
    }
}
Michael Lloyd Lee mlk