
It's a continuation of this question: Porting a multi-threaded compute intensive job to spark

I am using forEachPartition, as suggested here, to loop over a list of 10,000 IDs, and then I do a repartition(20) because every partition creates a DB connection; if I create, say, 100 partitions, the job just dies under 100 open connections to Postgres and Mongo. I use the Postgres connections not only to store data but also to look up data from another table. I could stop storing the data to Postgres directly from my task and do that as post-processing from a sequence file.
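Roughly, the current job looks like the sketch below (the JDBC URL, class name, and per-ID processing are placeholders, not my actual code); the point is just that each partition opens its own connection:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CurrentApproachSketch {
    static void run(JavaSparkContext sc, List<Long> ids) {
        // Cap the number of partitions to cap the number of open DB connections.
        JavaRDD<Long> idRdd = sc.parallelize(ids).repartition(20);

        idRdd.foreachPartition(partition -> {
            // One connection per partition; 100 partitions would mean 100 open connections.
            try (Connection pg = DriverManager.getConnection("jdbc:postgresql://host/db")) {
                while (partition.hasNext()) {
                    Long id = partition.next();
                    // look up related rows for this ID, run the simulation, store results ...
                }
            }
        });
    }
}
```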

Ideally, though, I need to massively parallelize my Spark job so that the work completes within a given time. Currently it processes about 200 IDs in 20 hours, but I need to process 10,000 IDs in 20 hours, so repartition(20) is clearly not enough. I am bound by I/O on the databases here.

So what are my options for efficiently sharing this data across all tasks? I want the data in Mongo and Postgres to be treated as in-memory lookup tables; the total size is about 500 GB.

My options are:

  1. RDD (I don't think an RDD fits my use case)
  2. DataFrame
  3. Broadcast variables (not sure this will work, since creating one needs 500 GB of memory available on the Spark driver)
  4. Move the data from Mongo to S3 and have tasks look it up from S3.
zengr

2 Answers


The technique we follow for this kind of problem is:

  1. Store the lookups in a separate MongoDB collection.
  2. Use the Hadoop MongoDB connector to read that data from MongoDB into an RDD.
  3. Broadcast the lookup so it is available on every node/worker (see the sketch after this list).
  4. For the main data, create an RDD from HDFS if it lives there, or read it from MongoDB with the Hadoop MongoDB connector.
  5. Perform the lookup/matching part.
  6. Save the result as a sequence file; saving it to S3 should also work, but you would need to verify that, since we store it back to MongoDB.
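A minimal sketch of steps 2–3, assuming the mongo-hadoop connector (MongoInputFormat) and a lookup document with hypothetical "id"/"value" fields; the Mongo URI and field names are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;

import scala.Tuple2;

public class BroadcastLookupSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("broadcast-lookup"));

        // Hadoop MongoDB connector config; the URI/collection name is a placeholder.
        Configuration mongoConf = new Configuration();
        mongoConf.set("mongo.input.uri", "mongodb://host:27017/mydb.lookupCollection");

        // Step 2: read the lookup collection from MongoDB as (ObjectId, document) pairs.
        JavaPairRDD<Object, BSONObject> lookupRdd = sc.newAPIHadoopRDD(
                mongoConf, MongoInputFormat.class, Object.class, BSONObject.class);

        // Re-key by the business ID and keep only what the lookup needs, then collect
        // to the driver. This is only viable while the lookup stays medium sized,
        // as discussed in the comments below.
        Map<Long, String> lookupMap = new HashMap<>(lookupRdd
                .mapToPair(kv -> new Tuple2<Long, String>(
                        ((Number) kv._2().get("id")).longValue(),
                        (String) kv._2().get("value")))
                .collectAsMap());

        // Step 3: broadcast so every worker gets a local read-only copy.
        Broadcast<Map<Long, String>> lookup = sc.broadcast(lookupMap);

        // Step 5 would then do the matching inside a map-like operation, e.g.
        // mainRdd.map(id -> lookup.value().get(id)) ...

        sc.stop();
    }
}
```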
Ajay Gupta
  • What do you publish in the `Broadcast`? `JavaPairRDD`? – zengr Sep 11 '15 at 18:08
  • Yes, we can add the RDD's contents to a Broadcast and use it, for example: http://stackoverflow.com/questions/17621596/spark-whats-the-best-strategy-for-joining-a-2-tuple-key-rdd-with-single-key-rd – Ajay Gupta Sep 11 '15 at 18:12
  • `collectAsMap` on the RDD won't work for me since the data is too big to fit on a single machine (the driver in this case) – zengr Sep 11 '15 at 18:13
  • This technique works well for our case because the lookups are medium sized. You can try loading the lookup file, persisting it, and then doing the computation, which will take a lot of time since there will be a lot of shuffling; or, to avoid this, if you are working in the cloud, increase the capacity – Ajay Gupta Sep 11 '15 at 18:23

Old post here, but I ended up with this solution.

  1. Create a JavaPairRDD<Long, YourType> for each data source, where Long is the ID and YourType is whatever the data in Postgres or Mongo looks like for that ID: a list, an object, anything.

  2. Say we end up with 3 data sources for an ID. We join those 3 JavaPairRDDs and end up with a localized tuple: JavaRDD<Tuple4<Long, YourType1, YourType2, YourType3>>. That tuple is everything I need, and it is what gets passed to the simulation logic.

  3. Once we have that JavaRDD, we repartition(n) it, where n is the total number of partitions I can handle (a sketch of steps 1–3 follows this list).
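A minimal sketch of steps 1–3, assuming the three per-ID pair RDDs have already been loaded; the payload types (String, Integer, Double) are stand-ins for YourType1/2/3:

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple4;

public class LocalizedJoinSketch {
    // Joins the three per-ID datasets so each record carries all the data its ID needs.
    public static JavaRDD<Tuple4<Long, String, Integer, Double>> localize(
            JavaPairRDD<Long, String> postgresData,  // e.g. rows looked up from Postgres
            JavaPairRDD<Long, Integer> mongoData1,   // e.g. first Mongo collection
            JavaPairRDD<Long, Double> mongoData2,    // e.g. second Mongo collection
            int numPartitions) {                     // n = partitions the cluster can handle

        return postgresData
                .join(mongoData1)   // (id, (pg, m1))
                .join(mongoData2)   // (id, ((pg, m1), m2))
                .map(kv -> new Tuple4<Long, String, Integer, Double>(
                        kv._1(),              // the ID
                        kv._2()._1()._1(),    // Postgres payload
                        kv._2()._1()._2(),    // first Mongo payload
                        kv._2()._2()))        // second Mongo payload
                .repartition(numPartitions);  // step 3: repartition(n)
    }
}
```

After this, the simulation runs as a plain map over the tuples, with no DB calls inside the tasks.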

By properly localizing the data inside an RDD (step 2), I didn't need an in-memory lookup at all. By removing all I/O (DB calls) from the distributed tasks (i.e. the code running inside map-like operations in Spark), I achieved the expected execution time. I am now able to process about 200 IDs per hour (most of the time goes to the actual simulation logic); data loading etc. takes under 10 minutes.

zengr