I am running a Spark application that reads data from a few Hive tables (of IP addresses) and compares each element (IP address) in one dataset with all the elements (IP addresses) from the other datasets. The end result would be something like:
+---------------+--------+--------+--------+--------+--------+--------+----------+
|     ip_address|dataset1|dataset2|dataset3|dataset4|dataset5|dataset6|      date|
+---------------+--------+--------+--------+--------+--------+--------+----------+
| xx.xx.xx.xx.xx|       1|       1|       0|       0|       0|       0|2017-11-06|
| xx.xx.xx.xx.xx|       0|       0|       1|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       1|       0|       0|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       0|       0|       1|       0|       0|       1|2017-11-06|
| xx.xx.xx.xx.xx|       1|       1|       0|       1|       0|       0|2017-11-06|
+---------------+--------+--------+--------+--------+--------+--------+----------+
To do the comparison, I am converting the dataframes resulting from the hiveContext.sql("query") statement into fastutil objects, like this:
// Run the Hive query, then pull the first column of every row back to the driver
val df = hiveContext.sql("query")
// collect() materializes all values in driver memory before wrapping them in the fastutil list
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
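In case it matters, I do the same conversion for each of the source tables. A simplified sketch of that part (the query strings below are placeholders, not my real queries):
// One query per source table; the query strings here are placeholders
val queries = Seq("query for dataset1", "query for dataset2" /* ... up to dataset6 */)
val buffers = queries.map { q =>
  val df = hiveContext.sql(q)
  // each table's IP column is collected to the driver and wrapped in a fastutil list
  new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
}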
Then, I am using an iterator to iterate over each collection and write the rows to a file using FileWriter.
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext) {
  val p = dfIterator.next().toString
  // comparison logic and FileWriter output
}
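For completeness, here is a stripped-down, self-contained version of that loop, with placeholder data and a single membership check standing in for the real comparison and FileWriter logic:
import java.io.{BufferedWriter, FileWriter}
import it.unimi.dsi.fastutil.objects.ObjectArrayList

// Placeholder collections standing in for the real per-table buffers
val dfBuffer    = new ObjectArrayList[String](Array("10.0.0.1", "10.0.0.2"))
val otherBuffer = new ObjectArrayList[String](Array("10.0.0.2"))

// "output.txt" is a placeholder path; the real code writes the 0/1 flags for every dataset
val writer = new BufferedWriter(new FileWriter("output.txt", true))
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext) {
  val ip = dfIterator.next()
  val inOther = if (otherBuffer.contains(ip)) 1 else 0
  writer.write(s"$ip,$inOther,2017-11-06\n")
}
writer.close()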
I am running the application with --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g
The process runs for about 18-19 hours in total for roughly 4-5 million records, with one-to-one comparisons, on a daily basis.
However, when I checked the Application Master UI, I noticed that no activity takes place after the initial conversion of the dataframes to fastutil collection objects is done (this takes only a few minutes after the job is launched). I can see the count and collect statements used in the code producing new jobs until the conversion is done; after that, no new jobs are launched while the comparison is running.
What does this imply? Does it mean that distributed processing is not happening at all? I understand that collection objects are not treated as RDDs; could this be the reason? How is Spark executing my program without using the resources assigned to it?
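To illustrate what I mean: the comparison runs in a plain while loop on the driver, whereas, as far as I understand, only work expressed as a transformation on the DataFrame/RDD itself, like the hypothetical sketch below, would actually run as tasks on the executors:
// Hypothetical distributed version of the same idea (not what my code currently does):
// the per-IP work is an RDD transformation, so it would run in executor tasks
val ips = df.rdd.map(row => row.getString(0)) // df is the DataFrame from hiveContext.sql above
val flagged = ips.map { ip =>
  // per-IP comparison logic would go here
  (ip, 1)
}
flagged.saveAsTextFile("/tmp/ip_flags") // placeholder output path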
Any help would be appreciated. Thank you!