
I have an application that needs to read data from MemSQL and load it into DataFrames. I'm using the memsql-spark-connector to handle that communication, but unfortunately I'm stuck at the point where I try to connect to my MemSQL master node. What is wrong with the way I'm connecting to the MemSQL master node? I tried using a MySQL client on my local machine to log in to the MemSQL master node and it worked, so I suspect the problem isn't on the server side.

Here is the stack trace of my exception:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294)
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533)
at com.memsql.spark.connector.MemSQLConnectionPool$.connect(MemSQLConnectionPool.scala:34)
at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38)
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26)
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26)
at org.apache.spark.sql.memsql.MemSQLCatalog.getDBTablePairs(MemSQLCatalog.scala:64)
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupTable(MemSQLCatalog.scala:58)
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupRelation(MemSQLCatalog.scala:24)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:48)
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:39)
at MemsqlSparkLoader.load(MemsqlSparkLoader.scala:19)
at MemsqlSparkLoaderTest$$anonfun$1.apply$mcV$sp(MemsqlSparkLoaderTest.scala:20)
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14)
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338)
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232)
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395)
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325)
at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:39)
at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:256)
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2304)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2290)
... 126 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297)
... 142 more    

The last line of the code snippet below is where this exception is being thrown:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.memsql.MemSQLContext

val masterHost: String = "XXX"
val masterPort: Int = 3307
val defaultDBName: String = "mydbtest"
val user: String = "user"
val passwd: String = "passwd"

val query: String = "select * from transactions where block_height <= 1000"

// Point the connector at the master aggregator via the memsql.* settings.
val conf: SparkConf = new SparkConf().setAppName("MemsqlSparkLoaderTest").setMaster("local")
conf.set("memsql.host", masterHost)
conf.set("memsql.port", masterPort.toString)
conf.set("memsql.defaultDatabase", defaultDBName)
conf.set("memsql.user", user)
conf.set("memsql.password", passwd)

val sc: SparkContext = new SparkContext(conf)
val msc: MemSQLContext = new MemSQLContext(sc)

val df = msc.sql(query)   // <-- the exception is thrown here
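
To rule out the connector itself, a bare JDBC probe along these lines (just a sketch reusing the variables above; connectTimeout, in milliseconds, is a standard Connector/J URL option) shows whether plain driver-level connectivity works:

// Sketch: bypass the connector and open a raw JDBC connection using the
// same MySQL-protocol driver. Reuses masterHost/masterPort/defaultDBName/
// user/passwd from the snippet above.
import java.sql.DriverManager

val url = s"jdbc:mysql://$masterHost:$masterPort/$defaultDBName?connectTimeout=10000"
val conn = DriverManager.getConnection(url, user, passwd)
try {
  val rs = conn.createStatement().executeQuery("SELECT 1")
  rs.next()
  println(s"connected, SELECT 1 returned ${rs.getInt(1)}")
} finally {
  conn.close()
}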

The memsql.cnf config file on my master node is as follows:

[server]
basedir = .
bind_address = 0.0.0.0
core_file
durability = on
lc_messages_dir = ./share
lock_wait_timeout = 60
max_connections = 100000
plan_expiration_minutes = 180
redundancy_level = 2
skip_name_resolve = on
snapshot_trigger_size = 256m
socket = memsql.sock
ssl_cert = /var/lib/memsql/certs/server-cert.pem
ssl_key = /var/lib/memsql/certs/server-key.pem
tmpdir = .
transaction_buffer = 64m
; ------------------------------------------------------------------------
; MEMSQL OPS VARIABLES
;
; Variables below this header are controlled by MemSQL Ops.
; Please do not edit any of these values directly.
; ------------------------------------------------------------------------
master_aggregator
port = 3307
Saulo Ricci
  • Can you post the MemSQL node settings (configured in memsql.cnf)? – Leet-Falcon Jan 24 '16 at 18:16
  • @Leet-Falcon just posted in the body of my question. That's the memsql.cnf file at the master node of my MemSQL cluster. – Saulo Ricci Jan 24 '16 at 19:06
  • Check this: http://stackoverflow.com/questions/6865538/solving-a-communications-link-failure-with-jdbc-and-mysql – Leet-Falcon Jan 25 '16 at 05:31
  • Hi Saulo - I tried to repro your issue but was unable to. Can you add some details about which machine you are connecting from and which machine you are connecting to? Like is this a local connection or remote? Also try removing the password on the account and see if that fixes it. – Carl Sverre Jan 25 '16 at 19:52

2 Answers


I had the same problem.

My MySQL client could connect to the DB server from the Spark master, but my Spark slave servers could not connect to the DB server.

I was using the AWS RDS service. I added an inbound rule to the RDS security group authorizing the ElasticMapReduce-slave security group, and my problem was resolved.
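
A quick way to confirm this kind of split (driver can connect, executors cannot) is to run a TCP probe on the executors instead of the driver. This is only a sketch; dbHost and dbPort are placeholders for your own endpoint, and sc is an existing SparkContext:

// Sketch: attempt a TCP connect to the database from each Spark executor.
// dbHost/dbPort are placeholders for your own DB endpoint.
import java.net.{InetSocketAddress, Socket}

val dbHost = "your-db-endpoint"
val dbPort = 3306

val report = sc.parallelize(1 to 20, 20).map { _ =>
  val worker = java.net.InetAddress.getLocalHost.getHostName
  try {
    val s = new Socket()
    s.connect(new InetSocketAddress(dbHost, dbPort), 5000) // 5-second timeout
    s.close()
    s"$worker: reachable"
  } catch {
    case e: Exception => s"$worker: FAILED (${e.getMessage})"
  }
}.collect().distinct

report.foreach(println)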

llby

It seems that your application is not able to connect to the database. Your application may not have network access to the database, or a firewall may be blocking ports it needs. Also, try using the master IP instead of the master hostname. I think you are correct that the problem isn't on the server side.
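
To separate a name-resolution problem from a blocked port, a raw reachability check like this sketch (no JDBC driver involved; substitute the host and port from the question) can help:

// Sketch: test DNS resolution and TCP reachability separately, without JDBC.
import java.net.{InetAddress, InetSocketAddress, Socket}

val masterHost = "XXX"   // the master host/IP from the question
val masterPort = 3307

val addr = InetAddress.getByName(masterHost)           // does the name resolve?
println(s"$masterHost resolves to ${addr.getHostAddress}")

val socket = new Socket()
socket.connect(new InetSocketAddress(addr, masterPort), 5000) // 5-second timeout
println("TCP connect succeeded, so the port is reachable")
socket.close()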

Please let me know if this solves the problem for you. :)

Shagun Sodhani
  • I'm using the IP instead of the hostname itself. But when I run `mysql -h -P -u -p` I can successfully connect to the MemSQL database. Given that, do you think it's still somehow a network access issue? At least the exception is network-related. – Saulo Ricci Jan 24 '16 at 06:34