I am trying to use JsonSchema
to validate rows in an RDD, in order to filter out invalid rows.
Here is my code:
import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.sql.types.StructType
import scala.util.Try
def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault

def stringToJsonSchema(str: String): Try[JsonSchema] = {
  stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}

def stringToJson(str: String): Try[JsonNode] = {
  val mapper = new ObjectMapper
  Try(mapper.readTree(str))
}
def validateJson(data: JsonNode, jsonSchema: JsonSchema): Boolean = {
  val report = jsonSchema.validateUnchecked(data, true)
  report.isSuccess
}
val schemaSource: String = ...
val jsonSchema: JsonSchema = stringToJsonSchema(schemaSource).get

val df = spark.read
  .textFile("path/to/data.json")
  .filter(str => {
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
  })
However, I'm getting an error because JsonSchema
is not serializable:
Cause: org.apache.spark.SparkException: Task not serializable
[info] Cause: java.io.NotSerializableException: com.github.fge.jsonschema.main.JsonSchema
[info] Serialization stack:
[info] - object not serializable (class: com.github.fge.jsonschema.main.JsonSchema, value: com.github.fge.jsonschema.main.JsonSchema@33d22225)
I have read these threads in search of a solution:
- Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
- Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
- https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
- https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/
And also some others.
As far as I understand, Spark needs to serialize every closure I apply to the RDD in order to ship it to the worker nodes. But JsonSchema
is not serializable, so it fails.
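If that is right, then referencing jsonSchema inside the filter lambda makes the closure capture it, roughly like this hypothetical desugaring, which is why the whole JsonSchema instance ends up in the serialized task:

// Hypothetical sketch of what the captured closure amounts to: a function
// object holding a reference to jsonSchema. Serializing the task means
// serializing this field, hence the NotSerializableException.
class CapturedFilter(jsonSchema: JsonSchema) extends (String => Boolean) with Serializable {
  def apply(str: String): Boolean =
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
}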
I have tried this solution:
def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

private class JsonSchemaValidator(jsonSchema: JsonSchema) extends (String => Boolean) {
  def apply(str: String): Boolean =
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
}

val validator: String => Boolean = genMapper(new JsonSchemaValidator(jsonSchema))

df.filter(validator)
Because MeatLocker is supposed to be able to wrap and serialize pretty much anything. But I got this error:
java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.misc.InnocuousThread
...
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile boolean jdk.internal.misc.InnocuousThread.hasRun accessible: module java.base does not "opens jdk.internal.misc" to unnamed module @7876b3b3
So I tried to solve it by adding some java options:
ThisBuild / javaOptions ++= Seq(
  "--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED",
  "--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED",
  "--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED",
)
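As far as I know, sbt only passes javaOptions to forked JVMs, so something along these lines is presumably needed as well:

// sbt applies javaOptions only to forked JVMs; without forking, the
// --add-opens flags above never reach the test JVM.
Test / fork := true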
But this still throws an error:
Caused by: java.lang.NoSuchMethodException: jdk.xml.internal.SecuritySupport$$Lambda$1262/0x0000000800c16840.writeReplace()
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
So I took a step back, and tried with a broadcast:
import org.apache.spark.broadcast.Broadcast

val brdJsonSchema: Broadcast[JsonSchema] = spark.sparkContext.broadcast(jsonSchema)

df.filter(str => {
  stringToJson(str)
    .map(validateJson(_, brdJsonSchema.value))
    .getOrElse(false)
})
But that gives a serialization error too, presumably because broadcasting a value still requires serializing it in order to ship it to the executors.
I also tried it with MeatLocker:
import com.twitter.chill.MeatLocker

val brdSerJsonSchema: Broadcast[MeatLocker[JsonSchema]] = spark.sparkContext.broadcast(MeatLocker(jsonSchema))

df.filter(str => {
  stringToJson(str)
    .map(validateJson(_, brdSerJsonSchema.value.get))
    .getOrElse(false)
})
But then I'm getting the same error as the genMapper
solution above, and adding the javaOptions
does not help.
I did find a workaround, but I don't like it because it feels hacky:
I can parse the string source of the JSON schema inside the filter function:
val schemaSource: String = ...

df.filter(str => {
  val jsonSchema = stringToJsonSchema(schemaSource).get
  stringToJson(str)
    .map(validateJson(_, jsonSchema))
    .getOrElse(false)
})
But that means that I am parsing the JsonSchema
for every row of the DF, and that's a waste of CPU and memory.
So instead I can use a cache-wrapper, which also feels hacky:
import scala.collection.mutable

class CacheWrapper[T] {
  private lazy val cache = mutable.Map.empty[String, T]

  // Compute and store the value on the first lookup, return the cached value afterwards
  def getCacheOrElse(cacheKey: String, getter: () => T): T =
    cache.getOrElseUpdate(cacheKey, getter())
}
val schemaSource: String = ...

@transient lazy val jsonSchemaCache: CacheWrapper[JsonSchema] = new CacheWrapper[JsonSchema]

df.filter(str => {
  val jsonSchema = jsonSchemaCache.getCacheOrElse(schemaSource.hashCode.toString, () => stringToJsonSchema(schemaSource).get)
  stringToJson(str)
    .map(validateJson(_, jsonSchema))
    .getOrElse(false)
})
This will have the effect of parsing the JsonSchema
only once (when we reach the first row). All the other rows will get it from the cache.
But again - this feels hacky.
Is there another way to do this?
All I want is a way to pass the schemaSource string to all the Spark worker nodes, have each worker parse it into a JsonSchema only once, and then use that JsonSchema object to filter the DF. That sounds to me like it should be easy, but I can't find a way to do it.