
For testing purposes, I have configured a 4-node cluster, where each node runs a Spark worker and a MongoDB shard. These are the details:

  • Four Debian 9 servers (named visa0, visa1, visa2, visa3)
  • Spark (v2.4.0) cluster on 4 nodes (visa1: master, visa0..3: slaves)
  • MongoDB (v3.2.11) sharded cluster with 4 nodes (config server replica set on visa1..3, mongos on visa1, shard servers: visa0..3; the collection's shard key can be checked as sketched below)
  • I'm using the MongoDB Spark connector, installed with "spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

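For reference, this is a minimal sketch (not part of my original setup commands) of how the shard key of email.emails can be inspected from Python with pymongo, assuming mongos is reachable on visa1:

from pymongo import MongoClient

# Connect to the mongos router (assumed to run on visa1, default port).
client = MongoClient("mongodb://visa1")

# Sharded collections are listed in the config database; the "key" field
# holds the shard key specification, e.g. {"_id": "hashed"}.
info = client["config"]["collections"].find_one({"_id": "email.emails"})
print(info["key"])
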
When configuring SparkSession with MongoShardedPartitioner, every dataframe loaded from the database is empty, though the dataframe schema is fetched correctly.

This is reproduced whether the configuration is done in the spark-defaults.conf file or with .config("spark.mongodb.input.partitioner", "MongoShardedPartitioner") in the SparkSession builder.
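
For reference, the equivalent spark-defaults.conf entry (assuming the standard conf/spark-defaults.conf location) is a single property line:

spark.mongodb.input.partitioner    MongoShardedPartitioner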

With MongoShardedPartitioner, df.count() == 0:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>                                                                             
>>> df2.count()
0  

But it works correctly without specifying a partitioner:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162  

Questions:

  • How can I know which partitioner is configured by default?
  • How can MongoShardedPartitioner be used in this scenario?

Thanks in advance

Jan 13th, 2019: recommended workaround

As answered below, it seems that MongoShardedPartitioner does not support hashed shard keys. However, I need hashed sharding to distribute the chunks evenly across my nodes, independently of time (sharding on _id would distribute them chronologically, I guess).

My workaround has been to create a new field in the database with the computed md5 hash of a date bucket, indexing it (as a normal index), and using it as the shard key.
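
Below is a minimal sketch (an illustration, not a tested recipe) of how such a field could be populated and indexed with pymongo, assuming each document has a BSON "date" field and that the new field is named "datebuckethash", as in the SparkSession config further down:

import hashlib
from pymongo import MongoClient, ASCENDING

client = MongoClient("mongodb://visa1")   # mongos router
coll = client["email"]["emails"]

# Store the md5 of a daily date bucket on every document (the "date" field
# and the daily bucket granularity are assumptions for illustration).
for doc in coll.find({}, {"date": 1}):
    bucket = doc["date"].strftime("%Y-%m-%d")
    digest = hashlib.md5(bucket.encode("utf-8")).hexdigest()
    coll.update_one({"_id": doc["_id"]}, {"$set": {"datebuckethash": digest}})

# Normal (ranged) index on the new field, then shard on it. A shard key cannot
# be changed in place on MongoDB 3.2, so shardCollection only applies when the
# collection is (re)created and sharded from scratch.
coll.create_index([("datebuckethash", ASCENDING)])
client.admin.command("shardCollection", "email.emails",
                     key={"datebuckethash": 1})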

Now, the code works fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> 
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
...   .getOrCreate()
>>> 
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()

2019-01-13 11:19:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162   
jose

1 Answer


Sorry to hear you are having an issue with the connector, jose.

How can I know which partitioner is configured by default?

Information regarding partitioners can be found on the Spark connector documentation site. Please file a ticket in the Docs jira project if you feel anything is missing or unclear; it really could help future users!

The default partitioner is a thin wrapper around the MongoSamplePartitioner. It splits up a collection into sized partitions based on statistical sampling of the collection.
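
For example, the sample partitioner can be tuned through its partitioner options on the SparkSession builder (the values shown here are just its documented defaults):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("myApp") \
    .config("spark.mongodb.input.partitioner", "MongoSamplePartitioner") \
    .config("spark.mongodb.input.partitionerOptions.partitionSizeMB", "64") \
    .config("spark.mongodb.input.partitionerOptions.samplesPerPartition", "10") \
    .getOrCreate()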

How can MongoShardedPartitioner be used in this scenario?

The MongoShardedPartitioner uses the shardKey to generate the partitions. By default it will use _id as the key. You may need to configure that value.
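
For example, the shard key field can be set through the partitioner options (replace the placeholder with your own shard key field):

.config("spark.mongodb.input.partitionerOptions.shardkey", "<your shard key field>")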

Note: Hashed shard keys are not supported by the MongoShardedPartitioner, as there is currently no way to query a collection against the hashed value, so when retrieving partitions it will fail to return results. I've added DOCS-12345 to update the documentation.

It looks like there is an issue in your setup where the MongoShardedPartitioner is failing to partition the collection as expected and returning 0 results. Schema inference will still work because of how it queries the collection. If it's not a config / hashed shard key issue, then please file a bug in the Spark jira project and I can help identify the cause and release a fix for you.

Ross
  • Thanks a lot for this insight. I agree with you that it should be noted in the documentation that the MongoShardedPartitioner does not support hashed indexes, because this type of index is very useful when you want your Spark tasks to be distributed evenly across your nodes. I have solved this by creating a new field in MongoDB with the md5 hash value of a date bucket, indexing it (normal index), and using it as the shard key, so I have all that I wanted. Thanks again. – jose Jan 13 '19 at 10:28
  • @Ross We are hitting a similar issue here: we have a sharded collection with 2 billion documents in it, hashed sharding on the _id field, and we are trying to dump the data from the MongoDB collection using a Spark job with the MongoShardedPartitioner config. I even tried to get the dump without adding the 'MongoShardedPartitioner' config, and the job was very slow, running forever, and I didn't succeed in generating the dump of the collection. Also, I cannot change the sharding strategy in my case. Is there a workaround that I can apply here to read the data from MongoDB using Spark jobs? – dwarakesh tp Apr 30 '20 at 07:21