To use `Lag` with a `Window` in .NET for Apache Spark (you are very close), you would need:
// Schema of the sample data: a timestamp split into parts, plus ID, Type and Value.
var schema = new StructType(new List<StructField>
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),
});

// Minutes 10, 11 and 12 have null ID, Type and Value.
var rows = new List<GenericRow>
{
    new GenericRow(new object[] { 2021, 3, 4, 8, 9, 87, "Type1", 380.5 }),
    new GenericRow(new object[] { 2021, 3, 4, 8, 10, null, null, null }),
    new GenericRow(new object[] { 2021, 3, 4, 8, 11, null, null, null }),
    new GenericRow(new object[] { 2021, 3, 4, 8, 12, null, null, null }),
    new GenericRow(new object[] { 2021, 3, 4, 8, 13, 87, "Type1", 0.0 }),
    new GenericRow(new object[] { 2021, 3, 4, 8, 14, 87, "Type1", 0.0 }),
};

var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(rows, schema);

// Chronological ordering. There is no PartitionBy, so every row belongs to one window.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Lag(..., 1) reads the Value of the immediately preceding row only. A run of several
// nulls therefore gets just its first entry filled; the rest stay null.
Column previousValue = Functions.Lag(df["Value"], 1).Over(window);
Column newValue = Functions.When(df["Value"].IsNull(), previousValue)
    .Otherwise(df["Value"]);

var filledDataFrame = df.WithColumn("newValue", newValue);
filledDataFrame.Show(1000, 10000);
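This produces the output below. Note that `Lag` fills only the first null after a non-null value, because it always looks back exactly one row: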
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| null|
|2021| 3| 4| 8| 12|null| null| null| null|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
But you probably want `Last` instead of `Lag`, because `Last` can skip nulls and so carries the most recent non-null value forward:
var spark = SparkSession.Builder().GetOrCreate();

// Sample data: the rows at minutes 10, 11 and 12 have null ID, Type and Value.
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),
}));

// Order chronologically and state the frame explicitly: from the first row up to and
// including the current row. Without RowsBetween, Spark's documented default frame for an
// ordered window is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. That frame also
// includes later rows that share the current row's timestamp, so a null could be filled
// from one of them.
// NOTE: there is no PartitionBy, so every row belongs to one window.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute")
    .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);

// Last(..., ignoreNulls = true) returns the most recent non-null Value within the frame.
// A whole run of consecutive nulls is therefore filled from the last known value.
// Non-null rows keep their own Value through Otherwise.
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Last(df["Value"], /* ignoreNulls: */ true).Over(window))
    .Otherwise(df["Value"]));
filledDataFrame.Show(1000, 10000);
This results in the output below, where every null is filled from the most recent non-null value:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| 380.5|
|2021| 3| 4| 8| 12|null| null| null| 380.5|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
Hope it helps!

ed

The `using` directives needed to make this work:
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;