I have this dataframe called rankedDF:
+----------+-----------+----------+-------------+-----------------------+
|TimePeriod|TPStartDate| TPEndDate|TXN_HEADER_ID|                  Items|
+----------+-----------+----------+-------------+-----------------------+
|         1| 2017-03-01|2017-05-30|   TxnHeader1|   Womens Socks, Men...|
|         1| 2017-03-01|2017-05-30|   TxnHeader4|   Mens Pants, Mens ...|
|         1| 2017-03-01|2017-05-30|   TxnHeader7|   Womens Socks, Men...|
|         2| 2019-03-01|2017-05-30|   TxnHeader1|Calcetas Mujer, Calc...|
|         2| 2019-03-01|2017-05-30|   TxnHeader4|  Pantalones H, Pan ...|
|         2| 2019-03-01|2017-05-30|   TxnHeader7| Calcetas Mujer, Pan...|
+----------+-----------+----------+-------------+-----------------------+
I need to split this DataFrame by TimePeriod and pass each subset to another function, keeping only the Items column.
I’ve tried this:
val timePeriods = rankedDF.select("TimePeriod").distinct()
At this point I have:
+----------+
|TimePeriod|
+----------+
|         1|
|         2|
+----------+
I need to call my function once for each value in timePeriods, so twice in this case:
timePeriods.foreach { n =>
  val justItems = rankedDF.filter(col("TimePeriod") === n.getInt(0)).select("Items")
}
I was expecting this DataFrame for the first time period:
+--------------------+
|               Items|
+--------------------+
|Womens Socks, Men...|
|Mens Pants, Mens ...|
|Womens Socks, Men...|
+--------------------+
Instead of that, I’m getting this error:
task 170.0 in stage 40.0 (TID 2223)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:799)
at com.brierley.versions.FpGrowth$$anonfun$PfpGrowth$1$$anonfun$apply$3.apply(FpGrowth.scala:720)
at com.brierley.versions.FpGrowth$$anonfun$PfpGrowth$1$$anonfun$apply$3.apply(FpGrowth.scala:718)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/04/24 11:49:32 WARN TaskSetManager: Lost task 170.0 in stage 40.0 (TID 2223, localhost): java.lang.NullPointerException
18/04/24 11:49:32 ERROR TaskSetManager: Task 170 in stage 40.0 failed 1 times; aborting job
When I run the filters directly with hard-coded values, they work:
val justItems = rankedDF.filter(col("TimePeriod") === 1).select("Items")
val justItems = rankedDF.filter(col("TimePeriod") === 2).select("Items")
Why am I unable to access my DataFrame dynamically?
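For context, one likely explanation is that the closure passed to timePeriods.foreach runs on the executors, where rankedDF has no usable SQLContext, so calling filter there throws the NullPointerException. The hard-coded calls run on the driver, which would explain why they succeed. A minimal sketch of a driver-side variant, assuming TimePeriod is an integer column with few enough distinct values to collect; TimePeriodSplit and splitByTimePeriod are hypothetical names used only for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object TimePeriodSplit {
  // Hypothetical helper: returns one Items-only DataFrame per TimePeriod value.
  // Assumes TimePeriod is an integer column with a small number of distinct values.
  def splitByTimePeriod(rankedDF: DataFrame): Seq[(Int, DataFrame)] = {
    // collect() brings the distinct period values back to the driver as a local array
    val periods: Array[Int] =
      rankedDF.select("TimePeriod").distinct().collect().map(_.getInt(0))

    // This map iterates over a local collection on the driver,
    // so creating new DataFrames inside it is allowed
    periods.toSeq.map { p =>
      p -> rankedDF.filter(col("TimePeriod") === p).select("Items")
    }
  }
}
```

Each (period, items) pair returned by splitByTimePeriod(rankedDF) could then be passed to the other function from the driver.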