
s_n181n is a dataframe, and here I go through the 3rd and 5th columns of the dataframe row-wise,

and

where the column nd is <= 1.0 the loop breaks.
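
Roughly, the loop looks like this (a simplified sketch, assuming scala.util.control.Breaks is used to break out, and that columns at positions 2 and 4 hold ts and nd):

import scala.util.control.Breaks._

breakable {
  for (row <- s_n181n.collect()) {   // iterate the collected rows on the driver
    val ts = row.getLong(2)          // 3rd column: ts (timestamp)
    val nd = row.getDouble(4)        // 5th column: nd (nearest distance)
    println(s"$ts | $nd")
    if (nd <= 1.0) break()           // stop once nd drops to 1.0 or below
  }
}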

ts (timestamp) | nd (nearest distance)

are the output columns, shown above.

But what I need is the timestamp of the last row, i.e. 1529157727000.

I want to break the loop at the last value shown here. How do I store that last row's timestamp in a variable, so that I can use it outside the loop?

RAGHHURAAMM
stackoverflow
  • From the given example, it looks like the dataframe is sorted and you are trying to get the max value of the `ts` column. Is that correct? – Jegan Oct 12 '18 at 14:44
  • You can order by desc and select the timestamp based on df.first, right? Or refer here: https://stackoverflow.com/questions/39544796/pyspark-spark-how-to-select-last-row-and-also-how-to-access-pyspark-dataframe-b – pvy4917 Oct 12 '18 at 15:25
  • @Jegan yes, I want to get the max value of ts for that particular data when the condition is satisfied. If I use max(ts) it will go and get the max of the whole dataset (ts), which I do not want; for this break condition I only need the max timestamp where the loop breaks. – stackoverflow Oct 15 '18 at 04:41

1 Answer


Here's my understanding of your requirement based on your question description and comment:

Loop through the collected rows one at a time, and whenever nd in the current row is less than or equal to ndLimit, extract ts from the previous row and reset ndLimit to the nd value of that same previous row.

If that's correct, I would suggest using foldLeft to assemble the list of timestamps, as shown below:

import org.apache.spark.sql.Row
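// Note: toDF below relies on spark.implicits._ (imported automatically in spark-shell)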

val s_n181n = Seq(
  (1, "a1", 101L, "b1", 1.0),  // nd 1.0 is the initial limit
  (2, "a2", 102L, "b2", 1.6),
  (3, "a3", 103L, "b3", 1.2),
  (4, "a4", 104L, "b4", 0.8),  // 0.8 <= 1.0, hence ts 103 is saved and nd 1.2 is the new limit
  (5, "a5", 105L, "b5", 1.5),
  (6, "a6", 106L, "b6", 1.3),
  (7, "a7", 107L, "b7", 1.1),  // 1.1 <= 1.2, hence ts 106 is saved and nd 1.3 is the new limit
  (8, "a8", 108L, "b8", 1.2)   // 1.2 <= 1.3, hence ts 107 is saved and nd 1.1 is the new limit
).toDF("c1", "c2", "ts", "c4", "nd")

val s_rows = s_n181n.rdd.collect

val s_list = s_rows.map(r => (r.getAs[Long](2), r.getAs[Double](4))).toList
// List[(Long, Double)] = List(
//   (101,1.0), (102,1.6), (103,1.2), (104,0.8), (105,1.5), (106,1.3), (107,1.1), (108,1.2)
// )

val ndLimit = s_list.head._2  // 1.0

s_list.tail.foldLeft( (s_list.head._1, s_list.head._2, ndLimit, List.empty[Long]) ){
  (acc, x) =>
    if (x._2 <= acc._3)
      (x._1, x._2, acc._2, acc._1 :: acc._4)
    else
      (x._1, x._2, acc._3, acc._4)
}._4.reverse
// res1: List[Long] = List(103, 106, 107)

Note that a tuple of (previous ts, previous nd, current ndLimit, list of timestamps) is used as the accumulator to carry over items from the previous row for the necessary comparison logic in the current row.
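
If only the last such timestamp is needed in a variable (rather than the whole list), the fold result can be bound to a name and its last element taken. A small sketch, where tsList is a hypothetical name and the literal list stands in for the foldLeft result above:

val tsList: List[Long] = List(103L, 106L, 107L)      // stands in for the foldLeft result above
val lastTs: Option[Long] = tsList.lastOption          // Some(107): the last saved timestamp
lastTs.foreach(ts => println(s"last ts = $ts"))       // value is available outside the loop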

Leo C
  • this is not the end of the dataset; it's just the data that satisfied the condition, and after that there is still more data. But I need the last value only, and I want to pick the last ts value here; for another loop I want to start from that value – stackoverflow Oct 15 '18 at 04:15
  • @stackoverflow, please see revised answer. – Leo C Oct 15 '18 at 15:54
  • Thanks mate, but this is the solved case. Here is exactly where I am stuck: https://stackoverflow.com/questions/52758059/looping-on-spark-sql-scala-dataframe?noredirect=1#comment92439314_52758059 – stackoverflow Oct 16 '18 at 04:33