12

I am running a spark application that reads data from a few hive tables(IP addresses) and compares each element(IP address) in a dataset with all other elements(IP addresses) from the other datasets. The end result would be something like:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
|     ip_address|dataset1|dataset2       |dataset3       |dataset4 |dataset5  |dataset6|      date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx|     1  |              1|              0|        0|         0|      0 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              0|              0|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              1|              0|        1|         0|      0 |2017-11-06|
---------------------------------------------------------------------------------------------------

For doing the comparison, I am converting the dataframes resulting from the hiveContext.sql("query") statement into Fastutil objects. Like this:

val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

Then, I am using an iterator to iterate over each collection and write the rows to a file using FileWriter.

val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
     val p = dfIterator.next().toString
     //logic
}

I am running the application with --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g

The process runs for about 18-19 hours in total for about 4-5 million records with one to one comparisons on a daily basis.

However, when I checked the Application Master UI, I noticed that no activity takes place after the initial conversion of dataframes to fastutil collection objects is done (this takes only a few minutes after the job is launched). I see the count and collect statements used in the code producing new jobs till the conversion is done. After that, no new jobs are launched when the comparison is running.

  • What does this imply? Does it mean that the distributed processing is not happening at all?

  • I understand that collection objects are not treated as RDDs, could
    this be the reason for this?

  • How is spark executing my program without using the resources assigned?

Any help would be appreciated, Thank you!

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
Hemanth
  • 705
  • 2
  • 16
  • 32
  • I think you would get more specific answers once you mention what you are doing in the `//logic` part of your code. Since, you mentions that the dataset joining and query output comes within minutes, it is apparent that rest of the hours of your job, which is not distributed, is spending time in this logic section that is being executed in only the driver node. If it is only `write the rows to a file using FileWriter` as you mention, then you should consider doing this write to a distributed file e.g. on hdfs and using `df.write` – sujit Mar 16 '18 at 11:48
  • I'm curious: are the tables just sets of IP addresses, and there're about 4 to 5 million records in each, or..? – AKX Mar 20 '18 at 13:04
  • No, the tables have other columns as well, but I am only selecting the IP addresses. The total number of records from 6 datasets is close to 5 million. @AKX – Hemanth Mar 20 '18 at 15:27
  • Fair. I tried this with an SQLite database with 5 tables of just 2 million IP addresses each; a query to find out which of the 4 other tables have the IP addresses the first one has finishes in _45 seconds_. – AKX Mar 21 '18 at 11:44

1 Answers1

8

After the line:

val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

esp. that part of the above line:

df.map(r => r(0).toString).collect()

which collect is the very main thing to notice, no Spark jobs are ever performed on dfBuffer (which is a regular local one JVM data structure).

Does it mean that the distributed processing is not happening at all?

Correct. collect brings all the data on a single JVM where the driver runs (and is exactly the reason why you should not be doing it unless...you know what you are doing and what problems it may cause).

I think the above answers all the other questions.


A possible solution to your problem of comparing two datasets (in Spark and a distributed fashion) would be to join a dataset with the reference dataset and count to compare whether the number of records didn't change.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
  • I’ll try the join. One more question, is it possible to convert the dataframes to collections without doing a ‘collect’ ? And will the collections be considered as RDDs if we do a ‘sc.parallelize(collection)’ ? – Hemanth Mar 14 '18 at 13:37
  • I think what you refer to a collection is a regular Scala collection that is local, non-distributed and lives on a single JVM which is exactly the opposite of Spark. That's why you want `collect` or `take` or similar. When you do `sc.parallelize` you will end up with a RDD. – Jacek Laskowski Mar 14 '18 at 13:52