5

In the following two examples, the number of tasks run and the corresponding run time suggest that the sampling options have no effect, since both match jobs run without any sampling options:

val df = spark.read.option("samplingRatio", 0.001).json("s3a://test/*.json.bz2")

val df = spark.read.option("sampleSize",100).json("s3a://test/*.json.bz2")

I know that explicit schemas are best for performance, but in convenience cases sampling is useful.
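
(By an explicit schema I mean something along these lines; the field names here are just placeholders, not my actual data:)

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Placeholder schema; the real data has different fields.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("payload", StringType, nullable = true)
))

// With an explicit schema, no inference pass over the data is needed.
val df = spark.read.schema(schema).json("s3a://test/*.json.bz2")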

I am new to Spark, so am I using these options incorrectly? I attempted the same approach in PySpark, with the same results:

df = spark.read.options(samplingRatio=0.1).json("s3a://test/*.json.bz2")

df = spark.read.options(samplingRatio=None).json("s3a://test/*.json.bz2")
kermatt
  • You will not see a change in the number of tasks spawned; rather you should see new jobs getting spawned. Can you also let me know the Spark version being used? Also, another general tip is to set the log level to "debug" (`sc.setLogLevel("debug")`) and run the same; logs regarding the reason for skipping sampling (if it was skipped) might get logged. – DaRkMaN Jun 17 '19 at 08:25
  • @DaRkMan Spark v.2.4.3. I had inspected logs at the debug level, but could not spot an indication as to why sampling appeared to not be used. I will look again and post anything that appears relevant. The curious part is that the time to "sample" is the same as the time to infer schema across all the data (with no samplingRatio option used). – kermatt Jun 17 '19 at 14:34
  • What's the order of magnitude of your timings (with schema-inference sampling fractions of, for instance, 0.01, 0.90, and 1.0)? For what amount of data? – greyside Jun 19 '19 at 09:43
  • @greyside Have tested with first three orders: 0.1, 0.01, 0.001. 12TB of JSON text, also tested with both gz and bz2 compression format. Including both the sampleRatio and sampleSize options, the time to sample never seems to change as if the options are not being specified properly or are not being used. – kermatt Jun 19 '19 at 16:03
  • ~3.5TB compressed. – kermatt Jun 19 '19 at 19:52

1 Answer

8

TL;DR None of the options you use will have a significant impact on the execution time: sampleSize is not a valid JSON source option, so it is simply ignored, and samplingRatio only reduces how many records are fed to the schema inference logic; the input still has to be read (and decompressed) in full.

You can try to sample the data explicitly. In Python:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

def infer_json_schema(path: str, sample_size: int, **kwargs: str) -> StructType:
    spark = SparkSession.builder.getOrCreate()
    # Read the raw JSON lines as text and keep only the first sample_size records.
    sample = spark.read.text(path).limit(sample_size).rdd.flatMap(lambda x: x)
    # Infer the schema from the sampled strings only.
    return spark.read.options(**kwargs).json(sample).schema

In Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

def inferJsonSchema(
    path: String, sampleSize: Int, options: Map[String, String]): StructType = {
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._  // required for .as[String]

  // Read the raw JSON lines as text and keep only the first sampleSize records.
  val sample = spark.read.text(path).limit(sampleSize).as[String]
  // Infer the schema from the sampled strings only.
  spark.read.options(options).json(sample).schema
}

Please keep in mind that, for this to work well, the sample size should be at most equal to the expected size of a partition. Limits in Spark escalate quickly (see for example my answer to Spark count vs take and length), and you can easily end up scanning the whole input.
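
For example, something along these lines (the path, sample size and options below are just placeholders) lets you reuse the inferred schema for the full read:

// Hypothetical usage; adjust the path, sample size and options to your data.
val schema = inferJsonSchema(
  "s3a://test/*.json.bz2",
  sampleSize = 1000,
  options = Map.empty[String, String])

// The explicit schema makes the full read skip inference entirely.
val df = spark.read.schema(schema).json("s3a://test/*.json.bz2")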

user10938362