0

I have to create data lineage for group relation . so below u can see source and target. for example 1 is having relation with A and A is also related to 2, 2 also has C , 1 also have D. So 1 has relation to all aand result is ABCD in answer .

I wrote the code but thats not standard way to write it. Cant i create DF and iterate.

source

id,group
1,A
2,A
1,B
3,B
2,C
3,C
1,D
4,D
5,D
6,E
7,E
8,F
6,F
9,G 

Target

+---+-----+----+
| id|group|  relation|
+---+-----+----+
|  7|    E|  EF|
|  3|    B|ABCD|
|  8|    F|  EF|
|  5|    D|ABCD|
|  6|    E|  EF|
|  9|    G|   G|
|  1|    A|ABCD|
|  4|    D|ABCD|
|  2|    A|ABCD|
+---+-----+----+

Code used

from pyspark.sql.functions import lit,Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Row
import sys
import datetime
import json
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.window import *
from pyspark.sql.functions import row_number
schema = StructType( [StructField("id", StringType(), True),StructField("group", StringType(), True)])
rdd = sc.parallelize( [Row("1", "A"), Row("2", "A"), Row("1", "B"), Row("3", "B"),Row("2", "C"), Row("3", "C"),Row("1", "D"), Row("4", "D"),Row("5", "D"), Row("6", "E"), Row("7", "E"), Row("8", "F"),Row("6","F"),Row("9","G")] )  
#create dummy df 
df = sqlContext.createDataFrame(rdd, schema)      
final_result = []     
# pick first row of each id
df4 = df.withColumn("row_num", row_number().over(Window.partitionBy("id").orderBy("group"))).where(F.col("row_num")==1).drop("row_num")
data = [(part["id"],part["group"]) for part in df4.collect()]   
# iterate on data to find relation  
for key,value in data:
    itid= [key]
    doneid= []
    itgroup= [value]
    donegroup= []
    while ((len(list(set(itid) - set(doneid)))>0) | (len(list(set(itgroup) - set(donegroup))))>0):
        for id in list(set(itid) - set(doneid)):
            itgroup.extend([part["group"]  for part in df.select("group").where(df["id"]==id).collect()])
            doneid.extend(id)
        for group in list(set(itgroup) - set(donegroup)):
            itid.extend([part["id"]  for part in df.select("id").where(df["group"]==group).collect()])
            donegroup.extend(group)
    res=''.join(sorted(donegroup))
    if len(res)>0:
        #append in final list
        final_result.append([key,value,res])

cSchema = StructType([StructField("id", StringType()),StructField("group", StringType()),StructField("id", StringType())])
result = spark.createDataFrame(final_result,schema=cSchema) 
# created final df and showed
result.show()

This work and gives correct result

+---+-----+----+
| id|group|  id|
+---+-----+----+
|  7|    E|  EF|
|  3|    B|ABCD|
|  8|    F|  EF|
|  5|    D|ABCD|
|  6|    E|  EF|
|  9|    G|   G|
|  1|    A|ABCD|
|  4|    D|ABCD|
|  2|    A|ABCD|
+---+-----+----+

I want to know the best way to write this code. I tried UDF but that is saying you cant pass dataframe in UDF.

Cant i create DF and iterate. get the data df

def getdata():
    schema = StructType( [StructField("id", StringType(), True),StructField("group", StringType(), True)])
    rdd = sc.parallelize( [Row("1", "A"), Row("2", "A"), Row("1", "B"), Row("3", "B"),Row("2", "C"), Row("3", "C"),Row("1", "D"), Row("4","D"),Row("5", "D"), Row("6", "E"), Row("7", "E"), Row("8", "F"),Row("6","F"),Row("9","G")] )
    df = sqlContext.createDataFrame(rdd, schema)
    df.persist()
    return df

function to pull relation string from 1 input row for example (1,"A")

def getMember(key,value):
        df = getdata()
        itid= [key]
        doneid= []
        itgroup= [value]
        donegroup= []
        while ((len(list(set(itid) - set(doneid)))>0) | (len(list(set(itgroup) - set(donegroup))))>0):
            for id in list(set(itid) - set(doneid)):
                itgroup.extend([part["group"]  for part in df.select("group").where(df["id"]==id).collect()])
                doneid.extend(id)
            for group in list(set(itgroup) - set(donegroup)):
                itid.extend([part["id"]  for part in df.select("id").where(df["group"]==group).collect()])
                donegroup.extend(group)
        return ''.join(sorted(donegroup

))

    udf_getMember = udf( getMember, F.StringType())
    schema = StructType( [StructField("id", StringType(), True),StructField("group", StringType(), True)])
    rdd = sc.parallelize( [Row("1", "A"), Row("2", "A"), Row("1", "B"), Row("3", "B"),Row("2", "C"), Row("3", "C"),Row("1", "D"), Row("4", "D"),Row("5", "D"), Row("6", "E"), Row("7", "E"), Row("8", "F"),Row("6","F"),Row("9","G")] )
    df3 = sqlContext.createDataFrame(rdd, schema)    
    df4 = df3.withColumn("row_num", row_number().over(Window.partitionBy("id").orderBy("group"))).where(F.col("row_num")==1).drop("row_num")
   df4.withColumn('result', udf_getMember(F.col("id"),F.col("group")))

This does not work and gives me pickle error

Ashish Mishra
  • 510
  • 4
  • 18
  • Is the `group` column of your targetdf important? Otherwise you could use [collect_list()](https://stackoverflow.com/a/41789093). – cronoik Aug 11 '19 at 15:25
  • No it's important, actually you can't do it with just collect_list ,because for ID 2 you won't be able to make it ABCD. Also there can be a long chain of many integer and alphabet linking each other. – Ashish Mishra Aug 12 '19 at 02:47

1 Answers1

0

enter image description here You can refine it. But this is can be achieved like this.

lineage_df = (spark.read.format('csv')
              .option('header',True)
              .option('inferSchema',True)
              .option('sep',',')
              .load('./data/data_lieneage.txt')
             )
lineage_df.createOrReplaceTempView('lineage_table')
dick_dick=spark.sql("select A.id as a_id,A.group as a_group, \
                    b.id as b_id,b.group as b_group,\
                    c.id as c_id,c.group as c_group \
                    from lineage_table A inner join lineage_table b on A.id = b.id \
                    left join lineage_table c on a.group = c.group")
dick_dick.createOrReplaceTempView('dick_dick_tbl')
spark.sql("select c_id,collect_list(distinct(b_group)) as collected_list from dick_dick_tbl group by c_id ").show(truncate=False)
ouflak
  • 2,458
  • 10
  • 44
  • 49
Decode D
  • 1
  • 1