
The following DataFrame should be filtered based on the flag column. If a group (defined by columns id and cod) has no row with a value different from None, a single row should be kept for it; otherwise, the rows with None in the flag column should be removed.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, max

spark = SparkSession.builder.appName('Vazio').getOrCreate()

data = [('1', 10, 'A'),
        ('1', 10, 'A'),
        ('1', 10, None),
        ('1', 15, 'A'),
        ('1', 15, None),
        ('2', 11, 'A'),
        ('2', 11, 'C'),
        ('2', 12, 'B'),
        ('2', 12, 'B'),
        ('2', 12, 'C'),
        ('2', 12, 'C'),
        ('2', 13, None),
        ('3', 14, None),
        ('3', 14, None),
        ('3', 15, None),
        ('4', 21, 'A'),
        ('4', 21, 'B'),
        ('4', 21, 'C'),
        ('4', 21, 'C')]

df = spark.createDataFrame(data=data, schema=['id', 'cod', 'flag'])
df.show()

How could I obtain the following DataFrame from the one above using PySpark?

+---+---+----+
| id|cod|flag|
+---+---+----+
|  1| 10|   A|
|  1| 15|   A|
|  2| 11|   A|
|  2| 11|   C|
|  2| 12|   B|
|  2| 12|   C|
|  2| 13|null|
|  3| 14|null|
|  3| 15|null|
|  4| 21|   A|
|  4| 21|   C|
+---+---+----+
  • Do you have a pandas or pyspark dataframe? – Corralien Jun 30 '22 at 19:38
  • Pyspark. I did it with pandas because I do not have Spark on this computer yet. – Pedro Henrique Jun 30 '22 at 19:44
  • Input: `('4', 21, 'A')`, `('4', 21, 'B')`, `('4', 21, 'C')`, `('4', 21, 'C')`. In the output you only have `('4', 21, 'A')` and `('4', 21, 'C')`. Is that a mistake in the output? Why is `('4', 21, 'B')` removed? – ZygD Aug 08 '22 at 04:02

3 Answers


Answer to the edited question:

from pyspark.sql import functions as F

# collect_set gathers the distinct non-null flags of each (id, cod) group
df = df.groupBy('id', 'cod').agg(F.collect_set('flag').alias('flag'))
# explode the set back into rows; an all-null group yields an empty set,
# so substitute array(NULL) to keep a single null-flag row for it
df = df.withColumn(
    'flag',
    F.explode(F.when(F.size('flag') != 0, F.col('flag')).otherwise(F.array(F.lit(None))))
)
df.show()
# +---+---+----+
# | id|cod|flag|
# +---+---+----+
# |  2| 11|   C|
# |  2| 11|   A|
# |  2| 12|   C|
# |  2| 12|   B|
# |  1| 15|   A|
# |  1| 10|   A|
# |  3| 14|null|
# |  4| 21|   C|
# |  4| 21|   B|
# |  4| 21|   A|
# |  2| 13|null|
# |  3| 15|null|
# +---+---+----+
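Why the `when/otherwise` guard is needed: `collect_set` ignores nulls, so an all-null group produces an empty array, and `explode` on an empty array would drop that group entirely. A minimal demonstration (the `demo` frame is hypothetical, assuming the `spark` session from the question):

# collect_set skips nulls: the all-null group (3, 14) collapses to []
demo = spark.createDataFrame([('3', 14, None), ('1', 10, 'A')], ['id', 'cod', 'flag'])
demo.groupBy('id', 'cod').agg(F.collect_set('flag')).show()
# +---+---+-----------------+
# | id|cod|collect_set(flag)|
# +---+---+-----------------+
# |  1| 10|              [A]|
# |  3| 14|               []|
# +---+---+-----------------+
# (row order may vary)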

Note that the group (4, 21) keeps all three flags A, B, C. The description of the question says nothing about keeping only two values, but in the example output the B was somehow deleted, so I assumed it was a mistake.


Answer to the original question:

# max ignores nulls, so a group ends up with a null flag only when all of its flags are null
df = df.groupBy('id', 'cod').agg(F.max('flag').alias('flag'))

You couldn't just drop duplicates based on columns id and cod, as there's no guarantee that the row kept for each group would be one with a non-null flag.
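For illustration, a quick sketch of the problem using the original df from the question: `dropDuplicates` keeps an arbitrary row per (id, cod) pair, so the null-flag row may be the survivor, whereas `max` never returns null when a non-null flag exists.

df.dropDuplicates(['id', 'cod']).show()  # may keep ('1', 10, None) instead of ('1', 10, 'A')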


This could be a solution with pandas:

import pandas as pd

# build the result row by row, skipping exact duplicates and skipping a
# null-flag row whenever its (id, cod) pair is already present
# (note: this relies on non-null rows appearing before their null
# counterparts in df, as they do in the sample data)
new_df = pd.DataFrame(columns=df.columns)

for index, row in df.iterrows():
    if row.values.tolist() not in new_df.values.tolist():

        if row["flag"] is None and row.values.tolist()[:2] in new_df[list(new_df.columns)[:2]].values.tolist():
            continue

        new_df.loc[-1] = row.values.tolist()
        new_df.index += 1

At the end you could just add:

df = new_df.copy(deep=True)
del new_df
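
A vectorized sketch of the same idea, assuming a pandas DataFrame `df` with these three columns: drop exact duplicates, then keep a null-flag row only when its (id, cod) group has no non-null flag at all.

import pandas as pd

deduped = df.drop_duplicates()
# True for every row whose (id, cod) group contains at least one non-null flag
has_value = deduped['flag'].notna().groupby([deduped['id'], deduped['cod']]).transform('any')
result = deduped[deduped['flag'].notna() | ~has_value].reset_index(drop=True)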

With PySpark, adapted from this answer (Spark):

# desc() sorts nulls last, so row_number picks a non-null flag for each
# (id, cod) group whenever one exists
window = Window.partitionBy(['id', 'cod']).orderBy(col('flag').desc())
out = (df.withColumn('row', row_number().over(window))
         .filter(col('row') == 1).drop('row'))
out.show()

# Output
+---+---+----+
| id|cod|flag|
+---+---+----+
|  2| 11|   A|
|  2| 12|   B|
|  1| 15|   A|
|  3| 14|null|
|  2| 13|null|
|  3| 15|null|
|  1| 10|   A|
+---+---+----+

Setup

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

data = [['1', 10, 'A'],
        ['1', 10, 'A'],
        ['1', 10, None],
        ['1', 15, 'A'],
        ['1', 15, None],
        ['2', 11, 'A'],
        ['2', 12, 'B'],
        ['2', 12, 'B'],
        ['2', 13, None],
        ['3', 14, None],
        ['3', 14, None],
        ['3', 15, None]]

columns = ['id', 'cod', 'flag']
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(data=data, schema=columns)
df.show()

# Output
+---+---+----+
| id|cod|flag|
+---+---+----+
|  1| 10|   A|
|  1| 10|   A|
|  1| 10|null|
|  1| 15|   A|
|  1| 15|null|
|  2| 11|   A|
|  2| 12|   B|
|  2| 12|   B|
|  2| 13|null|
|  3| 14|null|
|  3| 14|null|
|  3| 15|null|
+---+---+----+
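
Note that row_number keeps a single row per (id, cod) group, which matches the original question. For the edited question (keep all distinct non-null flags, or one null row when the group has none), a possible variant uses a windowed max, which ignores nulls; this is a sketch under that interpretation, not a tested drop-in:

from pyspark.sql.functions import col, max as max_
from pyspark.sql.window import Window

w = Window.partitionBy('id', 'cod')
out = (df.dropDuplicates()
         .withColumn('any_flag', max_('flag').over(w))  # null only if the whole group is null
         .filter(col('flag').isNotNull() | col('any_flag').isNull())
         .drop('any_flag'))
out.show()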