
I have a use case that I am trying to implement with a Spark solution in AWS Glue. I have one table that stores a query as a column value, and I need to run that query from my script.

For example: Select src_query from table;

This gives me another query, shown below:

select table2.col1, table3.col2 from table2 join table3;

Now I want to collect the result of this second query in a dataframe and proceed further.

source_df = spark.read.format("jdbc") \
    .option("url", Oracle_jdbc_url) \
    .option("dbtable", "table1") \
    .option("user", Oracle_Username) \
    .option("password", Oracle_Password) \
    .load()

Now when we run this, the data from table1 gets stored in source_df. One of the columns of table1 stores an SQL query, e.g. select col1,col2 from tabl2;

Now I want to run the query mentioned above and store its result in a dataframe, something like:

final_df2 = spark.read.format("jdbc") \
    .option("url", Oracle_jdbc_url) \
    .option("query", "select col1,col2 from tabl2") \
    .option("user", Oracle_Username) \
    .option("password", Oracle_Password) \
    .load()

How can I get the query out of the first dataframe and run it to fetch the result into another dataframe?
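
Roughly, I imagine pulling the string out of source_df and feeding it to a second JDBC read, something like the sketch below (just to illustrate what I am after; the column name src_query is a placeholder):

# sketch only: assumes the query text sits in a column named src_query
query_string = source_df.select("src_query").first()[0]

final_df2 = spark.read.format("jdbc") \
    .option("url", Oracle_jdbc_url) \
    .option("query", query_string) \
    .option("user", Oracle_Username) \
    .option("password", Oracle_Password) \
    .load()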

pbh
  • Hi Purnima, can you give some sample data and some more explanation? – Meena Arumugam Feb 17 '23 at 00:24
  • @Meena I have added more details; please let me know if that helps. – pbh Feb 17 '23 at 00:37
  • Ya, I'm getting an idea of what you need, but how many rows will the first query return? For example, if it returns 50 rows, should we create 50 new dataframes based on the queries in the column of each row? – Meena Arumugam Feb 17 '23 at 03:40
  • The first query picks data from a static table, and we will keep all that information in one dataframe. Then we will loop through that dataframe to pick the 2nd query from each row (stored as a column), run it, and store its result in a new dataframe. Also, I would like to mention that the results of the 2nd queries will be stored in separate dataframes. Let me know if this helps. – pbh Feb 17 '23 at 09:40

1 Answer


You can use the code below when the source table has a small number of rows, since we use collect to bring all the queries from the source table back to the driver.


import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDF on local Seqs

// create sample secondary tables referenced by the stored queries
Seq(("A","01/01/2022",1), ("AXYZ","02/01/2022",1), ("AZYX","03/01/2022",1),
    ("AXYZ","04/01/2022",0), ("AZYX","05/01/2022",0), ("AB","06/01/2022",1),
    ("A","07/01/2022",0))
  .toDF("Category", "date", "Indicator")
  .write.mode("overwrite").saveAsTable("table1")

Seq(("A","01/01/2022",1), ("b","02/01/2022",0), ("c","03/01/2022",1))
  .toDF("Category", "date", "Indicator")
  .write.mode("overwrite").saveAsTable("table2")

// create the source dataframe that stores the queries
val df = Seq((1, "select Category from table1"), (2, "select date from table2"))
  .toDF("Sno", "Query")

// extract the queries from the source table (collect brings them to the driver)
val qrys = df.select("Query").collect()
qrys.foreach(println)

// execute each stored query and save its result as a new table
qrys.zipWithIndex.foreach { case (row, idx) =>
  spark.sql(row.getString(0)).write.mode("overwrite").saveAsTable("newtbl" + idx)
}

// select from the new tables
spark.sql("select * from newtbl0").show
spark.sql("select * from newtbl1").show

Output:

[select Category from table1]
[select date from table2]
+--------+
|Category|
+--------+
|       A|
|    AXYZ|
|    AZYX|
|    AXYZ|
|    AZYX|
|      AB|
|       A|
+--------+

+----------+
|      date|
+----------+
|01/01/2022|
|02/01/2022|
|03/01/2022|
+----------+
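
Since the snippets in the question look like PySpark, here is a rough PySpark equivalent of the same idea (a sketch under the same assumptions; df, the Query column, and the newtbl prefix match the Scala example above):

# collect the stored queries to the driver (fine for a small source table)
queries = [row["Query"] for row in df.select("Query").collect()]

# execute each stored query and save its result as a new table
for i, q in enumerate(queries):
    spark.sql(q).write.mode("overwrite").saveAsTable("newtbl" + str(i))

spark.sql("select * from newtbl0").show()
spark.sql("select * from newtbl1").show()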
  • Hi Meena, thanks for the response. As I am completely new to this, I just wanted to confirm: is this the Scala version of the code? – pbh Feb 17 '23 at 19:36
  • Yes, it's Scala. – Meena Arumugam Feb 18 '23 at 13:07
  • Hi Meena, I have raised another question which is a continuation of this one. It would be great if you could help me with that: https://stackoverflow.com/questions/75497503/how-to-generate-pyspark-dynamic-frame-name-dynamically – pbh Feb 19 '23 at 01:45