10

I want to use Spark to process some data from a JDBC source. But to begin with, instead of reading the original tables from JDBC, I want to run some queries on the JDBC side to filter columns and join tables, and load the query result as a table in Spark SQL.

The following syntax for loading a raw JDBC table works for me:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306",
    dbtable="mydb.table1",
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver" # mysql JDBC driver 5.1.41
).load() 
df_table1.show() # succeeded

According to the Spark documentation (I'm using PySpark 1.6.3):

dbtable: The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.

So just for experiment, I tried something simple like this:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306",
    dbtable="(SELECT * FROM mydb.table1) AS table1",
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver"
).load() # failed

It threw the following exception:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table1 WHERE 1=0' at line 1

I also tried a few other variations of the syntax (adding/removing parentheses, removing the 'AS' clause, switching case, etc.) without any luck. So what is the correct syntax? Where can I find more detailed documentation for it? Besides, where does this weird "WHERE 1=0" in the error message come from? Thanks!

Jacek Laskowski
Dichen
  • From my point of view, you just have to specify the table you are trying to pull into Spark, so leave out the SELECT statement. The 1=0 is derived from the parameter you didn't specify. Take a look at the source code of the [Dataframe Reader](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L207-L216) – jojo_Berlin Apr 03 '17 at 05:07
  • This http://stackoverflow.com/q/32628717/1305344 looks similar but it's with PostgreSQL (not MySQL). – Jacek Laskowski Apr 20 '17 at 20:35

4 Answers

4

For reading data from a JDBC source using a SQL query in Spark SQL, you can try something like this:

val df_table1 = sqlContext.read.format("jdbc").options(Map(
    "url" -> "jdbc:postgresql://localhost:5432/mydb",
    "dbtable" -> "(select * from table1) as table1",
    "user" -> "me",
    "password" -> "******",
    "driver" -> "org.postgresql.Driver"
)).load()

I tried it with PostgreSQL; you can adapt it for MySQL.
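
For the question's setup, a rough PySpark equivalent against MySQL might look like the sketch below. This is only an illustration, not a verified fix: the host, database, and credentials are the placeholders from the question, the database is put in the URL (mirroring the PostgreSQL example above), and the subquery is parenthesised and aliased.

# Hedged PySpark sketch for MySQL; all values are placeholders from the question.
df_table1 = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://foo.com:3306/mydb",        # database included in the URL
    dbtable="(SELECT * FROM table1) AS table1",  # parenthesised subquery with an alias
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver"
).load()
df_table1.show()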

himanshuIIITian
  • It seems I was doing the same thing as your answer, except I'm using python. Perhaps there is some syntax error on my code specific to PySpark? – Dichen Apr 04 '17 at 06:26
  • You need to add the starting "(" and ending ")" around your query in Python. Also, you may have to end it with 't'. So something like this: dbtable = "() t" – Nikhil Gupta Mar 30 '21 at 15:36
4
table = "(SELECT id, person, manager, CAST(tdate AS CHAR) AS tdate, CAST(start AS   CHAR) AS start, CAST(end AS CHAR) as end, CAST(duration AS CHAR) AS duration FROM EmployeeTimes) AS EmployeeTimes",

spark = get_spark_session()
df = spark.read.format("jdbc") \
    .options(url=ip,
             driver='com.mysql.jdbc.Driver',
             dbtable=table,
             user=username,
             password=password) \
    .load()
return df

I had heaps of trouble with Spark JDBC incompatibility with MySQL timestamps. The trick is to convert all your timestamp or duration values to strings before JDBC touches them. Simply cast your values as strings and it will work.

Note: You will also have to use AS to give the query an alias for it to work.
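
If you need proper timestamp columns on the Spark side afterwards, you can cast the strings back once the data has been loaded. A minimal sketch, assuming the column names from the query above:

from pyspark.sql.functions import col

# Cast the string columns produced by the CAST(... AS CHAR) trick back to timestamps.
# Column names (tdate, start, end) come from the query above.
df_with_ts = (df
    .withColumn("tdate", col("tdate").cast("timestamp"))
    .withColumn("start", col("start").cast("timestamp"))
    .withColumn("end", col("end").cast("timestamp")))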

Zack
3

With Spark 2.2 on Python, connecting to MySQL 5.7.19, I'm able to run the following when I use table="(SELECT * FROM a_table) AS my_table".

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder.appName("myApp") \
    .config("jars", "/usr/local/spark-2.2.2-bin-hadoop2.7/jars/mysql-connector-java-5.1.45-bin.jar") \
    .getOrCreate()

my_df = my_spark.read.jdbc(
    url="jdbc:mysql://my_host:3306/my_db",
    table="(SELECT * FROM a_table) AS my_table",
    properties={'user': 'my_username', 'password': 'my_password'}
)

my_df.head(20)
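
If, as in the question, the goal is to use the result as a table in Spark SQL, the loaded DataFrame can then be registered as a temporary view. A small sketch (the view name my_table is just an example):

# Register the DataFrame so it can be queried with Spark SQL.
my_df.createOrReplaceTempView("my_table")
my_spark.sql("SELECT COUNT(*) FROM my_table").show()
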
Suzana
Will
2

I think it may be a bug in Spark SQL.

It seems that either this or this line gives you the error. Both use Scala string interpolation to substitute your dbtable value for table.

s"SELECT * FROM $table WHERE 1=0"

That's where the table1 WHERE 1=0 in the error you've faced comes from, since the above pattern would become:

SELECT * FROM (select * from table1) as table1 WHERE 1=0

which looks incorrect.

There is indeed a MySQL-specific dialect - MySQLDialect - that overrides getTableExistsQuery with its own:

override def getTableExistsQuery(table: String): String = {
  s"SELECT 1 FROM $table LIMIT 1"
}

so my bet is that the other method getSchemaQuery is the source of the error. That's highly unlikely though, given you use Spark 1.6.3 while the method has a @Since("2.1.0") marker.

I'd highly recommend checking the logs of the MySQL database to see what query is executed that leads to the error message.

Jacek Laskowski