I am new to PySpark. Using the dataframe below, how can I split it into two different dataframes based on the "_Value" field? If "_Value", which is an array(string), contains any null or blank elements, the row should go into one dataframe, and the rest into another.
+----+-----+-----+-------------------------------+-------------+--------------+
|key |Size |Color|AdditionalAttributeMetric |_Name |_Value |
+----+-----+-----+-------------------------------+-------------+--------------+
|123k|BLACK|black|[Size -> BLACK, Color -> black]|[Size, Color]|[BLACK, black]|
|123k|WHITE|null |[Size -> WHITE, Color ->] |[Size, Color]|[WHITE,] |
+----+-----+-----+-------------------------------+-------------+--------------+
Below is the complete code, but it's throwing the error "Column is not iterable".
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import lit, col, create_map
from itertools import chain

# Build the base dataframe: (key, Size, Color); Color is nullable.
rdd = sc.parallelize([('123k', 'BLACK', 'black'),
                      ('123k', 'WHITE', None)])
schema = StructType([StructField('key', StringType(), True),
                     StructField('Size', StringType(), True),
                     StructField('Color', StringType(), True)])
df_struct = sqlContext.createDataFrame(rdd, schema)

# Map each attribute name to its value, then expose the map's keys and
# values as the parallel arrays _Name and _Value.
df_struct_subattri = df_struct.select("Size", "Color")
AdditionalAttributeMetric = create_map(
    list(chain(*((lit(name), col(name)) for name in df_struct_subattri.columns)))
).alias("AdditionalAttributeMetric")
df_struct = df_struct.select("*", AdditionalAttributeMetric)
df_struct = df_struct.withColumn("_Name", map_keys(col("AdditionalAttributeMetric")))
df_struct = df_struct.withColumn("_Value", map_values(col("AdditionalAttributeMetric")))

# BUG FIX: array_contains(col("_Value"), lit(None)) does not work --
# array_contains cannot search for NULL, and passing the search value as a
# Column (or a bare None) is what triggers "Column is not iterable" on some
# Spark versions. Instead, use the SQL higher-order function `exists`
# (Spark 2.4+) to test each array element for NULL or the empty string.
has_blank = expr("exists(_Value, x -> x is null or x = '')")

# Rows whose _Value array contains a null/blank element...
df_struct1 = df_struct.where(has_blank)
# ...and every remaining row.
df_struct2 = df_struct.where(~has_blank)

df_struct1.show(truncate=False)
df_struct2.show(truncate=False)
Any kind of help is appreciated.