
I have a CSV file in Amazon S3 which is 62 MB in size (114,000 rows). I am converting it into a Spark Dataset and taking the first 500 rows from it. The code is as follows:

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = df.load("s3n://" + this.accessId.replace("\"", "") + ":" + this.accessToken.replace("\"", "") + "@" + this.bucketName.replace("\"", "") + "/" + this.filePath.replace("\"", ""));

set.take(500);

The whole operation takes 20 to 30 seconds.

Now I am trying the same thing, but instead of a CSV I am using a MySQL table with 119,000 rows. The MySQL server is on Amazon EC2. The code is as follows:

String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;

SparkSession spark=StartSpark.getSparkSession();

SQLContext sc = spark.sqlContext();

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = sc
            .read()
            .option("url", url)
            .option("dbtable", this.tableName)
            .option("driver","com.mysql.jdbc.Driver")
            .format("jdbc")
            .load();
set.take(500);

This takes 5 to 10 minutes. I am running Spark inside a JVM and using the same configuration in both cases.

I could use partitionColumn, numPartitions, etc., but I don't have a numeric column, and a further issue is that the schema of the table is unknown to me.

My issue is not how to decrease the required time (I know that in the ideal case Spark will run on a cluster); what I cannot understand is why there is such a big time difference between the two cases.

– KOUSIK MANDAL

2 Answers


This problem has been covered multiple times on Stack Overflow and in external sources, so just to reiterate: by default, DataFrameReader.jdbc doesn't distribute data or reads. It uses a single thread on a single executor.
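
You can confirm this from the application side; a minimal sketch, checking the partition count of the `set` Dataset from the question:

    // With no partitioning options the JDBC relation comes back as a
    // single partition, i.e. one task reads the entire table.
    System.out.println(set.rdd().getNumPartitions());  // prints 1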

To distribute reads:

  • use ranges with lowerBound / upperBound (these only determine how the partitionColumn range is split into partitions; they do not filter rows):

    Dataset<Row> set = sc
        .read()
        .option("partitionColumn", "foo")
        .option("numPartitions", "3")
        .option("lowerBound", 0)
        .option("upperBound", 30)
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver","com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
    
  • use predicates:

    Properties properties = new Properties();
    properties.setProperty("user", this.userName);
    properties.setProperty("password", this.password);

    // Each predicate becomes the WHERE clause of one partition's query.
    Dataset<Row> set = sc
        .read()
        .jdbc(
            url, this.tableName,
            new String[]{"foo < 10", "foo BETWEEN 10 AND 20", "foo > 20"},
            properties
        );
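
If, as in the question, there is no numeric column, the predicates variant still works: the split can be computed on the MySQL side, for example by hashing an existing column. A sketch, assuming a hypothetical column `some_col` and reusing `properties` from above (`CRC32` and `MOD` are built-in MySQL functions):

    int n = 3;
    String[] predicates = new String[n];
    for (int i = 0; i < n; i++) {
        // Each predicate selects a disjoint slice of the table; rows
        // where some_col IS NULL would need one extra predicate.
        predicates[i] = "MOD(CRC32(some_col), " + n + ") = " + i;
    }
    Dataset<Row> distributed = sc
        .read()
        .jdbc(url, this.tableName, predicates, properties);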
    
– user7698675
  • I'm trying to read `MySQL` table having 500M rows with `numPartitions=32`. Still the reading using `Spark` is much slower than [`sqoop`](http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_controlling_parallelism) (also with 32 tasks). I even tried setting `fetchsize` to higher value (1k or 10k) but no gain. I'm using standard `Connector/J v5.1.41` and I'm on `MySQL v5.6`. – y2k-shubham Apr 11 '18 at 13:30
  • Is there a way to improve parallelism in Spark 2.4 for the JDBC reader? – Sree51 Sep 15 '22 at 12:11

Please follow the steps below.

1. Download a copy of the JDBC connector for MySQL. I believe you already have one:

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2. Create a db-properties.flat file in the format below:

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>

3. Create an empty target table first, where you want to load the data (e.g. a Hive table partitioned by the column you will later pass to partitionBy).

Invoke the Spark shell with the driver JAR on the class path:

spark-shell --driver-class-path <your path to mysql jar>

Then import all the required packages:

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

Initialize a HiveContext or an SQLContext:

val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql

Set properties to allow dynamic Hive partitions (needed for the partitioned write at the end):

sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

Load the MySQL DB properties from the file:

val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db-properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")

Create a query to read the data from your table and pass it to the read method of the SQLContext. This is where you can manage your WHERE clause:

val df1 = "(SELECT  * FROM your_table_name) as s1" 

Pass the jdbcurl, the select query, and the DB properties to the read method:

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

Write it to your target table:

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")
– Gangadhar Kadam
  • What have you done differently that you think would solve the asked question, i.e. **slow reading of `MySQL` *table* into `DataFrame`**? Can you point out a mistake / inefficiency in the *code snippets* provided in the question that is improved in some way by the *snippets* provided by you? – y2k-shubham Mar 06 '18 at 12:07