
I had the following code:

     import org.jooq._ 
     import org.jooq.impl._ 
     import org.jooq.impl.DSL._ 
     import java.sql.DriverManager
     import org.apache.spark.sql.SparkSession

     val session = SparkSession.builder().getOrCreate()
     var df1 = session.emptyDataFrame
     var df2 = session.emptyDataFrame

     val userName = "user"
     val password = "pass"

     val c = DriverManager.getConnection("jdbc:mysql://blah_blah.com", userName, password)

     df1 = sql(s"select * from $db1_name.$tb1_name")
     df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")

Then I got the following error:

    found : org.jooq.SQL
    required: org.apache.spark.sql.DataFrame
    (which expands to) 
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    df1 = sql(s"select * from $db1_name.$tb1_name")
             ^

    found : java.sql.PreparedStatement
    required: org.apache.spark.sql.DataFrame
    (which expands to) 
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")

Then, per the suggestions in the comments, I changed my code to the following:

    val userName = "user"
    val password = "pass"

    val session = SparkSession.builder().getOrCreate()
    var df1 = session.emptyDataFrame
    var df2 = session.emptyDataFrame

    ....
    ....
    df1 = sql(s"select * from $db1_name.$tb1_name")
    df2 = session.read.format("jdbc").
    option("url", "jdbc:mysql://blah_blah.com").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", userName).
    option("password", password).
    option("dbtable",s"select * from $db2_name.$tb2_name").load()

I am getting the following errors:

    The last packet sent successfully to the server was 0 milliseconds 
    ago. The driver has not received any packets from the server.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989)
    at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:632)
    at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1016)
    at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2194)
    at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2225)
    at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2024)
    at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:779)
    at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:389)
    at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    ... 78 elided
    Caused by: java.io.EOFException: Can not read response from server. 
    Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3011)
    at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:567)
    ... 100 more

Any solution or suggestion for these two errors?

I have tried the postgresql and h2 drivers as well (e.g. org.postgresql.Driver), but I get similar (though not identical) errors.

Alan
    [Maybe helpful](https://stackoverflow.com/questions/38376307/create-spark-dataframe-from-sql-query)? That solution uses spark directly to query the database rather than pulling data from jdbc and then trying to fit it into a DataFrame. – crenshaw-dev Sep 23 '19 at 20:41
  • I get this error: The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. – Alan Sep 24 '19 at 03:17
  • `spark.sql`, where `spark` means the Spark session. – Lamanus Sep 24 '19 at 11:54

2 Answers


Your issue is that the Scala compiler has already initialized the vars df1 and df2 as empty DataFrames. You have to try to read directly from Spark:

    spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", "select c1, c2 from t1")
      .load()

For more info you can check the Apache Spark documentation directly:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
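
One likely cause of the remaining error in the question is the `dbtable` option: Spark expects a table name or a parenthesized subquery with an alias there, not a raw SELECT string; the raw SELECT belongs in the `query` option instead. A minimal sketch of the valid forms, using hypothetical placeholder names since the real ones are not in the question:

```scala
// Placeholder database/table names (hypothetical, for illustration).
val db2Name = "some_db"
val tb2Name = "some_table"

// "dbtable" takes a plain table name...
val asTable = s"$db2Name.$tb2Name"

// ...or a parenthesized subquery with an alias...
val asSubquery = s"(select * from $db2Name.$tb2Name) as t"

// ...whereas "query" takes the bare SELECT statement.
val asQuery = s"select * from $db2Name.$tb2Name"

println(asTable)
println(asSubquery)
println(asQuery)
```

In the question's second snippet, that means either changing the `dbtable` value to the subquery form or switching the option name to `query`.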

gccodec
  • I need to have them saved as dataframe. I tried your solution the output of the queries won't be dataframe. – Alan Sep 23 '19 at 22:49
  • By definition of the Apache Spark library, the code that I wrote returns a sql.DataFrame. So are you sure you wrote it in the right way? Post your code! – gccodec Sep 23 '19 at 22:59
  • I can assign to an empty dataframe as I was going to. But the connection is refused and I am not sure why: Caused by: java.net.ConnectException: Connection refused (Connection refused). So I do: var df2 = SparkSession.builder().getOrCreate().emptyDataFrame and then df2 = session.read.format("jdbc").option("url", "jdbc:mysql://blah_blah.com").option("driver", "com.mysql.jdbc.Driver").option("useUnicode", "true").option("continueBatchOnError","true").option("useSSL", "false").option("user", userName).option("password", password).option("dbtable",s"select * from $db2_name.$tb2_name").load() – Alan Sep 23 '19 at 23:50
  • @Alan can you post the full stack trace for the connection refusal and update the code in your post? Are you sure that you are still using the right username and password? – gccodec Sep 24 '19 at 09:47
  • Yes i use correct username and password. I even ssh to the server from GCP using the credentials. I updated the question along with the errors that I am seeing. – Alan Sep 24 '19 at 20:15
  • @Alan are you sure that you still use the right driver for your db instance? – gccodec Sep 25 '19 at 08:23
  • Yes I am sure, I tried different drivers. mysql, org.postgresql.Driver, and h2 – Alan Sep 25 '19 at 18:59
  • @Alan can you try to remove the df1 from your code? and re-run it? – gccodec Sep 26 '19 at 08:26

You can simply get a DataFrame by reading as below. Set up your connection details:

    import java.util.Properties

    val jdbcHostname = "some.host.name"
    val jdbcDatabase = "some_db"
    val driver = "com.mysql.cj.jdbc.Driver" // update the driver as needed; in your case it will be org.postgresql.Driver
    // url to DB
    val jdbcUrl = s"jdbc:mysql://$jdbcHostname:3306/$jdbcDatabase"
    val username = "someUser"
    val password = "somePass"

    // create a properties map for your DB connection
    val connectionProperties = new Properties()

    connectionProperties.put("user", username)
    connectionProperties.put("password", password)
    // note: Spark's JDBC source expects the lowercase "driver" key
    connectionProperties.setProperty("driver", driver)

and then read from JDBC as:

    // use the above created url and connection properties to fetch data
    val tableName = "some_table"
    val mytable = spark.read.jdbc(jdbcUrl, tableName, connectionProperties)

Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.

You can use the above mytable dataframe to run your queries or save data.

Say you want to select some columns and then save them:

    // your select query
    val selectedDF = mytable.select("c1", "c2")
    // now you can save the above dataframe
apnith
  • Thanks for your detailed answer. I get this error though: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver – Alan Sep 24 '19 at 18:45
  • @Alan you can download the MySQL connector and add it to the classpath. You can even add this dependency to your project: mysql:mysql-connector-java:8.0.11 – apnith Sep 25 '19 at 04:14
  • @Alan I just noticed you need to use `org.postgresql.Driver` as your driver – apnith Sep 25 '19 at 05:38
  • Thanks for your comment. Actually, as I mentioned in my main post that I've tried org.postgresql.Driver as well. – Alan Sep 25 '19 at 06:16
  • @Alan you are facing connection issues to server. Can you add `?useSSL=false&autoReconnect=true` to your JDBC URL. If SSL false is not working, you can remove just that and try – apnith Sep 26 '19 at 10:07
  • I tried reconnect (true) as well, the same error: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up. – Alan Sep 28 '19 at 07:43
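
The URL tweak suggested in the comments above amounts to appending driver parameters to the JDBC URL after a `?`, separated by `&`. A sketch with hypothetical placeholder host and database names:

```scala
// Placeholder connection details (hypothetical).
val jdbcHostname = "some.host.name"
val jdbcDatabase = "some_db"

// Append driver parameters after "?", separated by "&".
val jdbcUrl =
  s"jdbc:mysql://$jdbcHostname:3306/$jdbcDatabase?useSSL=false&autoReconnect=true"

println(jdbcUrl)
```

If disabling SSL does not help, drop just the `useSSL=false` parameter and retry, as suggested in the comment thread.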