
I am performing a transformation in PySpark that creates new columns, and I have observed that source_df is getting replaced by the new columns. Is it possible to merge the new columns with the existing DataFrame columns?

source_df = source_df.rdd.map(lambda x: winner_calc(x["org_attributes_dict"])).toDF()

Output:
+---------+----------+------------+
|winner_bn| winner_hj|winner_value|
+---------+----------+------------+
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
+---------+----------+------------+

I am sharing sample code since I cannot share the actual code. As you can see in the final result, the actual DataFrame is overwritten with the new value 'H' for all rows. I want to add that as a new column to the existing DataFrame instead of overwriting it.

import sys,os
import concurrent.futures
from concurrent.futures import *
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql.functions import array
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import  udf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.functions import row_number,lit,col,expr
from pyspark.sql.window import Window
import requests
import json
import traceback
import base64
import pandas as pd 
import pyspark.sql.types as T
from pyspark.sql import functions as F


def val():
    # Return a one-element tuple ('H',) so it can be concatenated onto a row
    return tuple('H')

###############################

class JobBase(object):
    spark=None
    def __start_spark_glue_context(self):
        conf = SparkConf().setAppName("python_thread")
        self.sc = SparkContext(conf=conf)
        self.glueContext = GlueContext(self.sc)
        self.spark = self.glueContext.spark_session
            
    def execute(self):
        self.__start_spark_glue_context()
        d =[{"account_number": 1, "v1": 100830, "v2": 1000},
                {"account_number": 2, "v1": 2000, "v2": 2},
                {"account_number": 3, "v1": 555, "v2": 55}]

        df = self.spark.createDataFrame(d)
        df.show()
        try:
            df = df.rdd.map(lambda x: val()).toDF()
        except Exception as exp:
            exception_type, exception_value, exception_traceback = sys.exc_info()
            traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
            err_msg = json.dumps({
                "errorType": exception_type.__name__,
                "errorMessage": str(exception_value),
                "stackTrace": traceback_string})
            print(err_msg)
        
        df.show()

        
def main():
    job = JobBase()
    job.execute() 
    

if __name__ == '__main__':
    main()

Output of main():

+--------------+------+----+
|account_number|    v1|  v2|
+--------------+------+----+
|             1|100830|1000|
|             2|  2000|   2|
|             3|   555|  55|
+--------------+------+----+

+---+
| _1|
+---+
|  H|
|  H|
|  H|
+---+

1 Answer


Replace the line

df = df.rdd.map(lambda x: val()).toDF()

with

df = df.rdd.map(lambda row: row + val()).toDF(df.columns + ["v3"])

Since a Row is a tuple subclass, row + val() concatenates the existing values with the new one, and passing df.columns + ["v3"] to toDF() keeps the original column names while naming the added column.

Output:

+--------------+------+----+---+
|account_number|    v1|  v2| v3|
+--------------+------+----+---+
|             1|100830|1000|  H|
|             2|  2000|   2|  H|
|             3|   555|  55|  H|
+--------------+------+----+---+
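
For completeness, here is a minimal sketch of an alternative that avoids the RDD round trip entirely: wrap the per-row logic in a UDF that returns a struct, add it with withColumn, and expand the struct fields. The winner_* field names and the stand-in winner_calc body are assumptions modeled on the question, whose real logic is not shown.

from pyspark.sql import functions as F
import pyspark.sql.types as T

# Hypothetical stand-in for the question's winner_calc; replace with the real logic.
def winner_calc(v1, v2):
    return ("No Match", 2, 1)

winner_schema = T.StructType([
    T.StructField("winner_bn", T.StringType()),
    T.StructField("winner_hj", T.IntegerType()),
    T.StructField("winner_value", T.IntegerType()),
])

winner_udf = F.udf(winner_calc, winner_schema)

# Add the struct as a single column, then expand it alongside the existing columns.
df = df.withColumn("winner", winner_udf("v1", "v2"))
df = df.select("*", "winner.*").drop("winner")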
  • @arudsekaberne It was just an example. My actual logic in the function is quite complicated, and my motive was to understand how I can merge columns returned by an RDD into my existing dataframe – pbh Mar 21 '23 at 17:42
  • Convert your RDD into a DataFrame and try joining your existing DataFrame using an identifier column (see the sketch after these comments) – arudsekaberne Mar 21 '23 at 17:59
  • @arudsekaberne Ok, so there is no direct way to merge except by joining. Thanks for the information – pbh Mar 21 '23 at 18:09
  • I have updated my solution, try this and let me know. – arudsekaberne Mar 21 '23 at 18:35
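
Following up on the join suggestion in the comments, a minimal sketch of that approach, assuming the mapped RDD produces rows in the same order as the source and that new_df is the DataFrame built from it (both names are illustrative):

def with_row_index(df):
    # zipWithIndex pairs each Row with a stable, order-preserving 0-based index;
    # Row is a tuple subclass, so pair[0] + (pair[1],) concatenates cleanly.
    return df.rdd.zipWithIndex().map(
        lambda pair: pair[0] + (pair[1],)
    ).toDF(df.columns + ["row_idx"])

left = with_row_index(source_df)   # the original DataFrame
right = with_row_index(new_df)     # the DataFrame produced from the mapped RDD
merged = left.join(right, on="row_idx").drop("row_idx")

zipWithIndex is used rather than monotonically_increasing_id because the latter does not guarantee matching ids across two separately built DataFrames.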