
Find the previous month's sale for each city from a Spark DataFrame

+----+--------+----+
|City|   Month|Sale|
+----+--------+----+
|  c1|JAN-2017|  49|
|  c1|FEB-2017|  46|
|  c1|MAR-2017|  83|
|  c2|JAN-2017|  59|
|  c2|MAY-2017|  60|
|  c2|JUN-2017|  49|
|  c2|JUL-2017|  73|
+----+--------+----+

The required output is:

+----+--------+----+-------------+
|City|   Month|Sale|previous_sale|
+----+--------+----+-------------+
|  c1|JAN-2017|  49|         NULL|
|  c1|FEB-2017|  46|           49|
|  c1|MAR-2017|  83|           46|
|  c2|JAN-2017|  59|         NULL|
|  c2|MAY-2017|  60|           59|
|  c2|JUN-2017|  49|           60|
|  c2|JUL-2017|  73|           49|
+----+--------+----+-------------+

Please help me

blackbishop
prakash

2 Answers


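You can use the lag function to get the previous value.

To sort by month, you need to convert the month string to a proper date first, for example "JAN-2017" to "01-01-2017". The code below orders by the raw month string, so the rows come out in alphabetical order rather than calendar order.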

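import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import spark.implicits._

val df = spark.sparkContext.parallelize(Seq(
  ("c1", "JAN-2017", 49),
  ("c1", "FEB-2017", 46),
  ("c1", "MAR-2017", 83),
  ("c2", "JAN-2017", 59),
  ("c2", "MAY-2017", 60),
  ("c2", "JUN-2017", 49),
  ("c2", "JUL-2017", 73)
)).toDF("city", "month", "sales")

// One window per city; "month" is still a string here, so the ordering is alphabetical
val window = Window.partitionBy("city").orderBy("month")

// lag returns the value from the previous row in the window, or null for the first row
df.withColumn("previous_sale", lag($"sales", 1, null).over(window)).show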

Output:

+----+--------+-----+-------------+
|city|   month|sales|previous_sale|
+----+--------+-----+-------------+
|  c1|FEB-2017|   46|         null|
|  c1|JAN-2017|   49|           46|
|  c1|MAR-2017|   83|           49|
|  c2|JAN-2017|   59|         null|
|  c2|JUL-2017|   73|           59|
|  c2|JUN-2017|   49|           73|
|  c2|MAY-2017|   60|           49|
+----+--------+-----+-------------+

You can use this UDF to build a default date of the form 01/month/year, which can then be used to sort by date even when the rows span different years.

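import java.sql.Date
import org.apache.spark.sql.functions.udf

// Convert "MON-yyyy" (e.g. "JAN-2017") to the first day of that month
val fullDate = udf((value: String) => {
  val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
  val parts = value.split("-")
  // Date.valueOf takes "yyyy-mm-dd"; the deprecated Date(year, month, day) constructor
  // counts years from 1900 and months from 0, so it would produce the wrong date
  Date.valueOf(f"${parts(1)}-${months.indexOf(parts(0)) + 1}%02d-01")
})

df.withColumn("month", fullDate($"month")).show()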
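Putting the two pieces together, the following is a minimal sketch of ordering the window by the converted date instead of the raw string. The object name `PreviousSaleByDate`, the helper `firstOfMonth`, and the temporary column `month_date` are hypothetical names chosen for this illustration, and a local SparkSession is assumed.

```scala
import java.sql.Date

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, udf}

object PreviousSaleByDate {
  // Hypothetical helper: maps "JAN-2017" to the first day of that month.
  def firstOfMonth(value: String): Date = {
    val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
                      "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
    val Array(mon, year) = value.split("-")
    Date.valueOf(f"$year-${months.indexOf(mon) + 1}%02d-01")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for this illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("previous-sale-by-date")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("c1", "JAN-2017", 49), ("c1", "FEB-2017", 46), ("c1", "MAR-2017", 83),
      ("c2", "JAN-2017", 59), ("c2", "MAY-2017", 60), ("c2", "JUN-2017", 49),
      ("c2", "JUL-2017", 73)
    ).toDF("city", "month", "sales")

    val toDate = udf(firstOfMonth _)

    // Order each city's rows by the real date, not by the month string.
    val window = Window.partitionBy("city").orderBy("month_date")

    df.withColumn("month_date", toDate($"month"))
      .withColumn("previous_sale", lag($"sales", 1).over(window))
      .drop("month_date")
      .show()

    spark.stop()
  }
}
```

With the window ordered by date, `previous_sale` should match the table in the question, with null for each city's first month. Note that `lag` looks at the previous row, not the previous calendar month, so for c2 the May row picks up the January value.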

Hope this helps!

koiralo
  • Hi Shankar, this works fine, but I need code that does not use Window. – prakash Jul 10 '17 at 12:58
  • I hope you can help me. – prakash Jul 10 '17 at 12:58
  • I think this is the best possible way; it may not be as easy without Window. – koiralo Jul 10 '17 at 13:02
  • You may be able to do it with groupBy, but that adds more complexity. – koiralo Jul 10 '17 at 13:12
  • Hi Shankar, thanks a lot, it works, but my project manager is not accepting this code. He wants a solution that uses a self join of the DataFrame. Please give me a hint. – prakash Jul 11 '17 at 06:45
  • I think this is the best option we have, and I don't know why your PM is not accepting it. I can give you the logic for the join: first create a new column with an increasing number for each city (c1, c2), then join the DataFrame with itself on increasing id - 1. – koiralo Jul 11 '17 at 09:04
  • Hi Shankar, I am new to Spark and Scala. Could you give me a link where I can practice more? – prakash Jul 12 '17 at 04:57
  • Hi Shankar, I got a solution, but for city2 there is no data for April. In that case the previous month's sale for city2 in May should be 0. Please guide me. – prakash Jul 13 '17 at 07:47
  • OK, good to hear that you got it working. What you need to do now is insert the missing months with sales of 0; that solves your problem. – koiralo Jul 13 '17 at 07:56
  • Hi Shankar, but how do we determine which month's data (row) is missing? I want a general solution. Could you give me a hint? – prakash Jul 13 '17 at 08:41
  • Find the max date and min date for a city and calculate all the months between them. – koiralo Jul 13 '17 at 08:42
  • Hi Shankar, please see my solution. – prakash Jul 19 '17 at 12:05
  • Hi Shankar, please see another problem at stackoverflow.com/questions/45324375/… and give me some ideas for solving it; I really need this. – prakash Jul 27 '17 at 05:00
  • Thanks a lot, sir! Brilliant solution. – prakash Jul 27 '17 at 11:18
  • Hi Shankar, please guide me on a new problem at https://stackoverflow.com/questions/45553300/compare-two-spark-dataframe – prakash Aug 08 '17 at 06:21
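
The hint in the comments (take a city's min and max month and enumerate every month between them) can be sketched in plain Scala, without Spark, to show the logic on its own. The object `FillMissingMonths` and its methods `parseMonth` and `fillGaps` are hypothetical names for this illustration; the input is assumed to be one city's sales keyed by month.

```scala
import java.time.YearMonth
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.ChronoUnit
import java.util.Locale

object FillMissingMonths {
  // Case-insensitive parser for month strings such as "JAN-2017".
  private val monthFormat: DateTimeFormatter = new DateTimeFormatterBuilder()
    .parseCaseInsensitive()
    .appendPattern("MMM-yyyy")
    .toFormatter(Locale.ENGLISH)

  def parseMonth(text: String): YearMonth = YearMonth.parse(text, monthFormat)

  // Returns one (month, sale) pair for every month between the earliest and
  // latest month in `sales` (assumed non-empty), using 0 where no row exists.
  def fillGaps(sales: Map[YearMonth, Int]): Seq[(YearMonth, Int)] = {
    val first = sales.keys.reduce((a, b) => if (a.isBefore(b)) a else b)
    val last = sales.keys.reduce((a, b) => if (a.isAfter(b)) a else b)
    val monthsBetween = ChronoUnit.MONTHS.between(first, last).toInt
    (0 to monthsBetween).map { offset =>
      val month = first.plusMonths(offset.toLong)
      month -> sales.getOrElse(month, 0)
    }
  }

  def main(args: Array[String]): Unit = {
    // City c2 from the question: FEB, MAR and APR have no rows.
    val c2 = Map(
      parseMonth("JAN-2017") -> 59,
      parseMonth("MAY-2017") -> 60,
      parseMonth("JUN-2017") -> 49,
      parseMonth("JUL-2017") -> 73
    )
    fillGaps(c2).foreach { case (month, sale) => println(s"$month $sale") }
  }
}
```

Once the gaps are filled with 0, the previous row is always the previous calendar month, so May for c2 gets a previous sale of 0 as requested in the comments. In Spark the same idea would need the generated months to be joined back onto the DataFrame, which is not shown here.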
package com.incedo.pharma

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

class SecondTask {
  def previousMonthSale(): Unit = {
    val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
    import ss.implicits._

    // Read the pipe-delimited input; the code below uses its City, Date and Sale columns
    val df = ss.read.format("com.databricks.spark.csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .option("delimiter", "|")
             .load("taskTwoData.csv")

    // Parse the date string and extract the month number (1-12)
    val df1 = df.withColumn("Timestamp", unix_timestamp(df("Date"), "MM/dd/yyyy").cast("timestamp"))
    val df2 = df1.withColumn("Month", month(df1("Timestamp")))

    // Total sale per city and month
    val df3 = df2.groupBy("City", "Month").agg(sum(col("Sale")).alias("Current_Month_Total_Sale")).orderBy("City", "Month")

    // Self join: match each row with the same city's row for the previous month.
    // Months without a previous-month row get 0. Month is 1-12 only, so this
    // assumes all of the data falls within a single year.
    val df4 = df3.withColumn("pre_month", df3("Month") - 1)
    val df5 = df4.alias("a").join(df3.alias("b"), $"a.pre_month" === $"b.Month" && $"a.City" === $"b.City", "left_outer")
              .select($"a.City", $"a.Month", $"a.Current_Month_Total_Sale", ($"b.Current_Month_Total_Sale")
              .alias("Previous_Month_Total_Sale")).na.fill(0, Seq("Previous_Month_Total_Sale"))

    // Change against the previous month, as a percentage of the current month's sale
    val df6 = df5.withColumn("Percent_Sale_Change", round(((df5("Current_Month_Total_Sale") - df5("Previous_Month_Total_Sale")) / df5("Current_Month_Total_Sale")) * 100, 2))

    // Each month's sale as a fraction of the city's best month
    val df7 = df6.groupBy("City").max("Current_Month_Total_Sale").orderBy("City")
    val df8 = df6.join(df7, Seq("City"))
    val df9 = df8.withColumn("Percent_Sale_By_Max_Sale", round(df8("Current_Month_Total_Sale") / df8("max(Current_Month_Total_Sale)"), 2))
             .drop("max(Current_Month_Total_Sale)")
    df9.show()
  }
}

object taskTwo {
  def main(arr: Array[String]): Unit = {
    new SecondTask().previousMonthSale()
  }
}
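
For comparison, here is a minimal sketch of the same self-join idea applied directly to the "JAN-2017" style data from the question. It assumes a month index of `year * 12 + month` so that consecutive months always differ by 1, including December to January. The object `PreviousSaleBySelfJoin`, the helper `monthIndex`, and the column `idx` are hypothetical names for this illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object PreviousSaleBySelfJoin {
  // Hypothetical helper: "JAN-2017" -> 2017 * 12 + 0, "FEB-2017" -> 2017 * 12 + 1, ...
  def monthIndex(value: String): Int = {
    val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
                      "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
    val Array(mon, year) = value.split("-")
    year.toInt * 12 + months.indexOf(mon)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for this illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("previous-sale-by-self-join")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("c1", "JAN-2017", 49), ("c1", "FEB-2017", 46), ("c1", "MAR-2017", 83),
      ("c2", "JAN-2017", 59), ("c2", "MAY-2017", 60), ("c2", "JUN-2017", 49),
      ("c2", "JUL-2017", 73)
    ).toDF("city", "month", "sales")

    val toIndex = udf(monthIndex _)
    val indexed = df.withColumn("idx", toIndex($"month"))

    // Left outer self join: row "a" is the current month, row "b" the month before it.
    val result = indexed.alias("a")
      .join(indexed.alias("b"),
        $"a.city" === $"b.city" && $"a.idx" - 1 === $"b.idx",
        "left_outer")
      .select($"a.city", $"a.idx", $"a.month", $"a.sales",
        $"b.sales".alias("previous_sale"))
      .na.fill(0, Seq("previous_sale")) // no row for the previous month -> 0
      .orderBy("city", "idx")
      .drop("idx")

    result.show()
    spark.stop()
  }
}
```

Unlike the window version, this joins on the previous calendar month, so a month with no predecessor (each city's first month, and May for c2) gets 0 rather than null or an older value.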
prakash