
I have one parquet file in HDFS as the initial load of my data. Each subsequent parquet file contains only the records that changed on that day relative to the initial load (in chronological order). These are my deltas. I want to read all or some of the parquet files to get the latest state of the data as of a specific date. Deltas can contain new records, too.

Example:

Initial Data (Folder: /path/spezific_data/20180101):

ID | Name    | Street
1  | "Tom"   | "Street 1"
2  | "Peter" | "Street 2"

Delta 1 (Folder: /path/spezific_data/20180102):

ID | Name    | Street
1  | "Tom"   | "Street 21"

Delta 2 (Folder: /path/spezific_data/20180103):

ID | Name    | Street
2  | "Peter" | "Street 44"
3  | "Hans"  | "Street 12"

Delta 3 (Folder: /path/spezific_data/20180105):

ID | Name    | Street
2  | "Hans"  | "Street 55"

It is possible that the deltas of one day are only loaded a day later (look at Delta 2 and Delta 3). So the folder /path/spezific_data/20180104 does not exist, and we never want to load this date. Now I want to load different cases.

  1. Only initial data: That is a simple load of one directory.
 initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
  2. Until a specific date (20180103):
 initial_df = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
 delta_df = spark.read.parquet("hdfs:/path/spezific_data/20180102/")

Now I have to merge these datasets (an "update"; I know Spark RDDs and DataFrames cannot be updated in place), then load the next delta and merge it too. Currently I solve this with the following code (but in a for loop):

 new_df = delta_df.union(initial_df).dropDuplicates("ID")
 delta_df = spark.read.parquet("hdfs:/mypath/20180103/")
 new_df = delta_df.union(new_df).dropDuplicates("ID")

But I think that is not a good way to do this.

  3. Load all data in the folder "/path/spezific_data": I do this the same way, with a for loop up to the latest date.

Questions: Can I do it like this? Are there better ways? Can I load everything into one DataFrame and merge it there?
Currently the load takes very long (one hour).

Update 1:
I tried something like this. When I run this code, it goes through all dates until my end date (I can see this from println(date)). After that, I get a java.lang.StackOverflowError. Where is the error?

import org.apache.spark.sql.functions.col
import util.control.Breaks._

var sourcePath = "hdfs:sourceparth/"
var destinationPath = "hdfs:destiantionpath/result"
var initial_date = "20170427"
var start_year = 2017
var end_year = 2019
var end_month = 10
var end_day = 31

var m : String = _
var d : String = _
var date : String = _
var delta_df : org.apache.spark.sql.DataFrame = _
var doubleRows_df : org.apache.spark.sql.DataFrame = _

//final DF, initial load
var final_df = spark.read.parquet(sourcePath + initial_date +  "*")

breakable{
   for(year <- 2017 to end_year; month <- 1 to 12; day <- 1 to 31){
     //Create date String
     m = month.toString()
     d = day.toString()
     if(month < 10)
       m = "0" + m
     if(day < 10)
       d = "0" + d
     date = year.toString() + m + d

     try{
       //one delta
       delta_df = spark.read.parquet(sourcePath + date + "*")

       //drop keys that occur more than once within this delta (I want to ignore them)
       doubleRows_df  = delta_df.groupBy("key").count().where("count > 1").select("key")
       delta_df = delta_df.join(doubleRows_df, Seq("key"), "leftanti")

       //deletes all (old) rows in final_df, that are in delta_df
       final_df = final_df.join(delta_df, Seq("key"), "leftanti")

       //add all new rows in delta
       final_df = final_df.union(delta_df)

       println(date)
     }catch{
       case e:org.apache.spark.sql.AnalysisException=>{}
     }
    if(day == end_day && month == end_month &&  year == end_year)
       break
   }
 }
 final_df.write.mode("overwrite").parquet(destinationPath)

The full stacktrace:

19/11/26 11:19:04 WARN util.Utils: Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
        at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
        at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:223)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
        at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
        at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Rene A
  • Are you limited to Spark only? The use case you described is pretty much a job for HBase or another database. – Vladislav Varslavans Nov 22 '19 at 09:46
  • It's my task to do this. I was told to do it with Spark. I'm new to this big data context. My first step will be to do this with Spark. After that I can also do it with other technology. – Rene A Nov 22 '19 at 09:53

1 Answer

  1. distinct or dropDuplicates is not an option, since you can't control which values will be kept. It may very well happen that the new value is dropped while the old value is preserved.
  2. You need to join on ID (see the available join types). Each joined row then contains only the old record, only the new record, or both. When only one of them is present, take that one; when both are present, take only the new one.
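The join described in point 2 could be sketched roughly as follows. This is a minimal sketch, not a tuned implementation: the object name `ApplyOneDelta`, the helper `applyDelta`, and the local `SparkSession` are assumptions made for illustration, and the column names are taken from the example tables in the question. It also assumes both DataFrames share the same columns and that `ID` is unique within each of them.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

// Hypothetical helper object, for illustration only.
object ApplyOneDelta {

  // Full outer join on ID: if a delta row exists for an ID, take the whole
  // delta row; otherwise keep the old row. New IDs come from the delta side.
  def applyDelta(base: DataFrame, delta: DataFrame): DataFrame = {
    val b = base.alias("b")
    val d = delta.alias("d")

    // For every column, prefer the delta value whenever the delta side matched.
    def pick(c: String): Column =
      when(col("d.ID").isNotNull, col(s"d.$c")).otherwise(col(s"b.$c")).as(c)

    b.join(d, col("b.ID") === col("d.ID"), "full_outer")
      .select(base.columns.map(pick): _*)
  }

  def main(args: Array[String]): Unit = {
    // Local session only so the sketch runs on its own (assumption).
    val spark = SparkSession.builder.master("local[*]").appName("apply-one-delta").getOrCreate()
    import spark.implicits._

    val initial = Seq((1, "Tom", "Street 1"), (2, "Peter", "Street 2")).toDF("ID", "Name", "Street")
    val delta1  = Seq((1, "Tom", "Street 21")).toDF("ID", "Name", "Street")

    applyDelta(initial, delta1).orderBy("ID").show()
    spark.stop()
  }
}
```

With the sample data, ID 1 ends up with "Street 21" and ID 2 keeps "Street 2". Applying one delta after another this way means one join per folder, which is why a window over all deltas may be preferable when there are many of them.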

Example from here of a window operation, which is how multiple deltas can be added at once.

Question: What are the best-selling and the second best-selling products in every category?

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank
import spark.implicits._ // for toDF and the 'symbol column syntax

val dataset = Seq(
  ("Thin",       "cell phone", 6000),
  ("Normal",     "tablet",     1500),
  ("Mini",       "tablet",     5500),
  ("Ultra thin", "cell phone", 5000),
  ("Very thin",  "cell phone", 6000),
  ("Big",        "tablet",     2500),
  ("Bendable",   "cell phone", 3000),
  ("Foldable",   "cell phone", 3000),
  ("Pro",        "tablet",     4500),
  ("Pro2",       "tablet",     6500))
  .toDF("product", "category", "revenue")

// One window per category, highest revenue first
val overCategory = Window.partitionBy('category).orderBy('revenue.desc)

val ranked = dataset.withColumn("rank", dense_rank.over(overCategory))

scala> ranked.show
+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Pro2|    tablet|   6500|   1|
|      Mini|    tablet|   5500|   2|
|       Pro|    tablet|   4500|   3|
|       Big|    tablet|   2500|   4|
|    Normal|    tablet|   1500|   5|
|      Thin|cell phone|   6000|   1|
| Very thin|cell phone|   6000|   1|
|Ultra thin|cell phone|   5000|   2|
|  Bendable|cell phone|   3000|   3|
|  Foldable|cell phone|   3000|   3|
+----------+----------+-------+----+

scala> ranked.where('rank <= 2).show
+----------+----------+-------+----+
|   product|  category|revenue|rank|
+----------+----------+-------+----+
|      Pro2|    tablet|   6500|   1|
|      Mini|    tablet|   5500|   2|
|      Thin|cell phone|   6000|   1|
| Very thin|cell phone|   6000|   1|
|Ultra thin|cell phone|   5000|   2|
+----------+----------+-------+----+
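Transferred to the deltas from the question, the same idea might look like the sketch below. It assumes every row carries a `load_date` column holding the folder name (`yyyyMMdd`); that column, the object name `LatestPerId`, and the helper `latestAsOf` are hypothetical and not part of the original data. It uses `row_number` instead of `dense_rank` so that exactly one row per ID survives; if an ID appears twice in the same delta, which of the two rows wins is arbitrary.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper object, for illustration only.
object LatestPerId {

  // Keep, for every ID, the row from the newest load that is not after `asOf`.
  // yyyyMMdd strings sort chronologically, so plain string comparison is enough.
  def latestAsOf(all: DataFrame, asOf: String): DataFrame = {
    val newestFirst = Window.partitionBy("ID").orderBy(col("load_date").desc)
    all.where(col("load_date") <= asOf)
      .withColumn("rn", row_number().over(newestFirst))
      .where(col("rn") === 1)
      .drop("rn")
  }

  def main(args: Array[String]): Unit = {
    // Local session only so the sketch runs on its own (assumption).
    val spark = SparkSession.builder.master("local[*]").appName("latest-per-id").getOrCreate()
    import spark.implicits._

    // Initial load and all deltas in one DataFrame, tagged with their folder date.
    // In practice the tag would have to be derived while reading the folders.
    val all = Seq(
      (1, "Tom",   "Street 1",  "20180101"),
      (2, "Peter", "Street 2",  "20180101"),
      (1, "Tom",   "Street 21", "20180102"),
      (2, "Peter", "Street 44", "20180103"),
      (3, "Hans",  "Street 12", "20180103"),
      (2, "Hans",  "Street 55", "20180105")
    ).toDF("ID", "Name", "Street", "load_date")

    latestAsOf(all, "20180103").orderBy("ID").show()
    spark.stop()
  }
}
```

For the cut-off 20180103 this keeps "Street 21" for ID 1, "Street 44" for ID 2, and "Street 12" for ID 3; the row loaded on 20180105 is ignored. All deltas are processed in a single pass instead of one join per folder.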

UPDATE 1:

First of all, consider using date utilities instead of manually iterating over numbers to build the date:

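import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Date;
Date dt = new Date();
// LocalDateTime.from(Instant) throws a DateTimeException, so convert with an explicit zone
LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault()).plusDays(1);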

See this for more details.
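For the folder names in the question, a small Scala sketch using `java.time` could replace the nested year/month/day loops. The object name `FolderDates` and the helper `folderNames` are made up for illustration; the only assumption about the data is that folders are named `yyyyMMdd`.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper object, for illustration only.
object FolderDates {
  // BASIC_ISO_DATE parses and prints dates as yyyyMMdd, e.g. 20180101.
  private val fmt = DateTimeFormatter.BASIC_ISO_DATE

  // All calendar days from `start` to `end` (both inclusive) as folder names.
  def folderNames(start: String, end: String): List[String] = {
    val first = LocalDate.parse(start, fmt)
    val last  = LocalDate.parse(end, fmt)
    Iterator.iterate(first)(_.plusDays(1))
      .takeWhile(day => !day.isAfter(last))
      .map(_.format(fmt))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // prints 20180101, 20180102, 20180103, 20180104, 20180105
    println(folderNames("20180101", "20180105").mkString(", "))
  }
}
```

Only real calendar days are generated, so strings such as 20180231 never occur. Folders that do not exist for a given day (20180104 in the question) still have to be skipped when reading.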

Second, please post the full stack trace, not just the name of the error (StackOverflowError).

Vladislav Varslavans
  • OK, but how can I get the DataFrame for a specific date using only joins? – Rene A Nov 22 '19 at 10:10
  • If the initial date is 20180101 and I want to load until 20180201, that is 31 folders. Do I have to load 31 times and do 31 joins? – Rene A Nov 22 '19 at 10:13
  • Or can I load the initial data and one delta, then drop all keys from the initial data that are in the delta, and then union the initial data and the delta? – Rene A Nov 22 '19 at 10:18
  • To know that date, you need a column that specifies the date. You can set it after reading the data, for example from the folder name. – Vladislav Varslavans Nov 22 '19 at 10:56
  • For loading multiple deltas I would say you have several approaches. One is to do 31 joins one after another. Another approach is to read all deltas, find the latest values across all of them, and then do one join. To find the latest values you will need window operations - https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html – Vladislav Varslavans Nov 22 '19 at 11:00
  • I've updated my Question (Update1). I haven't found a solution yet. – Rene A Nov 25 '19 at 16:10
  • I've added the full stacktrace. – Rene A Nov 26 '19 at 10:24
  • The error states that you don't have enough memory. You need to either increase the memory or decrease the amount of data. – Vladislav Varslavans Nov 26 '19 at 12:31
  • Okay. But is my script okay, except for the date handling? – Rene A Nov 26 '19 at 12:42
  • It doesn't look right. What is `key`? Why do you delete something from `delta_df`? I suggest you create small test data to see whether your code is correct, rather than asking somebody whether it is correct. Btw, I've posted an approach for handling multiple deltas; why are you trying to invent your own? – Vladislav Varslavans Nov 26 '19 at 13:07