
Trying to figure this out programmatically ... it seems like a difficult problem: basically, if a sensor item is not captured in the source data for a given time-series timestamp interval, I want to append a row for each missing sensor item with a NULL value in that timestamp window.

# list of sensor items (I have 300+; only 4 shown as an example)
sensor_list = ["temp", "pressure", "vacuum", "burner"]

# sample data
df = spark.createDataFrame([
    ('2019-05-10 7:30:05', 'temp', '99'),
    ('2019-05-10 7:30:05', 'burner', 'TRUE'),
    ('2019-05-10 7:30:10', 'vacuum', '.15'),
    ('2019-05-10 7:30:10', 'burner', 'FALSE'),
    ('2019-05-10 7:30:10', 'temp', '75'),
    ('2019-05-10 7:30:15', 'temp', '77'),
    ('2019-05-10 7:30:20', 'pressure', '.22'),
    ('2019-05-10 7:30:20', 'temp', '101'),
], ["date", "item", "value"])
# current dilemma => not all sensor items are captured at every timestamp; the current back-end streaming design only captures updates to sensors
+------------------+--------+-----+
|              date|    item|value|
+------------------+--------+-----+
|2019-05-10 7:30:05|    temp|   99|
|2019-05-10 7:30:05|  burner| TRUE|

|2019-05-10 7:30:10|  vacuum|  .15|
|2019-05-10 7:30:10|  burner|FALSE|
|2019-05-10 7:30:10|    temp|   75|

|2019-05-10 7:30:15|    temp|   77|

|2019-05-10 7:30:20|pressure|  .22|
|2019-05-10 7:30:20|    temp|  101|
+------------------+--------+-----+

I want to capture every sensor item per timestamp so that forward-fill imputation can be performed prior to pivoting the DataFrame. Forward filling across 300+ pivoted columns is causing Scala errors; see:

Spark Caused by: java.lang.StackOverflowError Window Function?

A rough sketch of the forward fill I have in mind follows the desired output below.

# desired output
+------------------+--------+-----+
|              date|    item|value|
+------------------+--------+-----+
|2019-05-10 7:30:05|    temp|   99|
|2019-05-10 7:30:05|  burner| TRUE|
|2019-05-10 7:30:05|  vacuum| NULL|
|2019-05-10 7:30:05|pressure| NULL|

|2019-05-10 7:30:10|  vacuum|  .15|
|2019-05-10 7:30:10|  burner|FALSE|
|2019-05-10 7:30:10|    temp|   75|
|2019-05-10 7:30:10|pressure| NULL|

|2019-05-10 7:30:15|    temp|   77|
|2019-05-10 7:30:15|pressure| NULL|
|2019-05-10 7:30:15|  burner| NULL|
|2019-05-10 7:30:15|  vacuum| NULL|

|2019-05-10 7:30:20|pressure|  .22|
|2019-05-10 7:30:20|    temp|  101|
|2019-05-10 7:30:20|  vacuum| NULL|
|2019-05-10 7:30:20|  burner| NULL|
+------------------+--------+-----+
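As mentioned above, here is a rough sketch of the forward fill I intend to do once the missing rows exist (df_complete below is just a placeholder name for a DataFrame matching this desired output); filling in long format keeps it to a single window per sensor item instead of one window per pivoted column:

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# one window per sensor item, ordered by timestamp, growing from the start
w = (Window.partitionBy("item")
           .orderBy("date")
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# carry the most recent non-null reading forward within each item
df_filled = df_complete.withColumn("value", last(col("value"), ignorenulls=True).over(w))

# pivot only after filling, so no window functions run across the 300+ wide columns
df_wide = df_filled.groupBy("date").pivot("item").agg(last("value"))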
thePurplePython
  • Here's an idea (not sure how efficient): create a temp dataframe with the crossJoin of the distinct dates and a dataframe of all of your items: `df.select('date').distinct().crossJoin(broadcast(spark.createDataFrame([('temp',), ('burner',), ('vacuum',), ('pressure',)], ["item"])))`. Now do a right join of your original dataframe to this temp one. – pault Jun 06 '19 at 20:08

1 Answer


Expanding on my comment:

You can right join your DataFrame with the Cartesian product of the distinct dates and the sensor_list. Since the sensor_list is small, you can broadcast it.

from pyspark.sql.functions import broadcast

sensor_list = ["temp", "pressure", "vacuum", "burner"]

# build every (date, item) combination by crossing the distinct dates with the
# small (broadcast) sensor DataFrame, then right join the original data onto it
df.join(
    df.select('date')
        .distinct()
        .crossJoin(broadcast(spark.createDataFrame([(x,) for x in sensor_list], ["item"]))),
    on=["date", "item"],
    how="right"
).sort("date", "item").show()
#+------------------+--------+-----+
#|              date|    item|value|
#+------------------+--------+-----+
#|2019-05-10 7:30:05|  burner| TRUE|
#|2019-05-10 7:30:05|pressure| null|
#|2019-05-10 7:30:05|    temp|   99|
#|2019-05-10 7:30:05|  vacuum| null|
#|2019-05-10 7:30:10|  burner|FALSE|
#|2019-05-10 7:30:10|pressure| null|
#|2019-05-10 7:30:10|    temp|   75|
#|2019-05-10 7:30:10|  vacuum|  .15|
#|2019-05-10 7:30:15|  burner| null|
#|2019-05-10 7:30:15|pressure| null|
#|2019-05-10 7:30:15|    temp|   77|
#|2019-05-10 7:30:15|  vacuum| null|
#|2019-05-10 7:30:20|  burner| null|
#|2019-05-10 7:30:20|pressure|  .22|
#|2019-05-10 7:30:20|    temp|  101|
#|2019-05-10 7:30:20|  vacuum| null|
#+------------------+--------+-----+
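As a quick sanity check (assuming the joined DataFrame above is assigned to a variable, called result here instead of calling .show() directly), every distinct timestamp should now have exactly one row per sensor:

# result is the right-joined DataFrame from above
expected = df.select("date").distinct().count() * len(sensor_list)
assert result.count() == expected  # 4 timestamps * 4 sensors = 16 rows for the sample data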
pault
  • this makes a lot of sense! thank you very much! I will give it a shot on the large data set and let you know how it goes. – thePurplePython Jun 06 '19 at 21:46
  • this join method is working out pretty well ... since it is broadcast, no shuffle is happening, so the performance looks feasible. thanks! – thePurplePython Jun 07 '19 at 22:59
  • I sort of spoke too soon ... performance is still OK, however the explain plan is showing a `BroadcastNestedLoopJoin`, which I have heard horror stories about ... any insights? – thePurplePython Jun 25 '19 at 20:40
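One way to sanity-check that concern (result is an assumed name, not from the thread): the crossJoin has no join keys, so it is expected to show up in the physical plan as a BroadcastNestedLoopJoin against the small broadcast item DataFrame, while the outer right join on date and item is an equi-join and can still plan as a broadcast hash or sort-merge join:

# result is an assumed name for the joined DataFrame from the answer above
result.explain()
# the BroadcastNestedLoopJoin comes from the key-less crossJoin against the small
# broadcast item DataFrame; the right join on ["date", "item"] is an equi-join
# and should plan as a SortMergeJoin or BroadcastHashJoin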