
I'm running Spark v2.2.1 via sparklyr v0.6.2 and pulling data from SQL Server via JDBC. I seem to be experiencing some network issue, because many times (not every time) an executor writing to SQL Server fails with this error:

Prelogin error: host <my server> port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:...

I am running my sparklyr session with the following configurations:

library(sparklyr)

spark_conf <- spark_config()
spark_conf$spark.executor.cores <- 8
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"
spark_conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
spark_conf$spark.network.timeout <- 400
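The config is applied when the session is created; for completeness, the connection call looks roughly like this (the master URL is a placeholder, not my actual cluster manager):

# Placeholder master URL -- substitute the real cluster manager.
sc <- spark_connect(master = "yarn-client", config = spark_conf)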

Interestingly, the network timeout I've set above does not seem to apply, based on the executor logs:

18/06/11 17:53:44 INFO BlockManager: Found block rdd_9_16 locally
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:3 ClientConnectionId: d3568a9f-049f-4772-83d4-ed65b907fc8b Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:d3568a9f-049f-4772-83d4-ed65b907fc8b
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:2 ClientConnectionId: ecb084e6-99a8-49d1-9215-491324e8d133 Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:ecb084e6-99a8-49d1-9215-491324e8d133
18/06/11 17:53:45 ERROR Executor: Exception in task 10.0 in stage 26.0 (TID 77)

Can someone help me understand what a prelogin error is and how to avoid this issue? Here is my write function:

function (df, tbl, db, server = NULL, user, pass, mode = "error",
    options = list(), ...)
{
    sparklyr::spark_write_jdbc(
        df,
        tbl,
        options = c(
            list(
                url = paste0("jdbc:sqlserver://", server, ".nciwin.local;",
                             "databaseName=", db, ";",
                             "user=", user, ";",
                             "password=", pass, ";"),
                driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
            options),
        mode = mode, ...)
}
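For reference, and assuming the wrapper above is bound to a name such as write_tbl (the name, table, database, and credentials below are all placeholders), a call looks roughly like this:

# All names and credentials below are placeholders.
write_tbl(
  df     = my_spark_df,
  tbl    = "dbo.results",
  db     = "MyDatabase",
  server = "nciensql14",   # ".nciwin.local" is appended inside the wrapper
  user   = "svc_user",
  pass   = "********",
  mode   = "append"
)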

I've just updated my JDBC driver to version 6.0, but I don't think it made a difference. I hope I installed it correctly: I just dropped it into my Spark/jars folder and then added it to Spark/conf/spark-defaults.conf.
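For reference, the classpath wiring looks roughly like this; the jar name and path below are approximations, not the exact values on my cluster:

# spark-defaults.conf entries (jar name and path approximate):
#   spark.driver.extraClassPath    /opt/spark/jars/mssql-jdbc-6.0.jre8.jar
#   spark.executor.extraClassPath  /opt/spark/jars/mssql-jdbc-6.0.jre8.jar

# Alternatively, from sparklyr via the spark-submit --driver-class-path flag:
spark_conf$`sparklyr.shell.driver-class-path` <- "/opt/spark/jars/mssql-jdbc-6.0.jre8.jar"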

EDIT: I am reading 23M rows in 24 partitions into Spark. My cluster has 4 nodes with 8 cores and 18G of memory each. With my current configuration I have 4 executors, each with 8 cores and 12G of memory. My function to read in the data looks as such:

function (sc, tbl, db, server = NULL, user, pass, repartition = 0, options = list(), ...) 
{
    sparklyr::spark_read_jdbc(
      sc, 
      tbl, 
      options = c(
        list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;"), 
             user = user, 
             password = pass, 
             databaseName = db, 
             dbtable = tbl, 
             driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
        options), 
      repartition = repartition, ...)
}

I set repartition to 24 when running. As such, I'm not seeing the connection to the post suggested as a duplicate.
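Concretely, the call looks roughly like this (table and credential values are placeholders, and read_tbl just stands in for the anonymous wrapper above):

# Placeholder names and credentials; repartition = 24 matches the partition count above.
df <- read_tbl(
  sc,
  tbl         = "dbo.big_table",
  db          = "MyDatabase",
  server      = "nciensql14",
  user        = "svc_user",
  pass        = "********",
  repartition = 24
)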

EDIT 2
I was able to fix my issue by getting rid of repartitioning. Can anyone explain why repartitioning with sparklyr is not effective in this case?

Zafar
  • Possible duplicate of [How to improve performance for slow Spark jobs using DataFrame and JDBC connection?](https://stackoverflow.com/questions/32188295/how-to-improve-performance-for-slow-spark-jobs-using-dataframe-and-jdbc-connecti) – Alper t. Turker Jun 11 '18 at 23:29
  • Hm, I'm not making the connection between these posts. My read is working well and I use 24 partitions, so I'm not seeing how that applies. Mind you, I used to do this fine with no read partitioning when I was connecting to a different SQL Server DB. – Zafar Jun 11 '18 at 23:57
  • 1
    I'm not closing this question yet. The spark_read_jdbc function doesn't work the way you think it does. Repartitioning happens after the data is pulled , which is the source of the problem. You can check that in the [source code](https://github.com/rstudio/sparklyr/blob/623852f0683a0a82bc46acca1e7c2438ce5ee7a6/R/data_interface.R#L609) of the function. – eliasah Jun 12 '18 at 09:14
  • Which makes this question a duplicate. But if you wish to have a better understanding of how you actually fix this, I advise you to read my answer [here](https://stackoverflow.com/questions/45027091/spark-2-1-hangs-while-reading-a-huge-datasets/45028675#45028675) – eliasah Jun 12 '18 at 09:14
  • @eliasah if you answer the question with advice on why repartitioning caused timeout issues I will mark it as best answer to close this question. – Zafar Jun 12 '18 at 17:15

1 Answer


As explained in the other question, as well as in some other posts (Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?, Converting mysql table to spark dataset is very slow compared to same from csv file, Partitioning in spark while reading from RDBMS via JDBC, spark reading data from mysql in parallel) and off-site resources (Parallelizing Reads), by default the Spark JDBC source reads all data sequentially into a single node.

There are two ways of parallelizing reads:

  • Range splitting based on a numerical column, with the lowerBound, upperBound, partitionColumn and numPartitions options required, where partitionColumn is a stable numeric column (pseudocolumns might not be a good choice); a filled-in sketch follows after this list:

    spark_read_jdbc(
      ...,
      options = list(
        ...,
        lowerBound = "0",                 # Adjust to fit your data
        upperBound = "5000",              # Adjust to fit your data
        numPartitions = "42",             # Adjust to fit your data and resources
        partitionColumn = "some_numeric_column"
      )
    )
    
  • predicates list - not supported in sparklyr at the moment.
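A sketch of the first approach, applied to the table from the question, might look like the following; the partition column, bounds, and credentials are hypothetical and have to match the actual data:

# Hypothetical column name, bounds, and credentials -- adjust to the real table.
df <- spark_read_jdbc(
  sc,
  name = "big_table",
  options = list(
    url             = "jdbc:sqlserver://nciensql14.nciwin.local;databaseName=MyDatabase",
    dbtable         = "dbo.big_table",
    user            = "svc_user",
    password        = "********",
    driver          = "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    partitionColumn = "id",          # a stable numeric column
    lowerBound      = "1",
    upperBound      = "23000000",    # ~23M rows mentioned in the question
    numPartitions   = "24"
  )
)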

Repartitioning (sparklyr::sdf_repartition) doesn't resolve the problem, because it happens after the data has been loaded. Since the shuffle (required for repartitioning) is among the most expensive operations in Spark, it can easily crash the node.

As a result, using:

  • the repartition parameter of spark_read_jdbc, or

  • sdf_repartition

is just a cargo cult practice, and most of the time it does more harm than good. If the data is small enough to be piped through a single node, then increasing the number of partitions will usually decrease performance. Otherwise it will just crash.

That being said, if the data is already processed by a single node, it raises the question of whether it makes sense to use Apache Spark at all. The answer will depend on the rest of your pipeline, but considering only the component in question, it is likely to be negative.

Alper t. Turker