
My task is to analyze Kennedy Space Center logs using Apache Spark. The code works, but I want to get rid of the groupBy operation because of its cost.

The code below collects the list of requests that returned a 5xx error code and counts the failed requests.

My code

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Objects;
import static org.apache.spark.sql.functions.col;

SparkSession session = SparkSession.builder().master("local").appName(application_name).getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());

// Parse each log line into a LogEntry bean; unparseable lines become null and are dropped
JavaRDD<LogEntry> input = jsc.textFile(hdfs_connect + args[0])
                .map(App::log_entry_extractor)
                .filter(Objects::nonNull);

Dataset<Row> dataSet = session.createDataFrame(input, LogEntry.class);

// task 1: count 5xx responses per request and write them out as a single file
dataSet.filter(col("returnCode").between(500, 599))
                .groupBy("request")
                .count()
                .select("request", "count")
//                .sort(desc("count"))
                .coalesce(1)
                .toJavaRDD()
                .saveAsTextFile(hdfs_connect + output_folder_task_1);

Example of data

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074
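
For reference, App::log_entry_extractor is not shown in the question; a hypothetical parser for this Common Log Format (LogEntry setters assumed from the bean usage above) could look like this:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// host rfc931 user [timestamp] "METHOD path PROTOCOL" returnCode size
private static final Pattern LOG_PATTERN = Pattern.compile(
        "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) ?(\\S*)\" (\\d{3}) (\\S+)$");

static LogEntry log_entry_extractor(String line) {
    Matcher m = LOG_PATTERN.matcher(line);
    if (!m.matches()) {
        return null; // dropped later by filter(Objects::nonNull)
    }
    LogEntry entry = new LogEntry();
    entry.setRequest(m.group(4));                      // e.g. /history/apollo/
    entry.setReturnCode(Integer.parseInt(m.group(6))); // e.g. 200
    return entry;
}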
  • Could you please elaborate on your desired output? What do you expect to get? – Nir Hedvat Jun 13 '19 at 07:55
  • The code reads an RDD, converts it to a DataFrame, and then converts it back to an RDD; such conversions take time. For example, the plain RDD can be used for all operations: extract a key and then just use reduceByKey (see the sketch after these comments). – pasha701 Jun 13 '19 at 10:05
  • Oh my poor broken pretzel... how did we end up here with such a horrible pile of frameworks and convoluted code... You are literally wasting time and effort. In Java, I would read lines and count in a variable (while writing to another file if that is desired). In Linux you would use grep, awk, wc (some neat one-liner sent over ssh). This Spark insanity is 10000 times too complicated for what you need. – user2023577 Jun 14 '19 at 13:24
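
A minimal sketch of the pure-RDD approach pasha701 suggests, reusing the input RDD from the question and assuming LogEntry exposes getReturnCode() and getRequest() getters (implied by the bean columns used above, but not shown in the question):

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Stay in RDD land: filter 5xx entries, pair each request with a count of 1,
// then sum the counts per request with reduceByKey -- no DataFrame round trip.
JavaPairRDD<String, Long> failedPerRequest = input
        .filter(e -> e.getReturnCode() >= 500 && e.getReturnCode() <= 599)
        .mapToPair(e -> new Tuple2<>(e.getRequest(), 1L))
        .reduceByKey(Long::sum);

failedPerRequest.saveAsTextFile(hdfs_connect + output_folder_task_1);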

1 Answer


There is nothing wrong with groupBy in this context (see DataFrame / Dataset groupBy behaviour/optimization), nor is there really a viable alternative.

coalesce(1), on the other hand, is an anti-pattern most of the time, and in the worst case it can turn your process into a sequential one:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Consider replacing it with repartition(1) or removing it altogether.
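
Applied to the code from the question, that change is a one-liner (a sketch, using the same names as above):

dataSet.filter(col("returnCode").between(500, 599))
        .groupBy("request")
        .count()
        .select("request", "count")
        .repartition(1)   // adds a shuffle, but upstream stages keep their parallelism
        .toJavaRDD()
        .saveAsTextFile(hdfs_connect + output_folder_task_1);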