
I have the following Python script:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Test') \
    .config("spark.driver.extraJavaOptions", "-Xss1G") \
    .master('local[*]') \
    .getOrCreate()

dataframe = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://localhost:27017/test") \
    .option("database", "finance") \
    .option("collection", "finished13") \
    .option("pipeline", "[ { '$sort': {'prop1':-1} }, {'$limit': 700}]") \
    .load()

dataframe = dataframe.limit(2)

dataframe.show()

Spark initially starts off by sampling the MongoDB structure, so the load() call takes a while. The dataframe.limit(2) call returns immediately (as expected), since Spark evaluates lazily. On the other hand, I wrote the very same code in Java, as seen below:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession session = SparkSession
        .builder()
        .master("local[*]")
        .appName("Java Spark SQL basic example")
        .config("spark.driver.extraJavaOptions", "-Xss1G")
        .getOrCreate();
Dataset<Row> dataset = session.read()
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "mongodb://localhost:27017/test")
        .option("collection", "finished13")
        .option("pipeline", "[ { '$sort': {'prop1':-1} }, {'$limit': 700}]")
        .load();


dataset = dataset.limit(2);

Object rows = dataset.collect();

In the Java version, dataset.limit(2) takes approximately 9 minutes to complete. The sampling runs over the same collection, which consists of roughly 30k documents. Each document has the same structure and contains around 27k properties (all first-level, no nesting); the average document size is 1.5 MB. Any idea why the Java version takes an eternity to complete?
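For reference, one way to avoid the schema sampling at load() time is to hand the reader an explicit schema. DataFrameReader.schema(StructType) is standard Spark API; whether the connector version used here honours it, and whether it also helps with the slow limit(), has not been verified. A minimal sketch, reusing the session from the Java snippet above (field names and types are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical schema covering only the fields that are actually needed,
// so the connector does not have to sample documents to infer ~27k columns.
StructType schema = new StructType()
        .add("prop1", DataTypes.LongType)
        .add("prop2", DataTypes.StringType);

Dataset<Row> explicit = session.read()
        .schema(schema)
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "mongodb://localhost:27017/test")
        .option("collection", "finished13")
        .option("pipeline", "[ { '$sort': {'prop1':-1} }, {'$limit': 700}]")
        .load();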

TechCrap
  • After digging deeper, there are a few findings. With respect to the following questions - https://stackoverflow.com/questions/46832394/spark-access-first-n-rows-take-vs-limit, https://stackoverflow.com/questions/35869884/more-than-one-hour-to-execute-pyspark-sql-dataframe-take4/35870245#35870245, https://issues.apache.org/jira/browse/SPARK-15689 - it looks like the pushdown is not directly provided in the Java/Scala spark-mongodb driver (reproducible in Scala as well). – TechCrap Jun 22 '18 at 20:06
  • What I still don't get is why it eagerly evaluates the command instead of executing only when the DF is collected. – TechCrap Jun 22 '18 at 20:13
  • After I decreased the number of properties to circa 3k, the transformations became fast again. It looks like Dataset operations scale heavily with the number of properties. – TechCrap Jun 22 '18 at 22:52

1 Answer


Just for future folks, if anyone else stumbles across the same issue: with mongo-java-driver-3.7.0 and mongo-spark-connector_2.11-2.2.2, the driver works such that when you add a few operations such as limit, select, etc., it iterates over all the columns. I had 30k of them, so it took ages. My workaround was to create a new collection through the native Java driver, containing only aggregated documents with far fewer fields, and to access that collection through the Spark API.

TechCrap
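For completeness, a rough sketch of the workaround described in the answer, assuming mongo-java-driver 3.7.x (so com.mongodb.client.MongoClients is available). The projected field names and the finished13_slim collection name are made up for illustration; this is not the answerer's exact code.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;

// 1) Use the native driver to materialise a slimmed-down copy of the collection,
//    keeping only the handful of fields that are actually needed.
try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
    MongoCollection<Document> source = client.getDatabase("finance").getCollection("finished13");
    source.aggregate(Arrays.asList(
            // field names below are illustrative; keep whatever is actually needed
            new Document("$project", new Document("prop1", 1).append("prop2", 1)),
            new Document("$out", "finished13_slim")
    )).first();   // first() simply forces the pipeline (and its $out stage) to run
}

// 2) Point the Spark connector at the slim collection; with only a few columns,
//    limit()/select() no longer have to walk tens of thousands of fields.
SparkSession session = SparkSession.builder()
        .master("local[*]")
        .appName("Java Spark SQL basic example")
        .getOrCreate();

Dataset<Row> slim = session.read()
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "mongodb://localhost:27017/test")
        .option("database", "finance")
        .option("collection", "finished13_slim")
        .load();

slim.limit(2).show();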