
I'm having a pretty troubling problem with the LAST aggregate in Spark SQL in Spark 2.3.1. It gives me around 4 bad results -- that is, values that are not LAST by the specified partitioning and ordering -- per 500,000 (logical SQL, not Spark) partitions, something like 50MM records. Smaller batches are proportionally worse -- the number of errors per batch seems pretty consistent -- although I don't think I tried anything smaller than 100,000 logical SQL partitions.

I have roughly 66 FIRST or LAST aggregates, a compound (date, integer) logical SQL partition key and a compound (string, string) sort key. I tried converting the four-character numeric sort-key strings into integers, then combining the pair into a single integer. Neither change resolved the problem: even with a single integer sort key, I was getting a few bad values.

Typically there are fewer than a hundred records in each partition, and only a handful of non-NULL values for any field. The wrong result is never the second-to-last value; it's always at least third-to-last.

I did try replacing the simple aggregate with a windowed aggregate using ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. The one run I did of that gave me six bad records -- the compound integer key had given me only two -- but I didn't do enough runs to really compare the approaches, and of course I need zero.

Why can't I seem to rely on LAST()? Here's a test that illustrates the unwindowed version of the LAST function, although in my real job the partitioning and sorting keys are each compound (two fields each).

import org.apache.spark.sql.functions.{expr}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import collection.JavaConverters._

class LastTest extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {

  implicit val spark: SparkSession = SparkSession.builder().appName("Last Test").master("local[2]").getOrCreate()
  import spark.implicits._

  // TRN_DATE, TRN_NUMBER, TRN_TIMESTAMP, DETAILS, DATE_TIME, QUE_LINE_ID, OPR_INITIALS, ENTRY_TYPE, HIST_NO, SUB_HIST_NO, MSG_INFO
  "LAST" must "work with GROUP BY" in {
    val lastSchema = StructType(Seq(
      StructField("Pfield", IntegerType) // partition field
      , StructField("Ofield", IntegerType) // order field
      , StructField("Vfield", StringType) // value field
    ))
    val last: DataFrame = spark.createDataFrame(List[Row](
      Row(0, 1, "Pencil")
      , Row(5, 1, "Aardvark")
      , Row(10, 1, "Monastery")
      , Row(10, 2, "Remediation")
      , Row(15, 1, "Parcifal")
      , Row(20, 1, "Montenegro")
      , Row(20, 2, "Susquehana")
      , Row(20, 3, "Perfidy")
      , Row(20, 4, "Prosody")
    ).asJava
      , lastSchema
    ).repartition(expr("MOD(Pfield, 4)"))
    last.createOrReplaceTempView("last_group_test")
    // apply the unwindowed last
    val unwindowed: DataFrame = spark.sql("SELECT Pfield, LAST(Vfield) AS Vlast FROM (SELECT * FROM last_group_test ORDER BY Pfield, Ofield) GROUP BY Pfield ORDER BY Pfield")
    unwindowed.show(5)
    // apply a windowed last
    val windowed: DataFrame = spark.sql("SELECT DISTINCT Pfield, LAST(Vfield) OVER (PARTITION BY Pfield ORDER BY Ofield ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Vlast FROM last_group_test ORDER BY Pfield")
    windowed.show(5)
    // include the partitioning function in the window
    val excessivelyWindowed: DataFrame = spark.sql("SELECT DISTINCT Pfield, LAST(Vfield) OVER (PARTITION BY MOD(Pfield, 4), Pfield ORDER BY Ofield ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Vlast FROM last_group_test ORDER BY Pfield")
    excessivelyWindowed.show(5)
    assert(unwindowed.collect() === windowed.collect() && windowed.collect() === excessivelyWindowed.collect())
    assert(windowed.count() == 5)
    assert(windowed.filter("Pfield=20").select($"Vlast").collect()(0)(0)==="Prosody")
  }
}

So, all three result sets are the same, which is nice. But if I apply this logic to my actual needs -- which involve sixty-odd columns, almost all of them LAST values -- I get wrong values, roughly 4 per batch of 500,000 groups. If I run the dataset 30 times, I get 30 different sets of bad records.

Am I doing something wrong, or is this a defect? Is it a known defect? Is it fixed in 2.4? I didn't see it, but "aggregates simply don't work sometimes" can't be something they released with, right?

  • `first` / `last` are meaningful only when used with window frames having an ORDER BY clause -- in any other context the results are arbitrary -- explanation [here](https://stackoverflow.com/a/33878701/10465355). – 10465355 Nov 18 '19 at 22:00
  • Thanks, @10465355saysReinstateMonica. I didn't really get an explanation there, just a link to https://issues.apache.org/jira/browse/SPARK-16207. In that JIRA, one comment says "Actions take(), first(), and collect() return results in the order consistent with the DataFrame or group order if any." I can't imagine using FIRST instead of LAST is going to save me, though. Can you help me understand what I missed? – Ion Freeman Nov 18 '19 at 22:21
  • Like, if I had a data set like this, how would I get the sum and last (by A) non-null values of B and C? A|B|C rows: (6, 3, NULL), (3, 4, 2), (4, 2, 1). Do I have to do a windowed last with ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING to get the same value on every row, then do the SUM and the unwindowed last? I guess I could do a windowed SUM at the same time, then just do a DISTINCT, but that kind of argues against GROUP BY altogether. – Ion Freeman Nov 18 '19 at 23:03
  • @10465355saysReinstateMonica I did try window frames. They did not help. – Ion Freeman Nov 22 '19 at 00:29

1 Answer

I was able to resolve the issue by applying a windowed aggregate to a dataset that had additionally been sorted in a subquery.

SELECT LAST(VAL) FROM (SELECT * FROM TBL ORDER BY SRT) SRC GROUP BY PRT

was not sufficient, nor was

SELECT LAST(VAL) OVER (PARTITION BY PRT ORDER BY SRT ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM TBL

I had to do both

SELECT DISTINCT LAST(VAL) OVER (PARTITION BY PRT ORDER BY SRT ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM (SELECT * FROM TBL ORDER BY SRT) SRC

These datasets had been extracted from an Oracle 12.2 instance over JDBC. I also added SRT to the ORDER BY clause of the extraction query there, which had previously ordered by PRT alone.
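For reference, the extraction looked roughly like this. This is a minimal sketch, not my exact code: the connection URL, credentials, table name, and the `PRT`/`SRT` columns are all placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch of the JDBC extraction, with the sort key pushed into
// the Oracle-side query (ORDER BY PRT, SRT instead of the original ORDER BY PRT).
// URL, credentials, and identifiers below are placeholders.
def extract(spark: SparkSession): DataFrame =
  spark.read
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")
    .option("dbtable", "(SELECT * FROM TBL ORDER BY PRT, SRT) SRC")
    .option("user", "me")
    .option("password", "secret")
    .load()
```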

Further -- and I think this may have been the most significant change -- I used the `cacheTable` API on the Spark catalog object after extracting the data. I had been doing

.repartition
.cache
.count

in order to load all the records over a relatively small number of database connections, but I suspect it was not enough to get all the data onto the Spark side before the aggregations took place.
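Concretely, the replacement pattern was along these lines. Again a sketch under assumptions: `src` stands for the extracted DataFrame, and the view name and repartitioning expression are made up for illustration.

```scala
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch: cache through the catalog rather than calling .cache
// on the DataFrame, then force materialization with count() before any
// FIRST/LAST aggregation runs.
def materialize(spark: SparkSession, src: DataFrame): Unit = {
  src.repartition(expr("MOD(PRT, 4)"))
    .createOrReplaceTempView("tbl_cached")
  spark.catalog.cacheTable("tbl_cached") // catalog-level cache
  spark.table("tbl_cached").count()      // pull all rows Spark-side
}
```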

  • I'll leave the bounty up in case a canonical answer arrives; this currently looks like a functional answer. – Ion Freeman Nov 22 '19 at 01:48
  • I'd say that if you see non-determinism (with the exception of ties) for the second option, using a currently supported version, you should report a bug. The additional sort in the subquery should be irrelevant in the execution plan (there is another shuffle after it), and its observed impact is probably accidental. – 10465355 Nov 23 '19 at 14:35
  • Thanks, @10465355saysReinstateMonica. It's a very defecty defect. Last() should work all the time, right? What's happening to me? What I want is for someone to tell me it's a known bug which is fixed in 2.4. – Ion Freeman Nov 26 '19 at 21:34
  • As far as I can tell, the first shouldn't work; the second should, and should give deterministic results as long as `SRT` is unique; the third should make no (logical -- there is still an additional shuffle) difference compared to the second. – 10465355 Nov 27 '19 at 10:54
  • Why shouldn't the first one work? LAST has an unwindowed version. I sorted the data when I saved the table, so I shouldn't have to sort it again -- which I only do because I was having this problem -- but it doesn't hurt. – Ion Freeman Nov 28 '19 at 11:36
  • You seem to confuse many things here: order of aggregations (which is not guaranteed by the SQL standard), analytical functions versus aggregations (they have completely different behavior and interaction with the surrounding scope), and order in the source (for starters, the order in which you save data in Oracle is irrelevant even for Oracle itself; also, any impact of source order on Spark results is just incidental, a detail of implementation). In general, for any practical usage, `LAST` and `FIRST` in a non-analytical context should be considered non-deterministic. – 10465355 Nov 28 '19 at 13:23
  • "LAST and FIRST in non-analytical context should be considered non-deterministic." Well, it does work in more than 99.98% of the cases. And it works as well as LAST as a windowed function. I would suggest you reconsider. – Ion Freeman Nov 30 '19 at 21:05
  • What's there to reconsider? The language standard is pretty clear about such things, if one bothers to read it. And "it works" _is_ a detail of implementation (specifically, scheduling strategy). – 10465355 Dec 04 '19 at 20:42