
I load some data and perform some operations on it; then I want to compute the average weight per contactId:

    package main.scala


    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}

    import com.databricks.spark.corenlp.functions._


    object SQLContextSingleton {

      @transient  private var instance: SQLContext = _

      def getInstance(sparkContext: SparkContext): SQLContext = {
        if (instance == null) {
          instance = new SQLContext(sparkContext)
        }
        instance
      }
    }


    object Sentiment {
      def main(args: Array[String]) {

        // Spark and SQL Context (gives access to Spark and Spark SQL libraries)
        val conf = new SparkConf().setAppName("Sentiment")
        val sc = new SparkContext(conf)
        val sqlContext = SQLContextSingleton.getInstance(sc)
        import sqlContext.implicits._ 


        // Schema of the table. Field names and types have to match the headers
        val customSchema = StructType(Array(
          StructField("contactId", StringType, true),
          StructField("weight", DoubleType, true),
          StructField("email", StringType, true)))

        // Load dataframe
        val df = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "true")          // File has headers
          .option("delimiter","\t")          // Delimiter is tab
          //.option("inferSchema","true")    // Automatically infer data type
          .option("parserLib", "UNIVOCITY")  // Parser, which deals better with the email formatting
          .schema(customSchema)              // Schema of the table
          .load("outputs/cleanedEmails.txt") // Input file
          // SAVE DF

        // Print out the first 20 rows of the dataframe
        //df.show()

        // Sentiment Analysis
        val sent = df
          .select('contactId, 'weight, sentiment('email).as('sentiment)) // Add sentiment analysis output to the dataframe
          .select('contactId, 'weight * 'sentiment)                      // Multiply sentiment score by weight
          .withColumnRenamed("(weight * sentiment)", "weight")           // Rename column
          .filter('weight >= 0.0005)
          //.groupBy("contactId").mean("weight")                         // Calculate average weight per unique contactId

        sent.show()
        sent.write.format("parquet").mode("overwrite").save("outputs/TempDF.parquet") // Save DF as Parquet - will overwrite the previously saved Parquet
      }
    }
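
The commented-out line is the aggregation I am ultimately after. Written out on its own it would look roughly like this (just a sketch using the standard groupBy/agg API; avgWeight is a column name I made up):

    // Average weight per contactId (sketch; same intent as the commented-out
    // .groupBy("contactId").mean("weight") above)
    val avgPerContact = sent
      .groupBy("contactId")
      .agg(avg("weight").as("avgWeight"))

    avgPerContact.show()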

At this point, everything works fine. However, if I do sent.show(), spark-submit throws the following error:

    ERROR Executor: Managed memory leak detected; size = 16777216 bytes, TID = 1
    ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.util.NoSuchElementException
        at java.util.ArrayList$Itr.next(ArrayList.java:854)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
        at scala.collection.IterableLike$class.head(IterableLike.scala:91)
        at scala.collection.AbstractIterable.head(Iterable.scala:54)
        at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
        at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:75)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:74)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:964)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
        at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:117)
        at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:115)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:371)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
        at main.scala.Sentiment$.main(Sentiment.scala:68)
        at main.scala.Sentiment.main(Sentiment.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.util.NoSuchElementException
        at java.util.ArrayList$Itr.next(ArrayList.java:854)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
        at scala.collection.IterableLike$class.head(IterableLike.scala:91)
        at scala.collection.AbstractIterable.head(Iterable.scala:54)
        at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
        at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:75)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:74)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:964)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
        at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:117)
        at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:115)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:371)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This might mean that there are null values in my data frame, but I do not know how to find them. Thoughts?
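
One thing I could try is to count null or empty values directly, before running sentiment() (a minimal sketch using standard Column functions; I have not verified that this pinpoints the failing rows):

    // Count rows whose email is null or an empty string (column names taken
    // from the custom schema above)
    val badEmails = df.filter(df("email").isNull || df("email") === "")
    println(s"Rows with null/empty email: ${badEmails.count()}")

    // Same idea for other columns, e.g. null weights
    println(s"Rows with null weight: ${df.filter(df("weight").isNull).count()}")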

Béatrice Moissinac
  • First, are you sure that this code compiles? Second, you can't call another dataframe column inside of a select! – eliasah Jun 24 '16 at 06:39
  • Yes, the code compiles - I put the groupBy().mean in another dataframe (see edits) but showing that dataframe makes the code crash. – Béatrice Moissinac Jun 24 '16 at 15:52
  • Like I said you can't call a column from another DataFrame in the select ! You can't use sentiment(...) inside the select of df – eliasah Jun 24 '16 at 15:54
  • Yes, sentiment() can be used inside select(), as shown on the example of sentiment() here: https://github.com/databricks/spark-corenlp/ – Béatrice Moissinac Jun 24 '16 at 16:13
  • Here sentiment is an implicit method and not a DataFrame. I'm not sure what you are doing anymore, but it seems like your question is missing some important information. – eliasah Jun 24 '16 at 16:17
  • I wouldn't worry too much about what sentiment() does; it returns a score for each row, which I then multiply with the weight, rename the column, and filter out weights that are too small. Up until there, it works great, and I can show() the dataframe. Some contactIds repeat, so I want the average weight for each contactId, thus .groupBy("contactId").mean("weight") - the API says that mean() returns a data frame. I'm confused as to what causes the java.util.NoSuchElementException, which might mean there are null elements in my DF, but I don't understand why groupBy() causes it. – Béatrice Moissinac Jun 24 '16 at 16:26
  • Please, let me know what additional information I can provide. I started coding with Spark and Scala this week, thus my fundamentals are still shallow. – Béatrice Moissinac Jun 24 '16 at 16:26
  • I also just realized that I cannot save the df sent; I get the same error "java.util.NoSuchElementException" - how can I check for null values in my df? – Béatrice Moissinac Jun 24 '16 at 16:28
  • Can you provide a Minimum complete verifiable example ? http://stackoverflow.com/help/mcve – eliasah Jun 24 '16 at 16:32
  • Yes. See edited code. But it won't run without the spark-corenlp package. – Béatrice Moissinac Jun 24 '16 at 16:36
  • In the example, df is correctly formatted - I'm able to groupBy(), show(), save(), etc. Thus, I believe the problem indeed comes from sentiment(), but the API shows the usage of sentiment() inside a select(). I don't know how else to use it, at this point, with my limited knowledge. – Béatrice Moissinac Jun 24 '16 at 16:42
  • I figured out that sentiment() was returning null values because some of the emails were empty strings. I updated my parser and the problem is gone (see the sketch below these comments for another way to drop such rows up front). – Béatrice Moissinac Jun 24 '16 at 18:04
  • Have you been able to fix the error? I have the same issue. – elcomendante Oct 12 '16 at 13:26
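
Following up on the empty-email comment above: besides fixing the parser, dropping such rows before calling sentiment() should avoid feeding it empty strings (again just a sketch, not the fix I actually applied):

    // Drop rows with a null or empty email before applying sentiment() (sketch)
    val nonEmpty = df.filter(df("email").isNotNull && df("email") !== "")

    // ...then run the same select/filter pipeline as before on nonEmpty instead of df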
