
I have this dataframe called rankedDF:

+----------+-----------+----------+-------------+-----------------------+
|TimePeriod|TPStartDate| TPEndDate|TXN_HEADER_ID|                  Items|
+----------+-----------+----------+-------------+-----------------------+
|         1| 2017-03-01|2017-05-30|   TxnHeader1|   Womens Socks, Men...|
|         1| 2017-03-01|2017-05-30|   TxnHeader4|   Mens Pants, Mens ...|
|         1| 2017-03-01|2017-05-30|   TxnHeader7|   Womens Socks, Men...|
|         2| 2019-03-01|2017-05-30|   TxnHeader1|Calcetas Mujer, Calc...|
|         2| 2019-03-01|2017-05-30|   TxnHeader4|  Pantalones H, Pan ...|
|         2| 2019-03-01|2017-05-30|   TxnHeader7| Calcetas Mujer, Pan...|
+----------+-----------+----------+-------------+-----------------------+

So, I need to split this DataFrame by each "TimePeriod" value and pass the result to another function, keeping only the Items column.

I’ve tried this:

val timePeriods = rankedDF.select("TimePeriod").distinct()

so at this point I have:

|TimePeriod|
|         1|
|         2|

Based on this "timePeriods" DataFrame, I need to call my function twice:

timePeriods.foreach { n =>
  val justItems = rankedDF
    .filter(col("TimePeriod") === n.getInt(0))
    .select("Items")
}

Well... I was expecting this DataFrame:

|Items               |
|Womens Socks, Men...|
|Mens Pants, Mens ...|
|Womens Socks, Men...|

Instead, I'm getting this error:

task 170.0 in stage 40.0 (TID 2223)
java.lang.NullPointerException
                at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
                at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
                at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:799)
                at com.brierley.versions.FpGrowth$$anonfun$PfpGrowth$1$$anonfun$apply$3.apply(FpGrowth.scala:720)
                at com.brierley.versions.FpGrowth$$anonfun$PfpGrowth$1$$anonfun$apply$3.apply(FpGrowth.scala:718)
                at scala.collection.Iterator$class.foreach(Iterator.scala:727)
                at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
                at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
                at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
                at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
                at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
                at org.apache.spark.scheduler.Task.run(Task.scala:89)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
18/04/24 11:49:32 WARN TaskSetManager: Lost task 170.0 in stage 40.0 (TID 2223, localhost): java.lang.NullPointerException

18/04/24 11:49:32 ERROR TaskSetManager: Task 170 in stage 40.0 failed 1 times; aborting job

But when I hard-code the values, it actually works:

val justItems1 = rankedDF.filter(col("TimePeriod") === 1).select("Items")
val justItems2 = rankedDF.filter(col("TimePeriod") === 2).select("Items")

Why am I unable to access my DataFrame dynamically?


1 Answer


You need to collect the distinct values to the driver first; then you can use map. In your version, the body of timePeriods.foreach executes on the executors, where rankedDF cannot be used (DataFrames can only be referenced on the driver), which is what produces the NullPointerException:

    import org.apache.spark.sql.functions.col
    import spark.implicits._  // on Spark 1.x: sqlContext.implicits._ (needed for .as[Int])

    val rankedDF: DataFrame = ???
    // collect the distinct time periods to the driver as a local Array[Int]
    val timePeriods = rankedDF.select("TimePeriod").distinct().as[Int].collect()
    // each filter now runs on the driver, so referencing rankedDF is safe
    val dataFrames: Array[DataFrame] = timePeriods.map(tp => rankedDF.where(col("TimePeriod") === tp))
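
From there, feeding only the Items column of each period into your other function is just a local loop over the collected values. A minimal sketch, where processItems is a hypothetical stand-in for your real downstream function:

    // hypothetical downstream function; replace with your actual call
    def processItems(items: DataFrame): Unit = ???

    // timePeriods is the collected Array[Int], so this foreach runs on the driver
    timePeriods.foreach { tp =>
      val justItems = rankedDF.where(col("TimePeriod") === tp).select("Items")
      processItems(justItems)
    }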
Raphael Roth
  • Thank you, your answer works very well for the first part of my development, but I'm stuck on something else XD – LeeFernan Apr 24 '18 at 23:46