I have to create a data lineage for group relations. Below you can see the source data and the target output. For example, 1 is related to A, and A is also related to 2; 2 also has C, and 1 also has D. So 1 is transitively related to all of them, and the resulting relation string is ABCD.
I wrote code that works, but it is not a standard way to do this. Can't I create a DataFrame and iterate over it instead?
source
id,group
1,A
2,A
1,B
3,B
2,C
3,C
1,D
4,D
5,D
6,E
7,E
8,F
6,F
9,G
Target
+---+-----+----+
| id|group| relation|
+---+-----+----+
| 7| E| EF|
| 3| B|ABCD|
| 8| F| EF|
| 5| D|ABCD|
| 6| E| EF|
| 9| G| G|
| 1| A|ABCD|
| 4| D|ABCD|
| 2| A|ABCD|
+---+-----+----+
Code used
from pyspark.sql.functions import lit,Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Row
import sys
import datetime
import json
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.window import *
from pyspark.sql.functions import row_number
# Build the (id, group) edge DataFrame that drives the relation lookup.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("group", StringType(), True),
])
rdd = sc.parallelize([
    Row("1", "A"), Row("2", "A"), Row("1", "B"), Row("3", "B"),
    Row("2", "C"), Row("3", "C"), Row("1", "D"), Row("4", "D"),
    Row("5", "D"), Row("6", "E"), Row("7", "E"), Row("8", "F"),
    Row("6", "F"), Row("9", "G"),
])
df = sqlContext.createDataFrame(rdd, schema)

final_result = []
# Keep one row per id (the alphabetically first group) so each id is
# expanded exactly once below.
df4 = (
    df.withColumn("row_num", row_number().over(Window.partitionBy("id").orderBy("group")))
      .where(F.col("row_num") == 1)
      .drop("row_num")
)
data = [(part["id"], part["group"]) for part in df4.collect()]

# Breadth-first expansion of each (id, group) seed: alternately follow
# id -> groups and group -> ids until no new members appear.  The final
# done_groups set is the connected component's group membership.
for key, value in data:
    pending_ids = {key}        # ids discovered but not yet expanded
    done_ids = set()           # ids already expanded
    pending_groups = {value}   # groups discovered but not yet expanded
    done_groups = set()        # groups already expanded
    # NOTE: the original condition `(len(A)>0) | (len(B))>0` only worked by
    # accident of operator precedence; this is the intended disjunction.
    while pending_ids - done_ids or pending_groups - done_groups:
        for member_id in pending_ids - done_ids:
            pending_groups.update(
                part["group"]
                for part in df.select("group").where(df["id"] == member_id).collect()
            )
            # add(), not extend(): extending a list with a string splits it
            # into characters and only worked because ids are one char long.
            done_ids.add(member_id)
        for member_group in pending_groups - done_groups:
            pending_ids.update(
                part["id"]
                for part in df.select("id").where(df["group"] == member_group).collect()
            )
            done_groups.add(member_group)
    res = ''.join(sorted(done_groups))
    if res:
        # append the finished (id, group, relation) triple
        final_result.append([key, value, res])

# Third field is "relation" — the original schema declared "id" twice,
# which mislabeled the output column.
cSchema = StructType([
    StructField("id", StringType()),
    StructField("group", StringType()),
    StructField("relation", StringType()),
])
result = spark.createDataFrame(final_result, schema=cSchema)
# created final df and showed
result.show()
This works and gives the correct result:
+---+-----+----+
| id|group| id|
+---+-----+----+
| 7| E| EF|
| 3| B|ABCD|
| 8| F| EF|
| 5| D|ABCD|
| 6| E| EF|
| 9| G| G|
| 1| A|ABCD|
| 4| D|ABCD|
| 2| A|ABCD|
+---+-----+----+
I want to know the best way to write this code. I tried a UDF, but it says you can't pass a DataFrame into a UDF.
Can't I create a DataFrame and iterate over it? First, a function that builds the data DataFrame:
def getdata():
    """Build, persist, and return the (id, group) edge DataFrame."""
    edge_schema = StructType([
        StructField("id", StringType(), True),
        StructField("group", StringType(), True),
    ])
    edge_rows = [
        Row("1", "A"), Row("2", "A"), Row("1", "B"), Row("3", "B"),
        Row("2", "C"), Row("3", "C"), Row("1", "D"), Row("4", "D"),
        Row("5", "D"), Row("6", "E"), Row("7", "E"), Row("8", "F"),
        Row("6", "F"), Row("9", "G"),
    ]
    edges = sqlContext.createDataFrame(sc.parallelize(edge_rows), edge_schema)
    # cache: getMember hits this DataFrame repeatedly
    edges.persist()
    return edges
Next, a function that builds the relation string for one input row, for example (1, "A"):
def getMember(key, value):
    """Return the relation string for one seed row.

    Starting from id `key` and group `value`, alternately follow
    id -> groups and group -> ids until the connected component is
    closed, then return its sorted, concatenated group names
    (e.g. "ABCD").
    """
    df = getdata()
    pending_ids = {key}
    done_ids = set()
    pending_groups = {value}
    done_groups = set()
    # Fixed the original `(len(A)>0) | (len(B))>0`, whose parentheses made
    # it `(bool | int) > 0` by operator precedence rather than a real "or".
    while pending_ids - done_ids or pending_groups - done_groups:
        for member_id in pending_ids - done_ids:
            pending_groups.update(
                part["group"]
                for part in df.select("group").where(df["id"] == member_id).collect()
            )
            # add(), not list.extend(str): extending with a string iterates
            # its characters and only worked for single-character ids.
            done_ids.add(member_id)
        for member_group in pending_groups - done_groups:
            pending_ids.update(
                part["id"]
                for part in df.select("id").where(df["group"] == member_group).collect()
            )
            done_groups.add(member_group)
    return ''.join(sorted(done_groups))
# StringType lives in pyspark.sql.types, not pyspark.sql.functions —
# the original `F.StringType()` raises AttributeError before the UDF runs.
udf_getMember = F.udf(getMember, StringType())
schema = StructType([
    StructField("id", StringType(), True),
    StructField("group", StringType(), True),
])
rdd = sc.parallelize([
    Row("1", "A"), Row("2", "A"), Row("1", "B"), Row("3", "B"),
    Row("2", "C"), Row("3", "C"), Row("1", "D"), Row("4", "D"),
    Row("5", "D"), Row("6", "E"), Row("7", "E"), Row("8", "F"),
    Row("6", "F"), Row("9", "G"),
])
df3 = sqlContext.createDataFrame(rdd, schema)
# one seed row per id
df4 = (
    df3.withColumn("row_num", row_number().over(Window.partitionBy("id").orderBy("group")))
       .where(F.col("row_num") == 1)
       .drop("row_num")
)
# NOTE(review): even with the type fixed, this approach still fails at run
# time — getMember calls getdata(), so the UDF closure drags the
# DataFrame/SparkContext into executor serialization, which cannot be
# pickled. Driver-side iteration (or a connected-components algorithm,
# e.g. GraphFrames) is the workable design — confirm against your cluster.
result = df4.withColumn("relation", udf_getMember(F.col("id"), F.col("group")))
result.show()
This does not work and gives me a pickle (serialization) error.