Not possible this way. You need to use tables and then save to a file as the final part of the process.
I suggest you align your approach as follows - it allows parallel loading, although I suspect that is not really of benefit.
Load all files in order of delivery, tagging each record as it is loaded with a timestamp or some ordering sequence derived from your file sequence number, along with the type of record. E.g. file X at, say, position 2 in the sequence gets its records loaded with seq_num = 2. You can use the DF approach on the file being processed, appending to an Impala / Hive KUDU table, if performing everything within the Spark domain.
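For illustration, a minimal sketch of that loading step, assuming a CSV input with a header and a hypothetical staging table staging.deliveries (a KUDU target would use the Kudu connector options instead of saveAsTable):

import org.apache.spark.sql.functions.lit

val seqNum = 2L  // position of this file in the delivery sequence
val dfFile = spark.read
  .option("header", "true")
  .csv("/landing/fileX.csv")            // hypothetical path
  .withColumn("seq_num", lit(seqNum))   // tag every record with the file's sequence number

dfFile.write.mode("append").saveAsTable("staging.deliveries")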
For records in the same file, apply monotonically_increasing_id() to get an ordering within the file, in case the same key can exist in the same file. See "DataFrame-ified zipWithIndex", or use zipWithIndex on the RDD via conversion and back to a DF.
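The DataFrame-only variant is then a one-liner - a sketch, assuming dfFile is the DataFrame for the file currently being processed; note the generated ids are increasing but not consecutive:

import org.apache.spark.sql.functions.monotonically_increasing_id

val dfFileWithSeq = dfFile.withColumn("seq_val", monotonically_increasing_id())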
Then issue a select statement to take, per key, the record with the maximum timestamp / seq_num. E.g. if the current run has, say, 3 records for key = 1, only one needs to be processed - presumably the one with the highest value.
Save as a new file.
Process this new file accordingly.
OR:
Bypass the MAX-select step above and simply read in ascending order, processing the data accordingly.
Comment to make:
Typically I load such data with LOAD into HIVE / IMPALA, with the partitioning key being set by extracting the timestamp from the file name. This requires some Linux scripting / processing. That's a question of style and should not be a real Big Data bottleneck.
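If you would rather stay inside Spark than script it, a rough sketch of deriving such a partition key from the file name instead - the file name pattern and paths here are assumptions:

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

// assumes file names like /landing/data_20230101T1200.csv
val dfWithFileTs = spark.read
  .option("header", "true")
  .csv("/landing/*.csv")
  .withColumn("file_ts", regexp_extract(input_file_name(), "data_(\\d{8}T\\d{4})", 1))

dfWithFileTs.write.mode("append").partitionBy("file_ts").parquet("/warehouse/deliveries_by_ts")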
Here is a snippet with simulated input showing how some aspects can be done so as to allow a MAX select against a key for UPSerts. The operation column - DEL, ALT, whatever you need - you would add as well. Although I think you can do this yourself, actually, from what I have seen:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Schema of the zipped output: the original columns plus the generated seq_val.
def dfSchema(columnNames: List[String]): StructType =
  StructType(
    Seq(
      StructField(name = "key",     dataType = StringType, nullable = false),
      StructField(name = "file",    dataType = StringType, nullable = false),
      StructField(name = "ts",      dataType = StringType, nullable = false),
      StructField(name = "val",     dataType = StringType, nullable = false),
      StructField(name = "seq_val", dataType = LongType,   nullable = false)
    )
  )

val newSchema = dfSchema(List("key", "file", "ts", "val", "seq_val"))

// Simulated input: key, originating file, timestamp, value.
val df1 = Seq(
  ("A", "F1", "ts1", "1"),
  ("B", "F1", "ts1", "10"),
  ("A", "F1", "ts2", "2"),
  ("C", "F2", "ts3", "8"),
  ("A", "F2", "ts3", "3"),
  ("A", "F0", "ts0", "0")
).toDF("key", "file", "ts", "val")

// Sort by key and ascending timestamp, then zip with an index via the RDD
// so that seq_val reflects the ordering within and across files.
val rddWithId = df1.sort($"key", $"ts".asc).rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) },
  newSchema)

dfZippedWithId.show
returns:
+---+----+---+---+-------+
|key|file| ts|val|seq_val|
+---+----+---+---+-------+
| A| F0|ts0| 0| 0|
| A| F1|ts1| 1| 1|
| A| F1|ts2| 2| 2|
| A| F2|ts3| 3| 3|
| B| F1|ts1| 10| 4|
| C| F2|ts3| 8| 5|
+---+----+---+---+-------+
ready for subsequent processing.
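And to sketch the MAX select mentioned above against that output - keep only the latest record per key (highest seq_val) and save it as a new file for the downstream processing; the output path is hypothetical:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// rank the records per key, newest (highest seq_val) first
val w = Window.partitionBy("key").orderBy(col("seq_val").desc)

val latestPerKey = dfZippedWithId
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")

latestPerKey.write.mode("overwrite").parquet("/staging/latest_per_key")
latestPerKey.show

For the simulated input this keeps (A, F2, ts3, 3), (B, F1, ts1, 10) and (C, F2, ts3, 8).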