
I am new to PySpark. Given the dataframe below, how can I split it into two different dataframes based on the "_Value" field? If "_Value", which is array(string), contains any null or blank elements, the row should go into one dataframe, and the rest into another.

+----+-----+-----+-------------------------------+-------------+--------------+
|key |Size |Color|AdditionalAttributeMetric      |_Name        |_Value        |
+----+-----+-----+-------------------------------+-------------+--------------+
|123k|BLACK|black|[Size -> BLACK, Color -> black]|[Size, Color]|[BLACK, black]|
|123k|WHITE|null |[Size -> WHITE, Color ->]      |[Size, Color]|[WHITE,]      |
+----+-----+-----+-------------------------------+-------------+--------------+

Below is the complete code, but it throws the error "Column is not iterable".

from pyspark.sql.functions import lit, col, create_map, map_keys, map_values, array_contains
from pyspark.sql.types import StructType, StructField, StringType
from itertools import chain

rdd = sc.parallelize([('123k', 'BLACK', 'black'),
                      ('123k', 'WHITE', None)
                          ])

schema = StructType([StructField('key', StringType(), True),
                     StructField('Size', StringType(), True),
                     StructField('Color', StringType(), True)])

df_struct = sqlContext.createDataFrame(rdd, schema)

df_struct_subattri = df_struct.select("Size", "Color")

AdditionalAttributeMetric = create_map(
            list(chain(*((lit(name), col(name)) for name in df_struct_subattri.columns)))).alias(
            "AdditionalAttributeMetric")

df_struct = df_struct.select("*", AdditionalAttributeMetric)

df_struct = df_struct.withColumn("_Name", map_keys(col("AdditionalAttributeMetric")))
df_struct = df_struct.withColumn("_Value", map_values(col("AdditionalAttributeMetric")))


df_struct1 = df_struct.select("*").where(array_contains(col("_Value"), '') | array_contains(col("_Value"), lit(None)))

df_struct1.show(truncate = False)

Any kind of help is appreciated.

Hello, your question may be a duplicate of this one: https://stackoverflow.com/questions/31669308/how-to-split-a-dataframe-into-dataframes-with-same-column-values – Ahlam AIS Mar 27 '20 at 09:52

1 Answer


The sample data has one clean row, one row with a None element, and one row with an empty string ''.

from pyspark.sql import functions as F
df_struct.show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
|123k|      [WHITE,]|
|124k|      [BLUE, ]|
+----+--------------+
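For reference, here is a minimal sketch that builds this sample dataframe (assuming an active SparkSession bound to spark; this construction is mine, not part of the original answer):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# one clean row, one row with a None element, one row with an empty string
df_struct = spark.createDataFrame(
    [("123k", ["BLACK", "black"]),
     ("123k", ["WHITE", None]),
     ("124k", ["BLUE", ""])],
    ["key", "_value"])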

If you do not have Spark 2.4, you can use array_contains to check for the empty string. With this approach, if a row's array contains a null, array_contains returns null; if it contains the empty string "", it returns true. You can then filter on that new boolean column as shown below.

df_struct.withColumn("boolean", F.array_contains("_value", ""))\
  .filter(~((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+

You can remove the ~ to get all the other rows, those containing Nones or empty strings.

df_struct.withColumn("boolean", F.array_contains("_value", ""))\
  .filter(((F.col("boolean")==True) | (F.col("boolean").isNull()))).drop("boolean").show()

+----+--------+
| key|  _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
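Since the question asks for two dataframes, here is a small sketch that computes the boolean once and filters both ways, assuming the same df_struct as above (the names df_no_blanks and df_with_blanks are just illustrative):

flagged = df_struct.withColumn("boolean", F.array_contains("_value", ""))
bad = (F.col("boolean") == True) | F.col("boolean").isNull()

df_no_blanks = flagged.filter(~bad).drop("boolean")   # rows without null/blank elements
df_with_blanks = flagged.filter(bad).drop("boolean")  # rows with at least one null/blank element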

Spark 2.4+:

You can use the higher-order function filter to take out None or '' elements, and then compare array sizes in the dataframe filter.

df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
  .filter((F.size("_value2")==F.size("_value"))).drop("_value2").show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+

To get the other rows, those with Nones, empty strings, or both, put ~ in front of the filter expression.

df_struct.withColumn("_value2", F.expr("""filter(_value, x-> x is not null and x!='')"""))\
  .filter(~(F.size("_value2")==F.size("_value"))).drop("_value2").show()

+----+--------+
| key|  _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+

You can also use the higher-order function exists.

df_struct.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
  .filter(~(F.col("boolean")==True)).drop("boolean").show()

+----+--------------+
| key|        _value|
+----+--------------+
|123k|[BLACK, black]|
+----+--------------+

Remove the ~ to get all rows with Nones or "":

df_struct.withColumn("boolean", F.expr("""exists(_value, x-> x is null or x=='')"""))\
  .filter((F.col("boolean")==True)).drop("boolean").show()

+----+--------+
| key|  _value|
+----+--------+
|123k|[WHITE,]|
|124k|[BLUE, ]|
+----+--------+
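On Spark 2.4+, the same two-way split can be done in one pass with exists (df_clean and df_dirty are illustrative names):

has_blank = F.expr("exists(_value, x -> x is null or x == '')")

df_clean = df_struct.filter(~has_blank)   # no null or blank elements
df_dirty = df_struct.filter(has_blank)    # at least one null or blank element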
– murtihash