I have an application that needs to read data from MemSQL and load to DataFrames. I'm using the memsql-spark-connector to interface that communication. But, unfortunately, I'm stuck at the point I'm trying to connect to my memSQL master node. What is wrong with the way I'm connecting to memsql master node? Actually I tried to use mySQL client at my local machine to login to the memsql master node and it worked. So I guess this problem isn't related with the server side.
Here is the stack trace of my exception:
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294)
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533)
at com.memsql.spark.connector.MemSQLConnectionPool$.connect(MemSQLConnectionPool.scala:34)
at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38)
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26)
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26)
at org.apache.spark.sql.memsql.MemSQLCatalog.getDBTablePairs(MemSQLCatalog.scala:64)
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupTable(MemSQLCatalog.scala:58)
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupRelation(MemSQLCatalog.scala:24)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:48)
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:39)
at MemsqlSparkLoader.load(MemsqlSparkLoader.scala:19)
at MemsqlSparkLoaderTest$$anonfun$1.apply$mcV$sp(MemsqlSparkLoaderTest.scala:20)
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14)
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338)
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232)
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395)
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325)
at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:39)
at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:256)
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2304)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2290)
... 126 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297)
... 142 more
The last line of the code snippet below is where this exception is being thrown:
val masterHost:String = "XXX"
val masterPort:Int = 3307
val defaultDBName:String = "mydbtest"
val user:String = "user"
val passwd:String = "passwd"
val query:String = "select * from transactions where block_height <= 1000"
val conf:SparkConf = new SparkConf().setAppName("MemsqlSparkLoaderTest").setMaster("local")
conf.set("memsql.host", masterHost)
conf.set("memsql.port", masterPort.toString)
conf.set("memsql.defaultDatabase", defaultDBName)
conf.set("memsql.user", user)
conf.set("memsql.password", passwd)
val sc:SparkContext = new SparkContext(conf)
val msc:MemSQLContext = new MemSQLContext(sc)
val df = msc.sql(query)
The memsql.cnf
config file on my master node is written as below:
[server]
basedir = .
bind_address = 0.0.0.0
core_file
durability = on
lc_messages_dir = ./share
lock_wait_timeout = 60
max_connections = 100000
plan_expiration_minutes = 180
redundancy_level = 2
skip_name_resolve = on
snapshot_trigger_size = 256m
socket = memsql.sock
ssl_cert = /var/lib/memsql/certs/server-cert.pem
ssl_key = /var/lib/memsql/certs/server-key.pem
tmpdir = .
transaction_buffer = 64m
; ------------------------------------------------------------------------
; MEMSQL OPS VARIABLES
;
; Variables below this header are controlled by MemSQL Ops.
; Please do not edit any of these values directly.
; ------------------------------------------------------------------------
master_aggregator
port = 3307