I have a table in a remote SQL Server database:

CUSTOMERID  ACCOUNTNO   VEHICLENUMBER   TAGSTARTEFFDATE
20000000    10003014    MH43AJ411       2013-06-07 13:07:13.210 
20000001    10003014    MH43AJ411       2014-08-08 19:10:11.519
20029961    10003019    GJ15CD7387      2016-07-28 19:21:54.173
20009020    10003019    GJ15CF7747      2016-05-25 18:46:55.947
20001866    10003019    GJ15CD7657      2015-07-11 15:17:14.503
20001557    10003019    GJ15CB9601      2016-05-05 16:45:58.247
20001223    10003019    GJ15CA7837      2014-06-06 14:57:42.583
20000933    10003019    MH02DG7774      2014-02-12 13:49:31.427
20001690    10003019    GJ15CD7477      2015-01-03 16:12:59.000
20000008    10003019    GJ15CB727       2013-06-17 12:36:01.190
20001865    10003019    GJ15CA7387      2015-06-24 15:01:14.000
20000005    10003019    MH02BY7774      2013-06-15 12:29:10.000

I want to export it as JSON, and this is the code I am using:

val jdbcSqlConnStr = s"jdbc:sqlserver://192.168.70.15;databaseName=$db;user=bhaskar;password=welcome123;"
val jdbcDbTable = table1
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> jdbcSqlConnStr, "dbtable" -> jdbcDbTable)).load()
// jdbcDF.show(10)
// jdbcDF.printSchema

val query = "SELECT ACCOUNTNO, collect_set(struct(`VEHICLENUMBER`, `CUSTOMERID`, `TAGSTARTEFFDATE`)) as VEHICLE FROM tp_customer_account GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO"

jdbcDF.registerTempTable("tp_customer_account")
val res00 = sqlContext.sql(query)
// res00.show(10)
res00.coalesce(1).write.json("D:/res15")

Issue: The same VEHICLENUMBER shows up several times in the output, because the table holds more than one TAGSTARTEFFDATE for that VEHICLENUMBER.

Want to achieve: For each VEHICLENUMBER, I want to keep only the row with the latest (maximum) TAGSTARTEFFDATE. I would like to do this with a Spark SQL query run through SQLContext, as in the code snippet above.

Wenfang Du
Bhaskar Das

1 Answer

You can use a window function with dense_rank(), along these lines:

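import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}
val windowSpec = Window.partitionBy(col("VEHICLENUMBER")).orderBy(col("TAGSTARTEFFDATE").desc)
val latestDF = jdbcDF.withColumn("rank", dense_rank().over(windowSpec)).filter(col("rank") === 1).drop("rank")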

At the moment I'm not sure how to express this logic in pure SQL syntax, but if you are not restricted to SQL you can use this snippet.
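To show how the window snippet could feed the JSON export from the question, here is a minimal sketch using the DataFrame API only. It assumes Spark 2.x or later (hence SparkSession rather than the question's sqlContext) and uses a small in-memory DataFrame, sampleDF, as a stand-in for jdbcDF; the names sampleDF, newestFirst, latestPerVehicle and perAccount are made up for illustration, and the dates are kept as strings, which sort correctly only because they share one fixed format.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_set, dense_rank, struct}

// Assumption: Spark 2.x+ running locally. In spark-shell, getOrCreate() reuses the existing session.
val spark = SparkSession.builder().master("local[*]").appName("latest-tag-df").getOrCreate()
import spark.implicits._

// Stand-in for jdbcDF: four rows adapted from the question's table.
val sampleDF = Seq(
  (20000000, 10003014, "MH43AJ411", "2013-06-07 13:07:13.210"),
  (20000001, 10003014, "MH43AJ411", "2014-08-08 19:10:11.519"),
  (20029961, 10003019, "GJ15CD7387", "2016-07-28 19:21:54.173"),
  (20009020, 10003019, "GJ15CF7747", "2016-05-25 18:46:55.947")
).toDF("CUSTOMERID", "ACCOUNTNO", "VEHICLENUMBER", "TAGSTARTEFFDATE")

// Within each vehicle, order rows from newest to oldest tag date.
val newestFirst = Window.partitionBy(col("VEHICLENUMBER")).orderBy(col("TAGSTARTEFFDATE").desc)

// Keep only the newest row(s) per vehicle.
val latestPerVehicle = sampleDF
  .withColumn("rank", dense_rank().over(newestFirst))
  .filter(col("rank") === 1)
  .drop("rank")

// Same aggregation as the question's query, applied to the de-duplicated rows.
val perAccount = latestPerVehicle
  .groupBy("ACCOUNTNO")
  .agg(collect_set(struct(col("VEHICLENUMBER"), col("CUSTOMERID"), col("TAGSTARTEFFDATE"))).as("VEHICLE"))
  .orderBy("ACCOUNTNO")

// Print one JSON document per account instead of writing to disk.
perAccount.toJSON.collect().foreach(println)
```

Note that dense_rank() keeps every row that ties for the latest date; if exactly one row per vehicle is required, row_number() from the same functions package could be swapped in.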

Edit

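With a friend's help, here is a SQL equivalent of the above. A window function cannot be used directly in a WHERE clause, so the rank is computed in a subquery and filtered outside it, and the ordering is descending so that rank 1 is the latest date. Try it and see if it works for you.

SELECT * FROM (SELECT *, dense_rank() OVER (PARTITION BY VEHICLENUMBER ORDER BY TAGSTARTEFFDATE DESC) AS rnk FROM tp_customer_account) ranked WHERE rnk = 1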
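Since the question asks for a single Spark SQL query, the ranking and the original collect_set aggregation can probably be combined in one statement. The sketch below assumes Spark 2.x or later and registers a few rows adapted from the question as the temp view tp_customer_account; the alias names ranked and rnk are arbitrary, and the dates are kept as strings, which order correctly only because they share one fixed format.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+ running locally. In spark-shell, getOrCreate() reuses the existing session.
val spark = SparkSession.builder().master("local[*]").appName("latest-tag-sql").getOrCreate()
import spark.implicits._

// Stand-in for the JDBC table, registered under the view name used in the question.
Seq(
  (20000000, 10003014, "MH43AJ411", "2013-06-07 13:07:13.210"),
  (20000001, 10003014, "MH43AJ411", "2014-08-08 19:10:11.519"),
  (20029961, 10003019, "GJ15CD7387", "2016-07-28 19:21:54.173"),
  (20009020, 10003019, "GJ15CF7747", "2016-05-25 18:46:55.947")
).toDF("CUSTOMERID", "ACCOUNTNO", "VEHICLENUMBER", "TAGSTARTEFFDATE")
  .createOrReplaceTempView("tp_customer_account")

// Inner query ranks each vehicle's rows newest first; outer query keeps rank 1 and aggregates per account.
val query =
  """SELECT ACCOUNTNO,
    |       collect_set(struct(VEHICLENUMBER, CUSTOMERID, TAGSTARTEFFDATE)) AS VEHICLE
    |FROM (
    |  SELECT *,
    |         dense_rank() OVER (PARTITION BY VEHICLENUMBER ORDER BY TAGSTARTEFFDATE DESC) AS rnk
    |  FROM tp_customer_account
    |) ranked
    |WHERE rnk = 1
    |GROUP BY ACCOUNTNO
    |ORDER BY ACCOUNTNO""".stripMargin

val result = spark.sql(query)

// Print one JSON document per account; result.coalesce(1).write.json(path) would write a file instead.
result.toJSON.collect().foreach(println)
```

With the question's setup, the same query string would be passed to sqlContext.sql after jdbcDF.registerTempTable("tp_customer_account"). As far as I recall, window functions in SQL on Spark 1.x needed a HiveContext rather than a plain SQLContext, so that is worth checking against the Spark version in use.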
kaysush