
I get multiple incoming files and I have to compare each incoming file with the source file, then merge them: replace the old rows with the new rows and append any extra rows. Afterwards I have to use the updated source file, compare it with the next incoming file, update it, and so the process goes on.

So far I have created a DataFrame for each file and compared and merged them using a join. I want to save all the updates done to the source file and use the updated source file again to compare and update the incoming files.

    val merge = df.union(dfSource.join(df, Seq("EmployeeID"), joinType = "left_anti").orderBy("EmployeeID"))

    merge.write.mode("append").format("text").insertInto("dfSource")
    merge.show()

I tried it this way but it doesn't update my dfSource DataFrame. Could somebody help, please?

Thanks

Imi007

1 Answer


This is not possible the way you are doing it. You need to use tables and then save to a file as the final part of the process.

I suggest you align your approach as follows. This allows parallel loading, though I suspect that is of little real benefit.

  1. Load all files in order of delivery, tagging each record with a timestamp or an ordering sequence derived from the file's sequence number, along with the type of record. E.g. file X at, say, position 2 in the sequence gets its records loaded with seqnum = 2. You can use the DataFrame approach on the file being processed and append to an Impala / Hive KUDU table if doing everything within the Spark domain.

  2. For records in the same file, apply monotonically_increasing_id() to get an ordering within the file, if the same key can exist in the same file. See "DataFrame-ified zipWithIndex", or use zipWithIndex by converting to an RDD and back to a DataFrame. A minimal sketch of the monotonically_increasing_id() option follows this list.

  3. Then issue a select that takes, per key, the record with the maximum timestamp / seq_num. E.g. if the current run has, say, 3 records for key=1, only one needs to be processed, presumably the one with the highest value. A sketch of this select follows the example output further down.

  4. Save as a new file.

  5. Process this new file accordingly.

  6. OR:

    Bypass step 3, read in ascending order, and process the data accordingly.
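
As mentioned in step 2, monotonically_increasing_id() can provide an ordering within a file; here is a minimal sketch (the column names and sample values are illustrative, not taken from the question):

import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

// Illustrative input: records already tagged with key, file and seq_num as per step 1.
val tagged = Seq(
  ("A", "F1", 2, "1"),
  ("A", "F1", 2, "2"),
  ("B", "F1", 2, "10")
).toDF("key", "file", "seq_num", "val")

// The generated ids are increasing (though not consecutive), so for rows with the
// same key within one file the larger id marks the later record and can act as a
// tie-breaker when deciding which record to keep.
val withFileOrder = tagged.withColumn("file_order", monotonically_increasing_id())
withFileOrder.show()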

A comment: typically I load such data with LOAD into HIVE / IMPALA, with the partitioning key set by extracting the timestamp from the file name. That requires some Linux scripting / processing, but it's a question of style and should not be a real Big Data bottleneck.
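
If you prefer to stay entirely within Spark rather than scripting the LOAD, one alternative is to derive the partition key from the file name with input_file_name(). This is only a sketch: the paths and the assumption that the file name embeds an 8-digit timestamp (e.g. data_20230101.csv) are illustrative, not from the question:

import org.apache.spark.sql.functions.{col, input_file_name, regexp_extract}

// Illustrative landing path; adjust to your environment.
val raw = spark.read.option("header", "true").csv("/landing/zone/*.csv")

val withLoadKey = raw
  .withColumn("src_file", input_file_name())
  // Pull the 8-digit timestamp out of the file name to use as the partition key.
  .withColumn("load_ts", regexp_extract(col("src_file"), "(\\d{8})", 1))

// Append to a partitioned location (or a Hive-backed table) keyed on the derived timestamp.
withLoadKey.write.mode("append").partitionBy("load_ts").parquet("/warehouse/staging")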

Here is a snippet with simulated input showing how some aspects can be done to allow a MAX select against a key for upserts. The operation column (DEL, ALT, whatever you need) you can add yourself; I think you can manage that from what I have seen:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._ 

// Schema for the zipped output; note the extra seq_val column appended below.
def dfSchema(columnNames: List[String]): StructType =
  StructType(
    Seq(
      StructField(name = "key", dataType = StringType, nullable = false),
      StructField(name = "file", dataType = StringType, nullable = false),
      StructField(name = "ts", dataType = StringType, nullable = false),
      StructField(name = "val", dataType = StringType, nullable = false),
      StructField(name = "seq_val", dataType = LongType, nullable = false)      
    )
  )

val newSchema = dfSchema(List("key", "file", "ts", "val", "seq_val"))

val df1 = Seq(
   ("A","F1", "ts1","1"),
   ("B","F1", "ts1","10"),
   ("A","F1", "ts2","2"),
   ("C","F2", "ts3","8"),
   ("A","F2", "ts3","3"),
   ("A","F0", "ts0","0")  
 ).toDF("key", "file", "ts","val")

// Sort by key and timestamp, then use zipWithIndex on the underlying RDD to
// append a global seq_val to each row, matching the schema defined above.
val rddWithId = df1.sort($"key", $"ts".asc).rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

dfZippedWithId.show

returns:

+---+----+---+---+-------+
|key|file| ts|val|seq_val|
+---+----+---+---+-------+
|  A|  F0|ts0|  0|      0|
|  A|  F1|ts1|  1|      1|
|  A|  F1|ts2|  2|      2|
|  A|  F2|ts3|  3|      3|
|  B|  F1|ts1| 10|      4|
|  C|  F2|ts3|  8|      5|
+---+----+---+---+-------+

ready for subsequent processing.
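
From there, the select in step 3 could, for example, keep only the row with the highest seq_val per key via a window function; a minimal sketch against dfZippedWithId above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows per key by descending seq_val and keep only the latest one.
val w = Window.partitionBy("key").orderBy(col("seq_val").desc)

val latestPerKey = dfZippedWithId
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")

latestPerKey.show()
// Leaves one row per key: A with seq_val 3, B with 4, C with 5 -
// the records you would save as the new file or upsert into the target table.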

thebluephantom