I have one parquet file in HDFS as the initial load of my data. All subsequent parquet files contain only the records that changed each day relative to the initial load (in chronological order); these are my deltas. Deltas can also contain new records. I want to read all, or a subset, of these parquet files to get the latest state of the data as of a specific date.
Example:
Initial data (folder: /path/spezific_data/20180101):

| ID | Name    | Street     |
|----|---------|------------|
| 1  | "Tom"   | "Street 1" |
| 2  | "Peter" | "Street 2" |
Delta 1 (folder: /path/spezific_data/20180102):

| ID | Name  | Street      |
|----|-------|-------------|
| 1  | "Tom" | "Street 21" |
Delta 2 (folder: /path/spezific_data/20180103):

| ID | Name    | Street      |
|----|---------|-------------|
| 2  | "Peter" | "Street 44" |
| 3  | "Hans"  | "Street 12" |
Delta 3 (folder: /path/spezific_data/20180105):

| ID | Name   | Street      |
|----|--------|-------------|
| 2  | "Hans" | "Street 55" |
It is possible that the deltas for one day are only loaded a day later (compare Delta 2 and Delta 3), so the folder /path/spezific_data/20180104 does not exist, and we never want to load that date. Now I want to load different cases:
- Only the initial data: that is a simple load of one directory.

```scala
val initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
```
- Up to a specific date (20180103):

```scala
val initial_df = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
var delta_df = spark.read.parquet("hdfs:/path/spezific_data/20180102/")
```

Now I have to merge these datasets ("update" them; I know Spark RDDs and DataFrames cannot do an in-place update), then load the next delta and merge again. Currently I solve this with these lines of code (but inside a for loop):

```scala
var new_df = delta_df.union(initial_df).dropDuplicates("ID")
delta_df = spark.read.parquet("hdfs:/mypath/20180103/")
new_df = delta_df.union(new_df).dropDuplicates("ID")
```

But I do not think that is a good way to do this.
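In full, my current merge loop looks roughly like this (just a sketch of my approach; `dates` stands for a pre-computed list of the existing delta folder names in chronological order):

```scala
import org.apache.spark.sql.DataFrame

// Sketch of the loop I use today: fold each delta into the running state.
// `dates` is a hypothetical, pre-computed list of delta folders to apply.
val dates = Seq("20180102", "20180103")

var state: DataFrame = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
for (date <- dates) {
  val delta = spark.read.parquet(s"hdfs:/path/spezific_data/$date/")
  // The delta goes first in the union so its rows tend to survive
  // dropDuplicates, but dropDuplicates gives no ordering guarantee;
  // this is one reason I distrust the approach.
  state = delta.union(state).dropDuplicates("ID")
}
```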
- Load all data in the folder "/path/spezific_data": I do this like in the first case, with a for loop up to the latest date. To skip missing days I would rather list the folders that actually exist, as sketched below.
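A sketch of what I mean by listing the existing date folders (using the Hadoop FileSystem API; the base path is my example path from above):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// List the date folders that actually exist under the base path, sorted,
// so missing days like 20180104 are simply never attempted.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val dates = fs.listStatus(new Path("/path/spezific_data/"))
  .filter(_.isDirectory)
  .map(_.getPath.getName) // e.g. "20180102"
  .sorted
```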
Questions:
- Can I do it like this?
- Are there better ways?
- Can I load everything into one DataFrame and merge there? (See the sketch after this list for what I have in mind.)
- Currently the load takes very long (about one hour).
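For the third question, this is roughly what I imagine (only a sketch of the idea; the `load_date` column and the regex over input_file_name() are my assumptions about how to recover the folder date from the path):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, input_file_name, regexp_extract, row_number}

// Sketch: read all date folders at once, derive the folder date from the
// file path, then keep only the newest row per ID.
val all = spark.read.parquet("hdfs:/path/spezific_data/*/")
  .withColumn("load_date", regexp_extract(input_file_name(), "/(\\d{8})/", 1))

val w = Window.partitionBy("ID").orderBy(col("load_date").desc)
val latest = all
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn", "load_date")
```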
Update 1:
I tried the following. When I run this code, it goes through all dates until my end date (I can see this from my println(date)). After that, I get a java.lang.OutOfMemoryError: Java heap space (full stack trace below).
Where is the error?
```scala
import org.apache.spark.sql.functions.col
import util.control.Breaks._

var sourcePath = "hdfs:sourcepath/"
var destinationPath = "hdfs:destinationpath/result"
var initial_date = "20170427"
var start_year = 2017
var end_year = 2019
var end_month = 10
var end_day = 31

var m: String = _
var d: String = _
var date: String = _
var delta_df: org.apache.spark.sql.DataFrame = _
var doubleRows_df: org.apache.spark.sql.DataFrame = _

// final DF, starts as the initial load
var final_df = spark.read.parquet(sourcePath + initial_date + "*")

breakable {
  for (year <- 2017 to end_year; month <- 1 to 12; day <- 1 to 31) {
    // build the date string with zero-padded month and day
    m = month.toString()
    d = day.toString()
    if (month < 10)
      m = "0" + m
    if (day < 10)
      d = "0" + d
    date = year.toString() + m + d
    try {
      // read one delta folder
      delta_df = spark.read.parquet(sourcePath + date + "*")
      // find keys that occur more than once in the delta (I want to ignore them)
      doubleRows_df = delta_df.groupBy("key").count().where("count > 1").select("key")
      delta_df = delta_df.join(doubleRows_df, Seq("key"), "leftanti")
      // delete all (old) rows in final_df that are also in delta_df
      final_df = final_df.join(delta_df, Seq("key"), "leftanti")
      // add all rows of the delta
      final_df = final_df.union(delta_df)
      println(date)
    } catch {
      // folders for non-existing dates throw an AnalysisException -> skip them
      case e: org.apache.spark.sql.AnalysisException => {}
    }
    if (day == end_day && month == end_month && year == end_year)
      break
  }
}
final_df.write.mode("overwrite").parquet(destinationPath)
```
The full stack trace:

```
19/11/26 11:19:04 WARN util.Utils: Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:223)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
```