I have three dataframes: dictionary, SourceDictionary and MappedDictionary. The dictionary and SourceDictionary have only one column, say words, as String. The dictionary, which has a million records, is a subset of MappedDictionary (around 10M records), and each record in MappedDictionary is a substring of a dictionary record. So I need to map the dictionary, using SourceDictionary, to MappedDictionary. Example:

Records in dictionary: BananaFruit, AppleGreen
Records in SourceDictionary: Banana, grape, orange, lemon, Apple, ...

Records to be mapped in MappedDictionary (Contains two columns) :

BananaFruit Banana
AppleGreen Apple

I planned to do this with two for loops in Java and a substring operation, but the problem is that 1 million * 10 million = 10 trillion iterations. Also, I can't figure out the correct way to iterate over a DataFrame like a for loop; a rough sketch of what I had in mind is below. Can someone suggest a way to iterate over a DataFrame and perform substring operations? Sorry for my poor English, I am a non-native speaker. Thanks to the Stack Overflow community members in advance :-)
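To make the problem concrete, here is roughly the naive idea (sketched in Scala rather than Java, since I'm on Spark; the column name value is just for illustration):

// Naive idea: pull both columns to the driver and compare every pair.
// ~1M * ~10M = ~10 trillion substring checks, so this cannot work at scale.
val dictWords   = dictionary.collect().map(_.getAs[String]("value"))
val sourceWords = sourceDictionary.collect().map(_.getAs[String]("value"))

val mapped = for {
  dictWord   <- dictWords
  sourceWord <- sourceWords
  if dictWord.contains(sourceWord) // substring check
} yield (dictWord, sourceWord)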

1 Answer

Though you have a million records in sourceDictionary, because it has only one column, broadcasting it to every node won't take up much memory, and it will speed up the overall performance.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

//Assuming the schema names; StructType takes a sequence of fields
val sourceDictionarySchema = StructType(Seq(StructField("value", StringType, false)))
val dictionarySchema = StructType(Seq(StructField("value", StringType, false)))
//"key" is nullable because a dictionary record may have no match
val mappedDictionarySchema = StructType(Seq(
   StructField("value", StringType, false),
   StructField("key", StringType, true)
))

//Collect the one-column sourceDictionary to the driver and broadcast the
//resulting list to every executor (spark.implicits._ must be in scope for
//the String encoder used by .map on a DataFrame)
val sourceDictionaryBC = sc.broadcast(
   sourceDictionary.map(row =>
      row.getAs[String]("value")
   ).collect.toList
)

val MappedDictionaryN = dictionary.map { row =>
   val value = row.getAs[String]("value")
   //first broadcast word that occurs as a substring of this dictionary record
   val matchedKey = sourceDictionaryBC.value.find(value.contains)

   Row(value, matchedKey.orNull)
}(RowEncoder(mappedDictionarySchema))

After this you have all the newly mapped records. If you want to combine them with the existing MappedDictionary, just do a simple union:

MappedDictionaryN.union(MappedDictionary)
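
As a quick sanity check, here is a minimal sketch (assuming a spark-shell session where spark and sc are predefined, and reusing mappedDictionarySchema from above) that runs the approach on the sample records from the question:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import spark.implicits._

//Hypothetical sample data mirroring the question's example
val dictionary = Seq("BananaFruit", "AppleGreen").toDF("value")
val sourceDictionary = Seq("Banana", "grape", "orange", "lemon", "Apple").toDF("value")

val sourceDictionaryBC = sc.broadcast(
   sourceDictionary.map(_.getAs[String]("value")).collect.toList
)

val result = dictionary.map { row =>
   val value = row.getAs[String]("value")
   Row(value, sourceDictionaryBC.value.find(value.contains).orNull)
}(RowEncoder(mappedDictionarySchema))

result.show()
//expected: BananaFruit -> Banana, AppleGreen -> Apple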
  • Thanks for your suggestion, Swadhin. This is exactly what I was looking for. – Gowthaman V Mar 21 '17 at 18:59
  • But how many records can be handled by the broadcast? I mean, what is the maximum file size that can be broadcast? – Gowthaman V Mar 21 '17 at 19:05
  • As long as the size is less than the memory of the worker node, you can broadcast these records. Broadcast is mainly used to cache a small dataset on each node so that while executing on the DataFrame, the node doesn't have to retrieve that dataset from another node. In your case, because sourceDictionary only has one column and a few million records, broadcasting is the best approach. – Swadhin Shahriar Mar 21 '17 at 19:22
  • Helpful links: [1](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html), [2](http://stackoverflow.com/questions/26884871/advantage-of-broadcast-variables), [3](https://blog.knoldus.com/2016/04/30/broadcast-variables-in-spark-how-and-when-to-use-them/) – Swadhin Shahriar Mar 21 '17 at 19:25