
I have a table with columns datetime and quantity that are being used to track time spent to finish a workflow.

datetime          quantity
2023-01-01 14:05  5
2023-01-01 14:12  12
2023-01-01 14:13  13
2023-01-01 14:20  17

I want to duplicate the datetime rows wherever the quantity values are not consecutive, so that every distinct quantity number has a datetime value assigned to it. The goal table is below:

datetime          quantity
2023-01-01 14:05  1
2023-01-01 14:05  2
2023-01-01 14:05  3
2023-01-01 14:05  4
2023-01-01 14:05  5
2023-01-01 14:12  6
2023-01-01 14:12  7
2023-01-01 14:12  8
2023-01-01 14:12  9
2023-01-01 14:12  10
2023-01-01 14:12  11
2023-01-01 14:12  12
2023-01-01 14:13  13
2023-01-01 14:20  14
2023-01-01 14:20  15
2023-01-01 14:20  16
2023-01-01 14:20  17

I haven't been able to find a way to duplicate datetime values for growing quantity figures.

So far I'm stuck, and my basic attempts at filling in the values have been unsuccessful.

Abecee

2 Answers


ANSI SQL in general, and T-SQL in particular, provides for recursive CTEs, which can generate the missing numbers:

CREATE TABLE Table1
    (date_time datetime, quantity int)
;
    
INSERT INTO Table1
    (date_time, quantity)
VALUES
    ('2023-01-01 14:05:00', 5),
    ('2023-01-01 14:12:00', 12),
    ('2023-01-01 14:13:00', 13),
    ('2023-01-01 14:20:00', 17)
;

;with
List0(quantity) as (
  select
    1
  union all
  select
    quantity + 1
  from List0
  where quantity < 10000
),
List1a as (
  select top 1000000
    t1.date_time,
    l0.quantity
  from List0 l0
  left join Table1 t1
    on t1.quantity = l0.quantity
  order by l0.quantity
),
List1 as (
  select top 1000000
    (select date_time
     from Table1
     where quantity = (select min(quantity)
                       from Table1
                       where quantity >= l0.quantity)
    ) as date_time,
    l0.quantity
  from List0 l0
  left join Table1 t1
    on t1.quantity = l0.quantity
  order by l0.quantity
)
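select
--  * from List0    -- the bare number sequence
--  * from List1a   -- illustration only: gaps come back with date_time = null
  * from List1 where quantity <= (select max(quantity) from Table1)
order by quantity
OPTION (MAXRECURSION 10000)
;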

This comes with some flexibility and some limitations.

I am not sure whether it works this way in Apache Spark, or whether it fits your actual needs.

However, instead of the recursive CTE (List0) you could either use a system table with a sufficient number of consecutive IDs or generate a number table.

See it in action: SQL Fiddle.

How does it work?

NB: For illustration purposes, List1a has been added.

  • First, some kind of number table is needed to get the full sequence of required values. That is the purpose of List0 here. (Technically, there is no need to name its column after the one in the actual data. It also has far more rows than the sample data requires, just to stress its generic nature.)
  • List1a demonstrates how the LEFT JOIN fills the gaps in the original data. However, all rows with no data for date_time come back with just null.
  • The sub-SELECT in the SELECT of List1 pulls the 'next available' date_time from Table1 in order to fill the gaps.
  • top 1000000 is needed for SQL Server to 'allow' the order by inside a CTE. (Routinely, I pick a million, as it usually exceeds the number of expected rows by far.)
  • OPTION (MAXRECURSION 10000) raises the recursion limit for List0 from its default value of 100.
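
For readers who do not work in T-SQL, the steps above can be sketched in plain Scala. This is only an illustration of the logic, not a translation of the query and not Spark code; the object name GapFillSketch and the function name fillGaps are made up for this sketch, and the sample rows are the ones from the question.

```scala
object GapFillSketch {
  // Sample rows from the question: (date_time, quantity).
  val rows: Seq[(String, Int)] = Seq(
    ("2023-01-01 14:05", 5),
    ("2023-01-01 14:12", 12),
    ("2023-01-01 14:13", 13),
    ("2023-01-01 14:20", 17)
  )

  // Give every quantity from 1 to the maximum the date_time of the
  // smallest recorded quantity that is greater than or equal to it.
  def fillGaps(rows: Seq[(String, Int)]): Seq[(String, Int)] =
    if (rows.isEmpty) Seq.empty
    else {
      val byQuantity = rows.sortBy(_._2)
      // "1 to max" plays the role of the number table (List0).
      (1 to byQuantity.last._2).map { n =>
        // Like the min(quantity) sub-select in List1: the 'next available' row.
        // find always succeeds here because n never exceeds the maximum quantity.
        val (dateTime, _) = byQuantity.find(_._2 >= n).get
        (dateTime, n)
      }
    }

  def main(args: Array[String]): Unit =
    fillGaps(rows).foreach { case (dateTime, quantity) => println(s"$dateTime $quantity") }
}
```

The linear find per number keeps the sketch short; for larger inputs a single pass over the sorted rows would avoid the repeated scans.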

Please comment if this requires adjustment or further detail.


Edit: @thebluephantom kindly pointed out that Spark does not support recursive queries.

are just two discussions for T-SQL / SQL for obtaining sequences.

SparkSQL on pyspark: how to generate time series? focuses on Spark.

My objective was to give a working example of how the issue at hand can be resolved. Unfortunately, I can't address the Spark specifics.

Abecee

This is a Spark / Scala example. It uses my own data, so you will need to tailor it to yours.

This question shows some issues with Spark when executed at scale. The scale is not stated in the question.

Databricks states that range join optimization could help (https://docs.databricks.com/optimizations/range-join.html), but I was not convinced.

The code below is written for small scale.

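import org.apache.spark.sql.functions.{col, max}  // max is used in step 2
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
import spark.implicits._  // for toDF and as[Long]; already in scope in spark-shell and notebooks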

// 1. Generate data in a quick, but not the best, manner. You can tailor it otherwise. Your column is renamed later.
val df = Seq( ( "A",  5), ( "B", 12), ( "C", 13 ), ( "D", 17) ).toDF("key" ,"qty")

// Some manipulation of data first.
val df2 = df.withColumn("accum_qty", col("qty").cast("long")).drop("qty") // If not already Long.
val rddWithZip = df2.rdd.zipWithIndex
val newSchema = StructType(df2.schema.fields ++ Array(StructField("rowid", LongType, false)))
val df3 =  spark.createDataFrame(rddWithZip.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)

// 2. Get last Row in Dataframe and the accum_qty from that. 2 approaches shown, latter simpler.
// val max_accum_qty = df2.reduce { (a, b) => if (a.getAs[Long]("accum_qty") > b.getAs[Long]("accum_qty")) a else b 
//                               } match {case Row(key:String,accum_qty:Long) => (accum_qty)}
val max_accum_qty = df2.select(max("accum_qty")).as[Long].head()

// 3. Make a range DF. A helper table.
val dfRn = spark.range(max_accum_qty).withColumn("lkp", col("id") + 1).drop("id")

// 4. Now we have the base data and need to restrict the join later, using some extra data. If you get a serialization error, add transient. I am using a notebook, so it may not always show up. There is no natural column to partition by!
val w = org.apache.spark.sql.expressions.Window.orderBy("rowid")  
import org.apache.spark.sql.functions.lag
val df4 = df3.withColumn("accum_qty_prev_temp", lag("accum_qty", 1, 0).over(w)).withColumn("accum_qty_prev", col("accum_qty_prev_temp") + 1).drop("accum_qty_prev_temp", "rowid")

// 5. Actual JOIN. Bad performance and may only work with small volumes, e.g. OOM errors.
val res = df4.join(dfRn, (  df4("accum_qty_prev") <= dfRn("lkp") && df4("accum_qty") >= dfRn("lkp" )),"inner")   //.explain(false)
res.show(false)

// 6. Add your own select clause...
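
To make the intent of steps 4 and 5 easier to follow, here is a minimal sketch of the same logic on plain Scala collections, without Spark. It assumes the rows are already in ascending accum_qty order (the Spark code relies on rowid order instead); the object name RangeExpandSketch and the function name expand are hypothetical and exist only for this illustration.

```scala
object RangeExpandSketch {
  // Same toy data as in the Spark example: (key, accum_qty), in ascending order.
  val rows: Seq[(String, Long)] = Seq(("A", 5L), ("B", 12L), ("C", 13L), ("D", 17L))

  // Output row shape: (key, accum_qty, accum_qty_prev, lkp).
  def expand(rows: Seq[(String, Long)]): Seq[(String, Long, Long, Long)] = {
    // Equivalent of lag(accum_qty, 1, 0) + 1: the previous quantity
    // (0 for the first row) plus one.
    val lowerBounds = (0L +: rows.map(_._2)).map(_ + 1)
    rows.zip(lowerBounds).flatMap { case ((key, accumQty), prev) =>
      // Equivalent of the range join: every lkp with prev <= lkp <= accumQty.
      (prev to accumQty).map(lkp => (key, accumQty, prev, lkp))
    }
  }

  def main(args: Array[String]): Unit = expand(rows).foreach(println)
}
```

Each input row is expanded into one output row per missing quantity, which is what the inequality join against the helper range produces in the Spark version.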
thebluephantom