
I am new to Spark and am trying to run it in the sbt console. I have a working build.sbt (the same code runs fine in IntelliJ) and I can import the Spark packages in the code. The issue is that when the code runs in the sbt console (terminal), the executor just keeps running and never finishes the task.
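
The actual build.sbt isn't shown here; a minimal sketch of what it is assumed to look like (the Scala and Spark versions are assumptions):

name := "SimpleProg"
scalaVersion := "2.12.17"
// spark-core is all the snippet below needs (SparkContext, RDD)
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.2"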

Code

import org.apache.spark._
// local SparkContext with a single worker thread
val sc = new SparkContext("local[1]", "SimpleProg")
val nums = sc.parallelize(List(1, 2, 3, 4))
// this action never completes when run in the sbt console
println(nums.reduce((a, b) => a - b))

Here are the outputs from the different consoles and methods, obtained with -Xprint:typer.
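
For the sbt console runs, the compiler flag was added to build.sbt as suggested in the comments at the end:

scalacOptions += "-Xprint:typer"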

In the sbt console (where the executor keeps running)

[[syntax trees at end of                     typer]] // <console>
package $line2 {
  object $read extends scala.AnyRef {
    def <init>(): type = {
      $read.super.<init>();
      ()
    };
    object $iw extends scala.AnyRef {
      def <init>(): type = {
        $iw.super.<init>();
        ()
      };
      object $iw extends scala.AnyRef {
        def <init>(): type = {
          $iw.super.<init>();
          ()
        };
        import org.apache.spark._;
        private[this] val sc: org.apache.spark.SparkContext = new org.apache.spark.SparkContext("local[1]", "SimpleProg", spark.this.SparkContext.<init>$default$3, spark.this.SparkContext.<init>$default$4, spark.this.SparkContext.<init>$default$5);
        <stable> <accessor> def sc: org.apache.spark.SparkContext = $iw.this.sc;
        private[this] val nums: org.apache.spark.rdd.RDD[Int] = $iw.this.sc.parallelize[Int](scala.collection.immutable.List.apply[Int](1, 2, 3, 4), $iw.this.sc.parallelize$default$2[Nothing])((ClassTag.Int: scala.reflect.ClassTag[Int]));
        <stable> <accessor> def nums: org.apache.spark.rdd.RDD[Int] = $iw.this.nums
      }
    };
    private[this] val INSTANCE: $line2.$read.type = this;
    <stable> <accessor> def INSTANCE: $line2.$read.type = $read.this.INSTANCE
  }
}

[[syntax trees at end of                     typer]] // <console>
package $line2 {
  object $eval extends scala.AnyRef {
    def <init>(): $line2.$eval.type = {
      $eval.super.<init>();
      ()
    };
    <stable> <accessor> lazy val $result: org.apache.spark.rdd.RDD[Int] = $line2.$read.INSTANCE.$iw.$iw.nums;
    <stable> <accessor> lazy val $print: String = {
      $line2.$read.INSTANCE.$iw.$iw;
      val sb: StringBuilder = new scala.`package`.StringBuilder();
      sb.append("import org.apache.spark._\n");
      sb.append("\u001B[1m\u001B[34msc\u001B[0m: \u001B[1m\u001B[32morg.apache.spark.SparkContext\u001B[0m = ".+(scala.runtime.ScalaRunTime.replStringOf($line2.$read.INSTANCE.$iw.$iw.sc, 1000)));
      sb.append("\u001B[1m\u001B[34mnums\u001B[0m: \u001B[1m\u001B[32morg.apache.spark.rdd.RDD[Int]\u001B[0m = ".+(scala.runtime.ScalaRunTime.replStringOf($line2.$read.INSTANCE.$iw.$iw.nums, 1000)));
      sb.toString()
    }
  }
}

In spark-shell (here the code runs fine)

[[syntax trees at end of                     typer]] // <console>
package $line15 {
  sealed class $read extends AnyRef with java.io.Serializable {
    def <init>(): $line15.$read = {
      $read.super.<init>();
      ()
    };
    sealed class $iw extends AnyRef with java.io.Serializable {
      def <init>(): $iw = {
        $iw.super.<init>();
        ()
      };
      private[this] val $line3$read: $line3.$read.type = $line3.$read.INSTANCE;
      <stable> <accessor> def $line3$read: $line3.$read.type = $iw.this.$line3$read;
      import $iw.this.$line3$read.$iw.$iw.spark;
      import $iw.this.$line3$read.$iw.$iw.sc;
      sealed class $iw extends AnyRef with java.io.Serializable {
        def <init>(): $iw = {
          $iw.super.<init>();
          ()
        };
        import org.apache.spark.SparkContext._;
        sealed class $iw extends AnyRef with java.io.Serializable {
          def <init>(): $iw = {
            $iw.super.<init>();
            ()
          };
          sealed class $iw extends AnyRef with java.io.Serializable {
            def <init>(): $iw = {
              $iw.super.<init>();
              ()
            };
            import $iw.this.$line3$read.$iw.$iw.spark.implicits._;
            sealed class $iw extends AnyRef with java.io.Serializable {
              def <init>(): $iw = {
                $iw.super.<init>();
                ()
              };
              import $iw.this.$line3$read.$iw.$iw.spark.sql;
              sealed class $iw extends AnyRef with java.io.Serializable {
                def <init>(): $iw = {
                  $iw.super.<init>();
                  ()
                };
                import org.apache.spark.sql.functions._;
                sealed class $iw extends AnyRef with java.io.Serializable {
                  def <init>(): $iw = {
                    $iw.super.<init>();
                    ()
                  };
                  sealed class $iw extends AnyRef with java.io.Serializable {
                    def <init>(): $iw = {
                      $iw.super.<init>();
                      ()
                    };
                    private[this] val nums: org.apache.spark.rdd.RDD[Int] = $iw.this.$line3$read.$iw.$iw.sc.parallelize[Int](scala.collection.immutable.List.apply[Int](1, 2, 3, 4), $iw.this.$line3$read.$iw.$iw.sc.parallelize$default$2[Nothing])((ClassTag.Int: scala.reflect.ClassTag[Int]));
                    <stable> <accessor> def nums: org.apache.spark.rdd.RDD[Int] = $iw.this.nums
                  };
                  private[this] val $iw: $iw = new $iw.this.$iw();
                  <stable> <accessor> def $iw: $iw = $iw.this.$iw
                };
                private[this] val $iw: $iw = new $iw.this.$iw();
                <stable> <accessor> def $iw: $iw = $iw.this.$iw
              };
              private[this] val $iw: $iw = new $iw.this.$iw();
              <stable> <accessor> def $iw: $iw = $iw.this.$iw
            };
            private[this] val $iw: $iw = new $iw.this.$iw();
            <stable> <accessor> def $iw: $iw = $iw.this.$iw
          };
          private[this] val $iw: $iw = new $iw.this.$iw();
          <stable> <accessor> def $iw: $iw = $iw.this.$iw
        };
        private[this] val $iw: $iw = new $iw.this.$iw();
        <stable> <accessor> def $iw: $iw = $iw.this.$iw
      };
      private[this] val $iw: $iw = new $iw.this.$iw();
      <stable> <accessor> def $iw: $iw = $iw.this.$iw
    };
    private[this] val $iw: $iw = new $read.this.$iw();
    <stable> <accessor> def $iw: $iw = $read.this.$iw
  };
  object $read extends scala.AnyRef with Serializable {
    def <init>(): type = {
      $read.super.<init>();
      ()
    };
    private[this] val INSTANCE: $line15.$read = new $read();
    <stable> <accessor> def INSTANCE: $line15.$read = $read.this.INSTANCE;
    <synthetic> private def readResolve(): Object = $line15.$read
  }
}

In a function (named temp) in the sbt console (this also works)
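
The definition of temp isn't shown above; judging by the typer output below, it was defined on an earlier REPL line and returns Unit. A plausible sketch (an assumption, not the exact code) is:

def temp(): Unit = {
  val sc = new SparkContext("local[1]", "SimpleProg")
  val nums = sc.parallelize(List(1, 2, 3, 4))
  println(nums.reduce((a, b) => a - b))
}

temp()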

[[syntax trees at end of                     typer]] // <console>
package $line7 {
  object $read extends scala.AnyRef {
    def <init>(): type = {
      $read.super.<init>();
      ()
    };
    object $iw extends scala.AnyRef {
      def <init>(): type = {
        $iw.super.<init>();
        ()
      };
      object $iw extends scala.AnyRef {
        def <init>(): type = {
          $iw.super.<init>();
          ()
        };
        import org.apache.spark._;
        object $iw extends scala.AnyRef {
          def <init>(): type = {
            $iw.super.<init>();
            ()
          };
          import $line6.$read.INSTANCE.$iw.$iw.$iw.$iw.temp;
          object $iw extends scala.AnyRef {
            def <init>(): type = {
              $iw.super.<init>();
              ()
            };
            private[this] val res1: Unit = $line6.$read.INSTANCE.$iw.$iw.$iw.$iw.temp();
            <stable> <accessor> def res1: Unit = $iw.this.res1
          }
        }
      }
    };
    private[this] val INSTANCE: $line7.$read.type = this;
    <stable> <accessor> def INSTANCE: $line7.$read.type = $read.this.INSTANCE
  }
}

[[syntax trees at end of                     typer]] // <console>
package $line7 {
  object $eval extends scala.AnyRef {
    def <init>(): $line7.$eval.type = {
      $eval.super.<init>();
      ()
    };
    <stable> <accessor> lazy val $result: Unit = $line7.$read.INSTANCE.$iw.$iw.$iw.$iw.res1;
    <stable> <accessor> lazy val $print: String = {
      $line7.$read.INSTANCE.$iw.$iw.$iw.$iw;
      ""
    }
  }
}

Comments

  • I guess `NoClassDefFoundError` here is just because of stopping. You can reproduce the behavior in ordinary Scala code if you keep `reduce` in an object outside the `main` method https://gist.github.com/DmytroMitin/4e9088a1c0df8c1ba23c953922c8cb81 If you move `reduce` into `main` then everything works https://gist.github.com/DmytroMitin/888f7274b53b74dcc3dabdef0044c835 Also if you extend `App` then everything works https://gist.github.com/DmytroMitin/38201a56b3add63accc61047dbffd2e0 So in the 1st case something is blocking – Dmytro Mitin Apr 18 '23 at 01:07 (a sketch of this reproduction appears after the comments)
  • I asked another [question](https://stackoverflow.com/questions/76047885/does-sparkcontext-have-to-execute-in-main-only) about this – Dhruv Apr 18 '23 at 18:17
  • Here is some explanation how spark shell or scala repl compiles and runs every single line of code: https://stackoverflow.com/questions/50321771/spark-adds-hidden-parameter-to-constructor-of-a-scala-class https://stackoverflow.com/questions/73277864/why-spark-shell-throws-nosuchmethodexception-while-calling-newinstance-via-refle – Dmytro Mitin Apr 18 '23 at 21:19
  • Putting the code inside any method and then calling the method works. I don't understand why. Is there anything similar to `-Xprint:typer` that could be written along `sbt console`? – Dhruv Apr 23 '23 at 14:56
  • Try to add `scalacOptions += "-Xprint:typer"` to `build.sbt` and run `sbt console` – Dmytro Mitin Apr 23 '23 at 17:47
  • I've edited the question and put the output. Please have a look. I also reported it as a [bug on github](https://github.com/sbt/sbt/issues/7219) – Dhruv Apr 23 '23 at 18:40
  • I don't think that this is a sbt bug. It's some Spark-specific behavior. As we know the behavior can be reproduced in ordinary Scala code, without repls (sbt console, Scala repl, spark shell etc.). And it can be reproduced in ordinary Scala code from command line, without sbt at all. https://gist.github.com/DmytroMitin/62f92bb15550d689c50276d90103c16d https://gist.github.com/DmytroMitin/ff0b07fc93f1b3674723db1247a1467f So it's Spark, not sbt. – Dmytro Mitin Apr 24 '23 at 01:51
  • A couple of Eugene Yokota's quotes from https://github.com/sbt/sbt/issues/7219: *"Likely this is related to Spark not closing some threads." "Allowing console task to fork likely would solve these type of issues."* Actually, adding `fork := true` to `build.sbt` doesn't change the behavior (in `run` task). – Dmytro Mitin Apr 24 '23 at 02:00
  • Is the updated output in the question useful for anything? I tried reaching out to Spark's community but got no help. They don't have an issues tab on their GitHub and I've already tried the methods mentioned on their contact page (the Gitter community and email). I guess we will have to leave it here then – Dhruv Apr 24 '23 at 05:01
  • Yeah, Spark bug tracker is at Jira https://issues.apache.org/jira/projects/SPARK/issues – Dmytro Mitin Apr 24 '23 at 23:09
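
A minimal sketch of the reproduction described in the first comment (the exact code lives in the linked gists; the object names here are illustrative): the same Spark action hangs when it runs during object initialization outside main, but finishes when moved inside main (or into an App).

import org.apache.spark._

// Hangs: the action runs while the object is being initialized, outside main
object OutsideMain {
  val sc = new SparkContext("local[1]", "SimpleProg")
  val result = sc.parallelize(List(1, 2, 3, 4)).reduce((a, b) => a - b)
  def main(args: Array[String]): Unit = println(result)
}

// Works: the same action moved inside main
object InsideMain {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[1]", "SimpleProg")
    println(sc.parallelize(List(1, 2, 3, 4)).reduce((a, b) => a - b))
  }
}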

0 Answers