0

Every day I receive a file with ~2k columns; 900 of them are "relationship" columns. For example:

    data.id | name | AGE | data.rel.1 | data.rel.2 | data.rel.1.type | data.rel.2.type
    12      | JOE  | 25  | ASDF       | QWER       | order           | order
    23      | TIM  | 20  | AAAA       | SSSS       | product         | product
    34      | BRAD | 32  | XXXX       | null       | order           | null
    11      | MATT | 23  | ASDF       | QWER       | agreement       | agreement

I need to flatten the data and create an "id - rel - rel type" dataframe that contains only data.id, data.rel, and data.rel.type:

    data.id | data.rel | data.rel.type
    12      | ASDF     | order   
    12      | QWER     | order        
    23      | AAAA     | product    
    23      | SSSS     | product     
    34      | XXXX     | order   
    11      | ASDF     | agreement   
    11      | QWER     | agreement

This solution seems to work for one column; however, I'm not sure how to incorporate the rel.type column into the same logic:

    import re
    import pyspark.sql.functions as F

    # matches any column whose name contains "rel"
    pattern = '/*rel/*'

    def explode(row, pattern):
        # yield one (id, value) pair per matching column
        for c in row.asDict():
            if re.search(pattern, c):
                yield (row['data_id'], row[c])

    (df.rdd.flatMap(lambda r: explode(r, pattern))
        .toDF(['data_id', 'data_rel'])
        .filter(F.col('data_rel').isNotNull())
        .show())

Any ideas?

  • I was just thinking, since I don't know Python much I wasn't able to put an answer here: first select data.id and data.rel.1 as df1, similarly data.id and data.rel.2 as df2 and data.id and data.rel.3 as df3. Now you have 3 dataframes; union them and you will get the above output – Ram Ghadiyaram May 13 '19 at 15:52
  • @RamGhadiyaram that was my initial idea but it's extremely inefficient. Thank you. – Adas Kavaliauskas May 13 '19 at 16:14
  • 1
    union is not inefficient at all ... it avoids a shuffle and since your output is only 2 cols it will stack them all together. – thePurplePython May 13 '19 at 17:09
  • Possible duplicate of [How to melt Spark DataFrame?](https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe) – user10938362 May 13 '19 at 18:17

2 Answers

3

Here is a solution:

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
    (23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
    (34, 'BRAD', 32, 'XXXX', None, None),
    (11, 'MATT', 23, 'ASDF', 'QWER', None)],
    ['data_id','name','AGE','data_rel_1','data_rel_2','data_rel_3']
)

# Create an array of the columns you want
cols = F.array(
    *[F.col(c).alias(c) for c in ['data_rel_1', 'data_rel_2', 'data_rel_3']]
)

df.withColumn(
    "data_rel", cols
).select(
    'data_id',F.explode('data_rel').alias('data_rel')
).filter(
    F.col('data_rel').isNotNull()
).show()

which results in:

+-------+--------+
|data_id|data_rel|
+-------+--------+
|     12|    ASDF|
|     12|    QWER|
|     12|    ZXCV|
|     23|    AAAA|
|     23|    SSSS|
|     23|    DDDD|
|     34|    XXXX|
|     11|    ASDF|
|     11|    QWER|
+-------+--------+
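
To also produce the data.rel.type column that the question asks for, one option is to explode an array of structs instead of an array of plain values. This is only a sketch: it assumes each data_rel_N column has a matching data_rel_N_type column, which is not part of the sample dataframe above.

import pyspark.sql.functions as F

# hypothetical paired columns; adjust to the real schema
rel_cols = ['data_rel_1', 'data_rel_2', 'data_rel_3']
type_cols = [c + '_type' for c in rel_cols]

# one struct per (rel, type) pair, collected into an array and exploded
pairs = F.array(*[
    F.struct(F.col(r).alias('data_rel'), F.col(t).alias('data_rel_type'))
    for r, t in zip(rel_cols, type_cols)
])

(df.select('data_id', F.explode(pairs).alias('p'))
   .select('data_id', 'p.data_rel', 'p.data_rel_type')
   .filter(F.col('data_rel').isNotNull())
   .show())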

EDIT: Another solution using the RDD API, where explode can also take the pattern as a param (this may avoid exceptions with more columns):

import pyspark.sql.functions as F

# takes a pattern and yields (id, value) pairs for the columns that match it
def explode(row,pattern):
    import re
    for c in row.asDict():
        if re.search(pattern, c):
            yield (row['data_id'],row[c])

df = spark.createDataFrame(
    [(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
    (23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
    (34, 'BRAD', 32, 'XXXX', None, None),
    (11, 'MATT', 23, 'ASDF', 'QWER', None)],['data_id','name','AGE','data_rel_1','data_rel_2','data_rel_3']
)
pattern = '/*rel/*'
df.rdd.flatMap(
    lambda r:explode(r,pattern)
).toDF(
    ['data_id','data_rel']
).filter(
    F.col('data_rel').isNotNull()
).show()
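
The same RDD approach can also be extended to carry the type column. A sketch, assuming every matched rel column has a sibling column named <col>_type (e.g. data_rel_1_type), which the example dataframe above does not contain:

import re
import pyspark.sql.functions as F

# yields (id, value, type) triples for every column matching the pattern,
# skipping the *_type columns themselves and looking up each sibling type column
def explode_with_type(row, pattern):
    d = row.asDict()
    for c in d:
        if re.search(pattern, c) and not c.endswith('_type'):
            yield (row['data_id'], d[c], d.get(c + '_type'))

df.rdd.flatMap(
    lambda r: explode_with_type(r, pattern)
).toDF(
    ['data_id', 'data_rel', 'data_rel_type']
).filter(
    F.col('data_rel').isNotNull()
).show()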
Ranga Vure
  • Thank you guys, I will try this solution with the explode function and will let you know how it works with my data. – Adas Kavaliauskas May 13 '19 at 17:51
  • You can skip the `withColumn` step and directly `select("\`data.id\`", F.explode(cols))` – pault May 13 '19 at 18:18
  • @pault I'm getting StackOverflowError, tried to increase memory, but it didn't help. Updated initial question with details. – Adas Kavaliauskas May 14 '19 at 17:08
  • 1
    @AdasKavaliauskas `explode` is an expensive operation. You may be better off using the `union` method. – pault May 14 '19 at 17:52
  • @AdasKavaliauskas try the rdd approach (I edited in another solution) – Ranga Vure May 14 '19 at 18:42
  • @RangaVure Thank you so much. The rdd.flatMap solution seems to be working; however, I'm not sure how to pass the pattern parameter into the function. Updated question. – Adas Kavaliauskas May 16 '19 at 12:35
  • @AdasKavaliauskas updated with a generic method without passing a param. Also, if it solves your problem, please accept my solution by clicking the tick mark. – Ranga Vure May 16 '19 at 13:28
  • @RangaVure Sometimes I have multiple rel groups and I need to flatten them separately. For example: data.rel.1.product, data.rel.2.category... That's why I am thinking about the pattern as a parameter, any ideas? Thank you. – Adas Kavaliauskas May 16 '19 at 13:45
  • @AdasKavaliauskas updated with the pattern as a param. Hope this is a complete solution for you :) – Ranga Vure May 16 '19 at 16:20
  • @RangaVure Thank you. Works like a charm. I'm still thinking about one nice-to-have feature. Would it be possible to add another column in this solution for the data type? If I have multiple columns, for example "data.rel.1.product" and "data.rel.1.product.type" and so on... I am thinking about creating tuples of these columns and then iterating through them, but I'm not sure whether that's good logic or how to achieve it? – Adas Kavaliauskas May 17 '19 at 08:51
0

I don't know Python well, so I could not put an answer in Python here; I wrote it in Scala, and you can try to translate it to Python. First select data.id and data.rel.1 as df1, similarly data.id and data.rel.2 as df2, and data.id and data.rel.3 as df3.

Now that you have 3 dataframes, union them and you will get the above output:

import org.apache.spark.sql.SparkSession

/**
  * Created by Ram Ghadiyaram
  */
object DFUnionExample {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder.
      master("local")
      .appName("DFUnionExample")
      .getOrCreate()

    import sparkSession.implicits._

    val basedf = Seq((12, "JOE", 25, "ASDF", "QWER", "ZXCV"),
      (23, "TIM", 20, "AAAA", "SSSS", "DDDD"),
      (34, "BRAD", 32, "XXXX", null, null),
      (11, "MATT", 23, "ASDF", "QWER", null)
    ).toDF("data.id", "name", "AGE", "data.rel.one", "data.rel.two", "data.rel.three")
    basedf.show
    import org.apache.spark.sql.functions._
    val df1 = basedf.select(col("`data.id`"), col("`data.rel.one`"))
    val df2 = basedf.select(col("`data.id`"), col("`data.rel.two`"))
    val df3 = basedf.select(col("`data.id`"), col("`data.rel.three`"))
    df1.union(df2).union(df3)
      .select(col("`data.id`"), col("`data.rel.one`").as("data.rel"))
      .filter(col("`data.rel`").isNotNull)
      .sort(col("`data.id`")).show
  }
}

Result:

+-------+----+---+------------+------------+--------------+
|data.id|name|AGE|data.rel.one|data.rel.two|data.rel.three|
+-------+----+---+------------+------------+--------------+
|     12| JOE| 25|        ASDF|        QWER|          ZXCV|
|     23| TIM| 20|        AAAA|        SSSS|          DDDD|
|     34|BRAD| 32|        XXXX|        null|          null|
|     11|MATT| 23|        ASDF|        QWER|          null|
+-------+----+---+------------+------------+--------------+

+-------+--------+
|data.id|data.rel|
+-------+--------+
|     11|    QWER|
|     11|    ASDF|
|     12|    ASDF|
|     12|    QWER|
|     12|    ZXCV|
|     23|    AAAA|
|     23|    DDDD|
|     23|    SSSS|
|     34|    XXXX|
+-------+--------+
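
Since the answer invites translating it to Python, here is a rough PySpark sketch of the same union approach (only a sketch, assuming a dataframe df with the same columns as basedf above; dotted column names need backtick quoting):

import pyspark.sql.functions as F
from functools import reduce

# hypothetical rel columns, matching the Scala example above
rel_cols = ['data.rel.one', 'data.rel.two', 'data.rel.three']

# one (data.id, data.rel) dataframe per rel column, then union them all
parts = [
    df.select(F.col('`data.id`'), F.col(f'`{c}`').alias('data.rel'))
    for c in rel_cols
]

(reduce(lambda a, b: a.union(b), parts)
    .filter(F.col('`data.rel`').isNotNull())
    .sort(F.col('`data.id`'))
    .show())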
Ram Ghadiyaram