
enter image description here

I have a table containing the data shown in the diagram. I want to store query results in DataFrames whose names are generated dynamically.

For example, in the case below I want to create two DataFrames named dnb_df and es_df, store the result of each read in them, and print the structure of each DataFrame.

When I run the code below, I get this error:

SyntaxError: can't assign to operator (TestGlue2.py, line 66)


import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col


args = getResolvedOptions(sys.argv, ['JOB_NAME'])





sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session

#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)



client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])

#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url="jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row in vendor_data:
    vendor_query = row.SRC_QUERY
    row.VENDOR_NAME+'_df' = spark.read.format("jdbc").option("url",
            Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
            Oracle_Username).option("password", Oracle_Password).load()
    print(row.VENDOR_NAME+'_df')


I have added the use case as a picture:

enter image description here

pbh
  • What is line 66? – UpmostScarab Feb 19 '23 at 01:47
  • Does this answer your question? [SyntaxError: cannot assign to operator](https://stackoverflow.com/questions/8956825/syntaxerror-cannot-assign-to-operator) – UpmostScarab Feb 19 '23 at 01:49
  • It is giving me error in last line where I am trying to print dataframe result – pbh Feb 19 '23 at 02:29
  • @UpmostScarab I am looking for a way to generate dataframe names dynamically and then use those names to print the result stored in dataframes – pbh Feb 19 '23 at 02:32
  • You want to assign the dataframe to different dynamically generated names if I got it correct? In that case I don't think this relates to spark but Python in general – Ronak Jain Feb 19 '23 at 04:32
  • Also, I don't think you should assign Dataframe to Row, instead you can convert the row as dict – Ronak Jain Feb 19 '23 at 04:33
  • Hi @ronak I am getting error on join, in actual scenario I have another column org_code on the basis of which I am joining `AnalysisException: USING column ORG_CODE cannot be resolved on the left side of the join. The left-side columns: [VENDOR_NAME, SRC_QUERY, VENDOR_EXECUTION_ORDER]` – pbh Feb 20 '23 at 18:09
  • It seems the column is not present in left dataframe, or maybe you're using wrong dataframe, can you share the code? – Ronak Jain Feb 20 '23 at 18:15
  • Yes Ronak I have identified the issue. It was silly mistake. Sorry about that :) – pbh Feb 20 '23 at 18:20

2 Answers


Update: as discussed in the comments, you also need to join each of these DataFrames with another DataFrame:

for row in vendor_data:
  rowAsDict=row.asDict()
  # Any dict would do here; rowAsDict is just a convenient place to hold the new DataFrame
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
  # Join each vendor DataFrame onto main_dataframe using the shared acc_id column
  main_dataframe=main_dataframe.join(rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"], "acc_id")

Input main_dataframe:

Main DF

source_df :

Source_df

View1 and View2:

Views

Output main_dataframe

Output
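
The loop above is a left-to-right fold over the vendor frames. As a rough sketch of that idea only: `join_all` and `FakeFrame` below are hypothetical names introduced for illustration, and `FakeFrame` is a stand-in that merely records the joins so the snippet runs without Spark. With real DataFrames the same `join_all` would call `DataFrame.join(other, on)`.

```python
from functools import reduce


def join_all(base, frames, key):
    """Join every frame in `frames` onto `base` using the shared column `key`."""
    return reduce(lambda left, right: left.join(right, key), frames, base)


class FakeFrame:
    """Stand-in for a DataFrame (assumption for this demo): it only records joins."""

    def __init__(self, name):
        self.name = name

    def join(self, other, on):
        return FakeFrame("({} JOIN {} ON {})".format(self.name, other.name, on))


vendor_frames = [FakeFrame("Name1_df"), FakeFrame("Name2_df")]
joined = join_all(FakeFrame("main"), vendor_frames, "acc_id")
print(joined.name)
```

Because each step joins on the same key, the order in which the vendor frames are folded in should not change which rows match, only the column order of the result.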

If I understand correctly, you need to generate the <VENDOR_NAME>_df names dynamically.

You won't be able to assign to the Row object, nor would it be useful to store a DataFrame in a Row, since you can't create a DataFrame with a column of type DataFrame.

You can, however, convert the row to a dict with asDict() and use that instead.

This would work:

vendor_data=source_df.collect()

for row in vendor_data:
  rowAsDict=row.asDict()
  # Replace this with spark.read() or any way to create a Dataframe
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"]) 
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show() 

Input Source_DF:

Input

Result of SOURCE_QUERY:

Input

Output (of rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()):

Output

Final rowAsDict:

{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}
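
More generally, Python has no clean way to create variable names at runtime, so the usual pattern is a single dictionary keyed by the generated name. Below is a minimal sketch of that pattern, assuming each row exposes VENDOR_NAME and SRC_QUERY as in the question. `load_frames`, `fake_loader`, `VendorRow`, and the view names are hypothetical, and the stub loader stands in for the real `spark.read.format("jdbc")...load()` call so the snippet runs without Spark.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row; rows returned by source_df.collect()
# support the same attribute access (row.VENDOR_NAME, row.SRC_QUERY).
VendorRow = namedtuple("VendorRow", ["VENDOR_NAME", "SRC_QUERY"])


def load_frames(rows, loader):
    """Return a dict mapping "<vendor>_df" to loader(query) for every row."""
    frames = {}
    for row in rows:
        frames[row.VENDOR_NAME + "_df"] = loader(row.SRC_QUERY)
    return frames


def fake_loader(query):
    """Stub for illustration only; the Glue job would run the JDBC read here."""
    return "frame for: " + query


rows = [
    VendorRow("dnb", "(select * from dnb_view)"),  # hypothetical query
    VendorRow("es", "(select * from es_view)"),    # hypothetical query
]
frames = load_frames(rows, fake_loader)
print(sorted(frames))
```

With real DataFrames as values, something like `frames["dnb_df"].printSchema()` would then print the structure of that frame.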
Ronak Jain
  • Actually after getting data in dataframes I wanted to join those together.would that be possible? – pbh Feb 19 '23 at 12:03
  • Do you want to join all the collected dataframes together or join them with the source_df dataframe? – Ronak Jain Feb 19 '23 at 12:38
  • I want to join them with the source_df dataframe. I should also mention that I am currently populating all vendor dataframes in a loop. Soon I will change it to a process where all vendor dataframes are populated in parallel and then joined to the source dataframe. Hope this helps. Let me know if there is still any confusion and I will update my question with more detail – pbh Feb 19 '23 at 12:48
  • Is there a common id with which we can join those dataframes? – Ronak Jain Feb 19 '23 at 13:03
  • Hi Ronak I have edited my question and added use case in excel format – pbh Feb 19 '23 at 13:47
  • let me try that and will let you know ..thanks – pbh Feb 19 '23 at 13:55
  • would this solution work when I will try to create each vendors dataframe in parallel and then perform join – pbh Feb 19 '23 at 17:20
  • @purnimaBhatia Probably Yes, as the order of joining won't matter. But I am not much aware about Python, so can't really comment. – Ronak Jain Feb 19 '23 at 17:37

Use the last two statements below in your for loop and you should get the results. The first creates a temp view named after the dynamic DataFrame name; the second shows the data in that temp view.

for row  in vendor_data :
    vendor_query=row.SRC_QUERY
    spark.read.format("jdbc").option("url", 
               Oracle_jdbc_url).option("dbtable", vendor_query).option("user", 
            Oracle_Username).option("password", Oracle_Password).load().createOrReplaceTempView(row.VENDOR_NAME+'_df')   
    spark.sql("select * from "+row.VENDOR_NAME+"_df").show()
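
Once every vendor frame is registered as a temp view, the views can be combined with a single SQL statement. Here is a small sketch that only builds the query string; `build_join_sql` and the `main_view` name are hypothetical, and ORG_CODE is assumed to be the shared join column mentioned in the comments on the question.

```python
def build_join_sql(base_view, vendor_views, key):
    """Build a SELECT that joins base_view to each vendor view on one shared column."""
    parts = ["SELECT * FROM " + base_view]
    for view in vendor_views:
        parts.append("JOIN {} USING ({})".format(view, key))
    return " ".join(parts)


# "main_view" is an assumed name for a view registered from the main DataFrame.
sql = build_join_sql("main_view", ["dnb_df", "es_df"], "ORG_CODE")
print(sql)
```

The resulting string could then be passed to `spark.sql(sql)`. View names containing spaces or other special characters would need backtick quoting, which this sketch does not handle.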
    
  • can these temp view be joined together later on ? – pbh Feb 20 '23 at 12:22
  • Yes, they can. They behave the same as dataframes; the only difference is that with a dataframe you use the Spark API to join and do other transformations, whereas with a temp view you can do the same transformations in plain SQL through spark.sql – Meena Arumugam Feb 20 '23 at 14:56