4

I have serialized a Spark ML Pipeline model that consists of a number of TransformerS (org.apache.spark.ml.Transformer) and several Logistic Regression learners (org.apache.spark.ml.classification.LogisticRegression). It all works fine on my Windows machine where I created the model. I serialized the model to disk using java.io.ObjectOutputStream and read it back in using java.io.ObjectInputStream.

It all works fine via sbt and my corresponding unit tests. However, when I assemble my code into a jar and try to run the same code in the Spark shell on my server, I get a ClassNotFoundException for my (first) transformer. The stack trace is included below.

I am using fork := true in my build.sbt file as I thought this might be related to SparkSQL MissingRequirementError when registering table, but that did not seem to help.

It also makes no difference whether I try to load the model from the jar file or directly from disk (as shown below).

How to deserialize a Pipeline?

Welcome to Ubuntu 14.04.2 LTS (GNU/Linux 3.13.0-49-generic x86_64)

spark-shell -J-Xmx4g --master local[2] --jars myJar.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath

Welcome to
   ____              __
  / __/__  ___ _____/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.0
   /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import com.myCompany.spark.classifier._
import com.myCompany.spark.classifier._
scala> import java.io._
import java.io._

scala> val l = new LabelTransformer(2)
l: com.myCompany.spark.classifier.LabelTransformer = com.myCompany.spark.classifier.LabelTransformer@61d24347

scala> val l2 = new ObjectInputStream(new FileInputStream("log-reg_2_10.model")).readObject
java.lang.ClassNotFoundException: com.myCompany.spark.classifier.LabelTransformer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC.<init>(<console>:48)
    at $iwC.<init>(<console>:50)
    at <init>(<console>:52)
    at .<init>(<console>:56)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Community
  • 1
  • 1
Tim Malt
  • 91
  • 6

2 Answers2

1

Since 1.6.0 you can use (experimental) Pipeline.read.load(path) to load a Pipeline object from path.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
1

Experimental it is indeed. Since Pipeline(s) can contain all sorts of things, each one needs to be readable/writable for the Pipeline save/load to work. See for example RandomForests is not there yet and won't be until Spark v2.0.0 https://issues.apache.org/jira/browse/SPARK-13784

Edi Bice
  • 566
  • 6
  • 18