1

I am trying to query a database directly:

file_df.createOrReplaceTempView("file_contents")

QUERY = "SELECT * FROM TABLE1 INNER JOIN file_contents on TABLE1.ID = file_contents.ID"

df = sqlContext.read.format("jdbc").options(
    url=URL,
    driver=DRIVER,
    query=QUERY,
    user=USER,
    password=PASSWORD
).load()

TABLE1 is in the Oracle Database.

However, this code results in the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o343.load. : java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist

How can I fix this? That is I want to not load the large database table and instead query it directly and load only the contents that result from the inner join with the TempView file_contents.

kpatel23
  • 31
  • 5

1 Answers1

0

You cannot do it without taking to same platform

Option 1 - Preferred

spark = SparkSession.builder.getOrCreate()
jdbcUrl = "jdbc:oracle:thin:@{0}:{1}/{2}".format("asdas", "1521", "asdasd")
connectionProperties = {
  "user" : "asdasd",
  "password" : "asdasda",
  "driver" : "oracle.jdbc.driver.OracleDriver",
  "fetchsize" : "100000"
}
pushdown_query = "(SELECT * FROM TABLE1 ) aliasname"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

file_df.createOrReplaceTempView("file_contents")
df.createOrReplaceTempView("table1")
spark.sql("SELECT * FROM TABLE1 INNER JOIN file_contents on TABLE1.ID = file_contents.ID")

Option 2

You have to have temp table in the oracle end to load it say table is filecontent so this and then try extract the required.

file_df.write.format('jdbc').options(url=tgt_url,driver=tgtdriver, dbtable=filecontent,user=tgt_username,password=tgt_password).mode("overwrite).option("truncate","true").save()

Option3 - If the if file content is something collected as list and passed in clause

file_df_id= file_df.select("ID").rdd.flatMap(lambda x: x).collect()
query_param = ",".join(file_df_id)
query = f'select * from table1 where TABLE1.ID in ({query_param})  query_temp'
print(query)
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
Rafa
  • 487
  • 7
  • 22
  • Can you explain what is fetchsize? is it only getting 100000 records at a time. If so what if we have 1M records then do we have to call that code 10 times in for loop? isn't that same thing as loading it entirely which is what we are trying to avoid? – kpatel23 Jun 11 '21 at 14:56
  • by default the fetch size is 1000 I believe. if you want faster extraction u need to use lowerbound,upperbound and numpartition along with fetchsize. spark takes care of all things needed all u need is just configure ur query wit hattributes no need for for loop or anything https://stackoverflow.com/questions/41085238/what-is-the-meaning-of-partitioncolumn-lowerbound-upperbound-numpartitions-pa – Rafa Jun 11 '21 at 17:06
  • The method above will still get the full table though (100000 per call). We are trying to avoid that. We want some way we can reference the PySpark table (from createTempView) and Oracle Table and do a query call so the result of the output of oracle table is less then loading entire table (which would save cost) – kpatel23 Jul 20 '21 at 18:09
  • if the content is something you can pass in - in clause then there is another options . Update the answer accordginly – Rafa Jul 20 '21 at 20:58