
I am trying to use JsonSchema to validate rows in an RDD, in order to filter out invalid rows.

Here is my code:

import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.sql.types.StructType
import scala.util.Try

def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault

def stringToJsonSchema(str: String): Try[JsonSchema] = {
  stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}

def stringToJson(str: String): Try[JsonNode] = {
  val mapper = new ObjectMapper
  Try(mapper.readTree(str))
}

def validateJson(data: JsonNode, jsonSchema: JsonSchema): Boolean = {
  val report = jsonSchema.validateUnchecked(data, true)
  report.isSuccess
}

val schemaSource: String = ...
val jsonSchema: JsonSchema = stringToJsonSchema(schemaSource).get
val df = spark.read
  .textFile("path/to/data.json")
  .filter(str => {
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
  })

However, I'm getting an error because JsonSchema is not serializable:

Cause: org.apache.spark.SparkException: Task not serializable
[info]   Cause: java.io.NotSerializableException: com.github.fge.jsonschema.main.JsonSchema
[info] Serialization stack:
[info]  - object not serializable (class: com.github.fge.jsonschema.main.JsonSchema, value: com.github.fge.jsonschema.main.JsonSchema@33d22225)

I have read these threads in search of a solution:

And also some others.

As far as I understand, Spark needs to serialize every operation (closure) that I want to perform on the RDD in order to ship it to the worker nodes. But JsonSchema is not serializable, so the task fails.
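The same failure can be reproduced outside Spark with plain Java serialization (which is what Spark uses for task closures by default); a minimal sketch:

// Minimal sketch: the closure serializer is Java serialization, so anything the
// closure captures must be java.io.Serializable, and JsonSchema is not.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val oos = new ObjectOutputStream(new ByteArrayOutputStream())
oos.writeObject(jsonSchema) // throws java.io.NotSerializableException: ...JsonSchema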

I have tried this solution:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

private class JsonSchemaValidator(jsonSchema: JsonSchema) extends (String => Boolean) {
  def apply(str: String): Boolean =
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
}

val validator: String => Boolean = genMapper(new JsonSchemaValidator(jsonSchema))

df.filter(validator)

I tried this because someone said MeatLocker is supposed to be able to serialize anything. But I got this error:

java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.misc.InnocuousThread
...
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile boolean jdk.internal.misc.InnocuousThread.hasRun accessible: module java.base does not "opens jdk.internal.misc" to unnamed module @7876b3b3

So I tried to solve it by adding some java options:

ThisBuild / javaOptions ++= Seq(
    "--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED",
    "--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED",
    "--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED",
)

But this still throws an error:

Caused by: java.lang.NoSuchMethodException: jdk.xml.internal.SecuritySupport$$Lambda$1262/0x0000000800c16840.writeReplace()
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)

So I took a step back, and tried with broadcast:

import org.apache.spark.broadcast.Broadcast

val brdJsonSchema: Broadcast[JsonSchema] = spark.sparkContext.broadcast(jsonSchema)
df.filter(str => {
    stringToJson(str)
      .map(validateJson(_, brdJsonSchema.value))
      .getOrElse(false)
  })

But that gives a serialization error too, because broadcasting a value also requires serializing it first.

I also tried it with MeatLocker:

import com.twitter.chill.MeatLocker

val brdSerJsonSchema: Broadcast[MeatLocker[JsonSchema]] = spark.sparkContext.broadcast(MeatLocker(jsonSchema))
df.filter(str => {
    stringToJson(str)
      .map(validateJson(_, brdSerJsonSchema.value.get))
      .getOrElse(false)
  })

But then I'm getting the same error as the genMapper solution above, and adding the javaOptions does not help.

I did find a workaround, but I don't like it because it feels hacky: I can parse the string source of the json schema inside the filter function:

val schemaSource: String = ...
df.filter(str => {
  val jsonSchema = stringToJsonSchema(schemaSource).get
  stringToJson(str)
    .map(validateJson(_, jsonSchema))
    .getOrElse(false)
})

But that means that I am parsing the JsonSchema for every row of the DF, and that's a waste of CPU and memory.

So instead I can use a cache-wrapper, which also feels hacky:

import scala.collection.mutable

class CacheWrapper[T] {
  private lazy val cache = mutable.Map.empty[String, T]

  def getCacheOrElse(cacheKey: String, getter: () => T): T =
    cache.getOrElseUpdate(cacheKey, getter())
}

val schemaSource: String = ...
@transient lazy val jsonSchemaCache: CacheWrapper[JsonSchema] = new CacheWrapper[JsonSchema]
df.filter(str => {
  val jsonSchema = jsonSchemaCache.getCacheOrElse(
    schemaSource.hashCode.toString,
    () => stringToJsonSchema(schemaSource).get
  )
  stringToJson(str)
    .map(validateJson(_, jsonSchema))
    .getOrElse(false)
})

This has the effect of parsing the JsonSchema only once per executor (rather than once per row); all the other rows get it from the cache.

But again - this feels hacky.
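For comparison, a mapPartitions-based version would at least parse the schema once per partition rather than once per row. A rough sketch, reusing the helpers above (still not what I'd call clean):

// Rough sketch: schemaSource is a plain String, so capturing it in the closure is fine;
// the JsonSchema is then built once per partition on the executor side.
import spark.implicits._ // Encoder[String] needed by mapPartitions on a Dataset[String]

val filtered = df.mapPartitions { rows =>
  val jsonSchema = stringToJsonSchema(schemaSource).get
  rows.filter { str =>
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
  }
}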

Is there another way to do this?

All I want is a way to pass the schemaSource string to all the Spark worker nodes, have each worker parse it into a JsonSchema only once, and then use that JsonSchema object to filter the DF. That sounds like it should be easy, but I can't find a way to do it.

Malki
    Just a side note, the com.github.fge JSON schema implementation is no longer maintained and quite old. Maybe you want to consider selecting a different Java validator from here https://json-schema.org/implementations.html#validators ? – Clemens Mar 10 '22 at 12:28
  • @Clemens thanks, I'll check it out – Malki Mar 10 '22 at 14:23
  • Update - I switched to using `com.networknt.json-schema-validator` which seems to be more modern, and it has the exact same serialization problems, and the solution below works for it too :) – Malki Mar 10 '22 at 14:59

1 Answer


OK so a coworker helped me find a solution.

Sources:

Code:

private class JsonSchemaValidator(schemaSource: String) extends (String => Boolean) with Serializable {
  @transient lazy val jsonSchema: JsonSchema = JsonSchemaDFParser.stringToJsonSchema(schemaSource).get

  def apply(str: String): Boolean =
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
}

val validator: String => Boolean = new JsonSchemaValidator(schemaSource)

df.filter(validator)

@transient excludes the field from the object when it is serialized. lazy means the field is constructed again when it is first accessed on each of the executors.

The wrapping class has to extend Serializable for this to work.
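The same trick can be written as a small generic wrapper (just a sketch of the general pattern, with a hypothetical name): only the serializable builder function, here capturing nothing but the schema source string, travels to the executors, and the non-serializable resource is rebuilt lazily in each JVM on first access.

// Sketch of the general pattern: the builder closure is serialized, while the expensive
// non-serializable resource is marked @transient and rebuilt lazily on each executor.
class LazyResource[T](build: () => T) extends Serializable {
  @transient lazy val value: T = build()
}

// Hypothetical usage with the schema from this question:
val lazySchema = new LazyResource(() => JsonSchemaDFParser.stringToJsonSchema(schemaSource).get)

df.filter(str => stringToJson(str).map(validateJson(_, lazySchema.value)).getOrElse(false))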

NOTE: This solution works and does what I want it to do. BUT it's still strange to me that it had to be so hard. I might be missing some feature of Spark, but the fact that I needed to find such special syntax to do such a simple thing feels to me like a design flaw.

Malki