
I'm pretty new to PySpark and I'm trying to work with some meter data from an electric meter interval dataset (CSV) that I have access to.

I have a dataframe schema created from a CSV import of electric meter data that looks something like this:

root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: long (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: long (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: long (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: long (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: long (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: long (nullable = true)
 |-- _c27: string (nullable = true)
 |-- _c28: double (nullable = true)
 ...

_c13 contains a number which indicates how many groupings of 3 data columns follow it (each grouping representing a timestamp, a flag, and a value). The count varies from 1 to 96 per record.

(i.e.    timestamp, flag, value)
_c14, _c15, _c16 = Group 1   (202010101315, "NONE", 1.1)
_c17, _c18, _c19 = Group 2   (202010101330, "NONE", 1.2)
_c20, _c21, _c22 = Group 3   (202010101345, "EST", 0.75) etc...
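
(For reference, I'm loading the CSV roughly like this; the bucket/path below is just a placeholder. header=False is why the columns come out as _c0, _c1, ... and inferSchema=True is what produces the mixed string/integer/long/double types above.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv(
    "s3://my-bucket/meter-intervals/",  # placeholder path
    header=False,       # no header row, so Spark names the columns _c0, _c1, ...
    inferSchema=True    # infer string/integer/long/double per column
)
df.printSchema()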

The final output I believe I want for an AWS Redshift table is a single row per grouping: columns _c0 to _c12 from the source dataframe, repeated alongside each grouping's timestamp, flag and value:

+-----+-----+-----+------+--------------+------+-------+
| _c0 | _c1 | ... | _c12 | timestamp    | flag | value |
+-----+-----+-----+------+--------------+------+-------+
|  A  |  B  | ... |  L   | 202010101315 | NONE | 1.1   |
|  A  |  B  | ... |  L   | 202010101330 | NONE | 1.2   |
|  A  |  B  | ... |  L   | 202010101345 | EST  | 0.75  |
+-----+-----+-----+------+--------------+------+-------+
etc...

So far, I have managed to load my data into a dataframe. To iterate over each row, I realized I can create an RDD with a custom function to perform manipulation on the row:

rdd = df.rdd.map(customFunction)

but I quickly realized that map only lets me return a single row (i.e. a single grouping) for each input row.

Then I looked at appending a row to a new dataframe from within customFunction, but after reading that dataframes are immutable and that a new dataframe is returned on each append, I realized that approach probably isn't efficient.

Any help on a basic structure to efficiently achieve the split of the records I'm looking for would be appreciated!

Ken
  • If I understand well you want to do something like this for each triplet from c_14 to c_96: https://stackoverflow.com/a/42723968/7306659 – ggagliano Oct 27 '20 at 01:29

1 Answer


Basically, you are looking for the explode function, which creates several rows from an array contained in a single row. You also need to build that array from the columns you want.
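
As a tiny, self-contained illustration of what explode does with an array of structs (toy column names, not your schema):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# one input row holding an array of two structs ...
toy = spark.createDataFrame([("A",)], ["id"]) \
    .withColumn("arr", F.array(
        F.struct(F.lit(10).alias("v")),
        F.struct(F.lit(20).alias("v"))
    )) \
    .withColumn("s", F.explode("arr")) \
    .select("id", "s.v")

toy.show()  # ... becomes two output rows: (A, 10) and (A, 20)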

However, I do not know of any way in Spark SQL to build an array from a variable number of columns whose size is parameterized by another column. With an RDD you could do it easily using flatMap; in Spark SQL, what we can do instead is create an array with all the possible columns and filter out the ones you do not want afterwards.
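
For completeness, a minimal sketch of that RDD approach, assuming the column positions from the question (_c13 at index 13, groups starting at _c14); the helper name explode_row and the output column names are only illustrative:

def explode_row(row):
    fixed = tuple(row[i] for i in range(13))   # _c0 .. _c12
    n_groups = row[13]                         # _c13: number of groups present
    out = []
    for g in range(n_groups):
        base = 14 + 3 * g                      # first column of group g
        out.append(fixed + (row[base], row[base + 1], row[base + 2]))
    return out

exploded_rdd = df.rdd.flatMap(explode_row)
result = exploded_rdd.toDF(
    ["_c" + str(i) for i in range(13)] + ["timestamp", "flag", "value"]
)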

For the Spark SQL version, let's assume that the columns of your dataframe start with the ones you want to keep, followed by the column containing the number of groups (_c13 in your case), and then by the columns containing the groups. In pyspark, the code could look like this:

import pyspark.sql.functions as F

# the index of the column containing the number of groups
group_column = 13
# the maximum number of groups, 96 if I understand correctly
max_group_count = 96

df\
  .withColumn("groups",
       # array of all *possible* groups, each tagged with its index
       F.array([
           F.struct(
                F.col("_c"+str(group_column+1+3*i)).alias("timestamp"),
                F.col("_c"+str(group_column+2+3*i)).alias("flag"),
                F.col("_c"+str(group_column+3+3*i)).alias("value"),
                F.lit(i).alias("index")
           ) for i in range(max_group_count)
       ])
  )\
  .drop(*[
     # drop the original group columns so they are not carried along after the explode
     "_c"+str(i) for i in range(group_column+1, group_column+3*max_group_count+1)
  ])\
  .withColumn("s", F.explode("groups"))\
  .where(F.col("_c"+str(group_column)) > F.col("s.index"))\
  .select([F.col("_c"+str(i)) for i in range(group_column)] + ["s.*"])\
  .drop("index")

Detailed explanation of the code:

- The groups column is the array I was talking about. It contains all the possible groups, each wrapped in a struct together with an index ranging from 0 to the maximum number of groups minus 1.
- I then drop the original group columns so they are not carried along into every exploded row.
- Exploding the array creates one row per group.
- The where clause gets rid of the groups you do not want, using the index we added to the struct and the number of groups in column _c13.
- Finally, we put everything in the right shape: select the columns we want to keep (the ones before the column with the number of groups) and extract the fields from the struct using the wildcard *.
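
As a usage sketch, the same pipeline can be wrapped in a small helper and its result staged somewhere Redshift can COPY from; the function name and the S3 path are only examples:

import pyspark.sql.functions as F

def explode_interval_groups(df, group_column=13, max_group_count=96):
    # exact same logic as the pipeline above, wrapped for reuse
    groups = F.array([
        F.struct(
            F.col("_c" + str(group_column + 1 + 3 * i)).alias("timestamp"),
            F.col("_c" + str(group_column + 2 + 3 * i)).alias("flag"),
            F.col("_c" + str(group_column + 3 + 3 * i)).alias("value"),
            F.lit(i).alias("index")
        ) for i in range(max_group_count)
    ])
    return (
        df.withColumn("groups", groups)
          .drop(*["_c" + str(i)
                  for i in range(group_column + 1, group_column + 3 * max_group_count + 1)])
          .withColumn("s", F.explode("groups"))
          .where(F.col("_c" + str(group_column)) > F.col("s.index"))
          .select([F.col("_c" + str(i)) for i in range(group_column)] + ["s.*"])
          .drop("index")
    )

exploded = explode_interval_groups(df)
# placeholder S3 location; a Redshift COPY can load it from there afterwards
exploded.write.mode("overwrite").parquet("s3://my-bucket/meter_intervals_exploded/")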

Oli