
I want to aggregate data by intervals on a timestamp column.

I saw that the computation takes 53 seconds, but writing the result to the CSV file takes 5 minutes. It seems like df.write.csv() is what takes so long.

How can I optimize this code, please?

Here is my code snippet:

val df = spark.read.option("header",true).option("inferSchema", "true").csv("C:\\dataSet.csv\\inputDataSet.csv")

//convert all columns except the timestamp to numeric values in order to apply aggregation functions
//(withColumn returns a new DataFrame, so the result has to be kept, e.g. via foldLeft)
val dfNumeric = df.columns.filter(_ != "_c0").foldLeft(df)((acc, c) => acc.withColumn(c, col(c).cast("int")))

//add a new column containing the timestamp rounded down to 5-minute (300-second) intervals
val result2 = dfNumeric.withColumn("new_time", ((unix_timestamp(col("_c0")) / 300).cast("long") * 300).cast("timestamp")).drop("_c0")

val meanCols = result2.drop("new_time").columns.map(mean(_))
val finalresult = result2.groupBy("new_time")
  .agg(meanCols.head, meanCols.tail: _*)
  .sort("new_time")

finalresult.coalesce(1).write.option("header", "true").csv("C:/result_with_time.csv") // <= this is the step that takes too long
  • How are you actually evaluating the time spent on each task? Remember that Spark evaluations are lazy. coalesce(1) is an **anti-pattern** unless your data is small, and if your data is already small to begin with, Spark is overkill. So what's your case? – eliasah Mar 03 '17 at 08:04
  • @eliasah Well, for time evaluation I wrote a timing function to check the time of each task. As for the data, yes, I am using a big data file: about 3,490,000 rows and 134 columns. What is the best way to write the DataFrame result into the CSV file? Thanks – user7394882 Mar 03 '17 at 08:26
  • Try writing into HDFS instead of the local FS. In your case the bottleneck will be collecting all the data from all nodes into one place, rather than writing the distributed data directly into a distributed FS – FaigB Mar 03 '17 at 08:33
  • @FaigB What is the best command to use to write into HDFS? – user7394882 Mar 03 '17 at 08:35
  • Usually when you have "big data" files, in practice partitioned data files are better than **one** big file. So first, I wouldn't use coalesce(1) to begin with (please read my answer [here](https://www.quora.com/What-will-happen-if-we-use-coalesce-1-on-very-large-amount-of-data-in-Apache-Spark) concerning that point). – eliasah Mar 03 '17 at 08:42
  • Secondly, if you still want that one big output file, you'd rather write it partitioned and then use cat-like functions (http://stackoverflow.com/questions/60244/is-there-replacement-for-cat-on-windows in local mode) or `hadoop fs -getmerge` (http://stackoverflow.com/questions/5700068/merge-output-files-after-reduce-phase). – eliasah Mar 03 '17 at 08:45
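
Following up on the lazy-evaluation point in the comments: a minimal sketch, assuming `finalresult` is the aggregated DataFrame from the snippet above. Transformations only build a query plan, so timing them in isolation understates the real cost; an action such as count or write is what triggers the computation, which is why the write step appears to absorb almost all of the runtime.

// Hypothetical timing helper, not part of the original code.
def timeIt[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val out = block
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.1f s")
  out
}

// Adding another transformation returns almost immediately: nothing runs yet.
timeIt("extra transformation (lazy)") { finalresult.select("*") }

// An action forces the whole pipeline (read, casts, groupBy, sort) to execute.
timeIt("count (forces the full computation)") { finalresult.count() }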

1 Answer


Here are some thoughts on optimization based on your code.

  1. inferSchema: it will be faster to provide a predefined schema than to use inferSchema, which needs an extra pass over the data to infer the column types (see the schema sketch after this list).
  2. Instead of writing to your local filesystem, you can try writing to HDFS and then copying the file to local (e.g. with `hdfs dfs -get` or scp).
  3. df.coalesce(1).write will take more time than just df.write; with df.write you will get multiple part files, which can be combined using different techniques, or you can simply leave them as multiple parts in one output directory (points 2 and 3 are sketched after this list).
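
A minimal sketch of point 1, assuming (as in the question's snippet) a timestamp column `_c0` followed by integer metric columns; the remaining column names `_c1`, `_c2`, ... are assumptions, so substitute the real header names. With 134 columns the schema can be built programmatically instead of being inferred from the data.

import org.apache.spark.sql.types.{IntegerType, StructField, StructType, TimestampType}

// Hypothetical schema: one timestamp column plus 133 integer columns.
val metricCols = (1 until 134).map(i => s"_c$i")
val schema = StructType(
  StructField("_c0", TimestampType, nullable = true) +:
  metricCols.map(name => StructField(name, IntegerType, nullable = true))
)

val dfTyped = spark.read
  .option("header", "true")   // the header row is still skipped, but no inference pass over the data is needed
  .schema(schema)
  .csv("C:\\dataSet.csv\\inputDataSet.csv")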
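
For points 2 and 3, together with the `getmerge` suggestion in the comments, a minimal sketch of writing the result partitioned (no coalesce(1)) to a hypothetical HDFS path and only merging afterwards if a single local file is really required:

// Each partition is written in parallel as its own part file.
finalresult.write
  .option("header", "true")
  .csv("hdfs:///user/hypothetical/result_with_time")

// If one local CSV is needed, merge the parts afterwards from the shell, e.g.:
//   hadoop fs -getmerge /user/hypothetical/result_with_time result_with_time.csv
// Note: with header=true every part file carries its own header row, so either
// drop the header option here or strip the duplicate header lines after merging.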