1

The piece of code below tries to do the following:

For each customer_code in sdf1, check if this customer code appears in sdf2. If it does, replace the df1.actual_related_customer with the df2.actual_related_customer.

This code is not working because I access my rows in df2 wrongly. How can I achieve the above goal? (if you have another suggestion than indices, shoot!)

sdf1 = sqlCtx.createDataFrame(
    [
        ('customer1', 'customer_code1', 'other'),
        ('customer2', 'customer_code2', 'other'),
        ('customer3', 'customer_code3', 'other'),
        ('customer4', 'customer_code4', 'other')
    ],
    ('actual_related_customer', 'customer_code', 'other')
)

sdf2 = sqlCtx.createDataFrame(
    [
        ('Peter', 'customer_code1'),
        ('Deran', 'customer_code5'),
        ('Christopher', 'customer_code3'),
        ('Nick', 'customer_code4')
    ],
    ('actual_related_customer', 'customer_code')
)

def right_customer(x,y):
    for row in sdf2.collect() :
        if x == row['customer_code'] :
            return row['actual_related_customer']
    return y

fun1 = udf(right_customer, StringType())
test = sdf1.withColumn(
    "actual_related_customer",
    fun1(sdf1.customer_code, sdf1.actual_related_customer)
)

And my desired output would look like:

desired_output = sqlCtx.createDataFrame(
    [
        ('Peter', 'customer_code1', 'other'),
        ('customer2', 'customer_code2', 'other'),
        ('Christopher', 'customer_code3', 'other'),
        ('Nick', 'customer_code4', 'other')
    ],
    ('actual_related_customer', 'customer_code', 'other')
)
pault
  • 41,343
  • 15
  • 107
  • 149
  • It's hard to tell for sure without a [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) but you're probably approaching this problem in the wrong way. You can probably achieve the desired outcome using a `join()`. Please try to provide a small sample of your input and desired output. – pault Aug 24 '18 at 13:42
  • @pault I added sample data to create a reproducible example. I think you're saying that I could left join them by customer_code? – Charles Van Damme Aug 24 '18 at 13:57
  • I have added that as well @pault – Charles Van Damme Aug 24 '18 at 14:09
  • Charles, in order to use data from another dataframe, you need to join both dataframes on the conditions you need then make the computations – eliasah Aug 24 '18 at 14:17

1 Answers1

0

Let us do it step by step:

First rename actual_related_customer in sdf1 by actual_1 and rename actual_related_customer in sdf2 by actual_2:

sdf1=sdf1.withColumnRenamed('actual_related_customer', 'actual_1')
sdf2=sdf2.withColumnRenamed('actual_related_customer', 'actual_2')

Then join them:

sdf1= sdf1.join(sdf2, on='customer_code', how='left')
sdf1.show()

Output:

+--------------+---------+-----+-----------+
| customer_code| actual_1|other|   actual_2|
+--------------+---------+-----+-----------+
|customer_code4|customer4|other|       Nick|
|customer_code2|customer2|other|       null|
|customer_code3|customer3|other|Christopher|
|customer_code1|customer1|other|      Peter|
+--------------+---------+-----+-----------+

Now add logic to sdf1:

sdf1= sdf1.withColumn('actual_related_customer', F.when(sdf1.actual_2.isNotNull(), sdf1.actual_2).otherwise(sdf1.actual_1))

And finally show what you want:

sdf1.select('customer_code', 'other', 'actual_related_customer').show()

Output:

+--------------+-----+-----------------------+
| customer_code|other|actual_related_customer|
+--------------+-----+-----------------------+
|customer_code4|other|                   Nick|
|customer_code2|other|              customer2|
|customer_code3|other|            Christopher|
|customer_code1|other|                  Peter|
+--------------+-----+-----------------------+
pault
  • 41,343
  • 15
  • 107
  • 149
Ala Tarighati
  • 3,507
  • 5
  • 17
  • 34