I have a file that contains multiple rows for each phone number. For example:

phone_no circle operator priority1 attribute1 attribute2 attribute3 priority2 attribute1 attribute2 attribute3 
123445   delhi  airtel   1.0        info1      info2      info3      1.1        info4      info5      info6
987654   bhopal idea     1.1        info1      info2      info3      1.4        info4      info5      info6
123445   delhi  airtel   1.3        info1      info2      info3      1.0        info4      info5      info6

My expected output: for each phone number, select the minimum of each priority column together with its corresponding attribute values.

In the example above, phone number 123445 appears on lines 1 and 3. Its P1 on line 1 is lower than its P1 on line 3 (1.0 < 1.3), so I want attributes 1, 2 and 3 of priority1 from line 1. Its P2 on line 3 is lower than its P2 on line 1 (1.0 < 1.1), so I want the priority2 attribute values from line 3.

Here is what I want in a tabular format:

phone_no circle operator priority1 attribute1 attribute2 attribute3 priority2 attribute1 attribute2 attribute3 
123445   delhi  airtel   1.0        info1      info2      info3      1.0        info4      info5      info6
987654   bhopal idea     1.1        info1      info2      info3      1.4        info4      info5      info6
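
The selection rule can also be written down as a small plain-Python reference, which may help when checking the output of any Spark solution on a sample. This is only a sketch: the function name `select_min_per_priority` is hypothetical, the attribute columns are assumed to be renamed to `p<i>_attribute<j>` so that names are unique, and the attribute values are made up so that the source row of each group is visible.

```python
# Plain-Python reference for the selection rule (no Spark involved).
# Assumption: attribute columns are renamed p<i>_attribute<j> so that names
# are unique; the values below are invented to show which row each group came from.

def select_min_per_priority(rows, key, groups):
    """Return {key value: merged record}, filling each column group from the
    row with the lowest value in that group's priority column."""
    best = {}
    for row in rows:
        merged = best.setdefault(row[key], {})
        for priority, attributes in groups.items():
            # Strict '<' keeps the earliest row when priorities tie.
            if priority not in merged or row[priority] < merged[priority]:
                merged[priority] = row[priority]
                for name in attributes:
                    merged[name] = row[name]
    return best


# Maps each priority column to the attribute columns that travel with it.
groups = {
    "priority1": ["p1_attribute1", "p1_attribute2"],
    "priority2": ["p2_attribute1", "p2_attribute2"],
}
rows = [
    {"phone_no": "123445", "priority1": 1.0, "p1_attribute1": "row1_a", "p1_attribute2": "row1_b",
     "priority2": 1.1, "p2_attribute1": "row1_c", "p2_attribute2": "row1_d"},
    {"phone_no": "987654", "priority1": 1.1, "p1_attribute1": "row2_a", "p1_attribute2": "row2_b",
     "priority2": 1.4, "p2_attribute1": "row2_c", "p2_attribute2": "row2_d"},
    {"phone_no": "123445", "priority1": 1.3, "p1_attribute1": "row3_a", "p1_attribute2": "row3_b",
     "priority2": 1.0, "p2_attribute1": "row3_c", "p2_attribute2": "row3_d"},
]
result = select_min_per_priority(rows, "phone_no", groups)
print(result["123445"])
```

For phone number 123445 the priority1 group is taken from the first row and the priority2 group from the third row, matching the table above. The circle and operator columns are left out here for brevity.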

I have 25 different priority columns, and each priority has 4 attributes, so I have around 125 columns in total.

What I have tried so far:

  1. Create a DataFrame that has the phone number as key and the minimum of each priority value.
  2. Create another DataFrame that has min(Priority1) along with its corresponding attributes for each phone number.
  3. Create another DataFrame that has min(Priority2) along with its corresponding attributes for each phone number.
  4. Join these two DataFrames on phone number to get the complete information, and save the result to disk.

The problem is that this approach does not scale to the number of columns I have, since it needs one DataFrame and one join per priority. Please suggest a better approach to this problem.

EDIT 1: Here is a pastebin link to what I have done: https://pastebin.com/ps4f1KSh

Rishabh Dixit

1 Answer

I would probably use window functions:

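from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as spf

# Reuse the active session (e.g. in the pyspark shell) or start one.
spark = SparkSession.builder.getOrCreate()

# Toy data: two rows for phone 123, one row for phone 456.
df = spark.createDataFrame([
    (123, 1, 'a', 2, 'c'),
    (123, 2, 'b', 1, 'd'),
    (456, 3, 'e', 4, 'f')
], ['phone', 'priority1', 'attribute1', 'priority2', 'attribute2'])

# One partition per phone; each priority adds its own ordering below.
w = Window.partitionBy('phone')
df2 = (
    df
    .select(
        'phone',
        # Attribute from the row with the lowest priority1 for this phone.
        spf.first('attribute1').over(w.orderBy('priority1')).alias('attribute1'),
        # Attribute from the row with the lowest priority2 for this phone.
        spf.first('attribute2').over(w.orderBy('priority2')).alias('attribute2'),
    )
)

# Every row of a phone now carries the same values, so collapse to one row per phone.
(
    df2
    .groupby('phone')
    .agg(*[spf.first(c).alias(c) for c in df2.columns if c != 'phone'])
    .toPandas()
)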

Gives:

   phone attribute1 attribute2
0    123          a          d
1    456          e          f

It's an exercise for the reader to template this out (e.g. using list comprehensions) to generalize to all attributes and priorities.

santon
  • I will try it out on Monday and get back to you. Thank you for your time! – Rishabh Dixit Nov 03 '18 at 15:39
  • This works perfectly! Thank you so much. My initial problem was actually https://stackoverflow.com/questions/53001909/hive-how-to-update-the-existing-data-if-it-exists-based-on-some-condition-and-in . I had an older version of Hive, so I could not use the `merge` command. I decided to create a file with consolidated records and then use a Hive join query to reach the end result. I have accepted your solution as the answer. – Rishabh Dixit Nov 12 '18 at 08:13