
I am using spark-sql 2.4.1, which uses hadoop-2.6.5.jar. I need to save my data to HDFS first and move it to Cassandra later, so I am trying to save the data to HDFS as below:

String hdfsPath = "/user/order_items/";
cleanedDs.createOrReplaceTempView("source_tab");

givenItemList.parallelStream().forEach( item -> {
    String query = "select '" + item + "' as itemCol, year, avg(" + item + ") as mean "
            + "from source_tab group by year";
    Dataset<Row> resultDs = sparkSession.sql(query);

    saveDsToHdfs(hdfsPath, resultDs);
});


public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
    df.write()                                 
      .format("parquet")
      .mode("append")
      .save(parquet_file);
    logger.info("Saved parquet file " + parquet_file + " successfully");
}

When I run my job on the cluster it fails with this error:

java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)

Please suggest how to fix this issue?

BdEngineer
  • This is because all your threads are trying to write data to the same directory. Try to give a different directory for each thread: say if you have 2 threads, use directories like /user/order_items/1/ and /user/order_items/2/, i.e. append some index to hdfsPath. – Srinivas May 27 '20 at 06:46
  • @Srinivas then how do I read it back ... the partitions would be disturbed, right? – BdEngineer May 27 '20 at 06:53
  • @Srinivas I need to give this HDFS path to another person's job.. – BdEngineer May 27 '20 at 06:54
  • try to use partitions – Srinivas May 27 '20 at 06:55
  • @Srinivas I have already repartitioned the dataset .. – BdEngineer May 27 '20 at 07:06
  • fyi, this error only comes up on my cluster; when I run locally with local[*] (i.e. it seems to run on two threads/executors) I don't see this error. – BdEngineer May 27 '20 at 07:13
  • @BdEngineer - I think the first comment is right - you should not write to the same HDFS folder from multiple threads. You could just switch to `stream()` instead of `parallelStream()` (a sketch of that follows after these comments). I don't think you'll lose performance in this case. Or you can do as an answer below suggests. Or you can write into multiple directories, with 2 options for reading - 1. Read all directories later; 2. Merge directories after all DSs are processed. – Vladislav Varslavans May 27 '20 at 07:42
  • In local mode you will most of the time not see the error; you will only get it in cluster mode. – Srinivas May 27 '20 at 07:52
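
As a minimal sketch of the stream() suggestion above (reusing the source_tab view and the saveDsToHdfs helper from the question; the query shape is only illustrative):

// Sketch: stream() runs the per-item queries one after another, so only one
// write to hdfsPath is in flight at a time and the committer rename does not race.
givenItemList.stream().forEach(item -> {
    String query = "select '" + item + "' as itemCol, year, avg(" + item + ") as mean "
            + "from source_tab group by year";
    Dataset<Row> resultDs = sparkSession.sql(query);
    saveDsToHdfs(hdfsPath, resultDs);
});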

2 Answers


You can do all the selects in one single job: run each select, then union the results into a single table.

Dataset<Row> resultDs = givenItemList.parallelStream().map(item -> {
    String query = "select '" + item + "' as itemCol, year, avg(" + item + ") as mean "
            + "from source_tab group by year";
    return sparkSession.sql(query);
}).reduce((a, b) -> a.union(b)).get();

saveDsToHdfs(hdfsPath, resultDs );
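
This is not part of the answer above, just a hedged sketch: if downstream jobs need to pick out individual items from the unioned result, one option is to partition the Parquet output by itemCol when writing (assuming itemCol carries the item name as a literal tag, as in the query above):

// Sketch: write the unioned result partitioned by itemCol, so each item lands
// in its own itemCol=<value> subdirectory under hdfsPath while readers can
// still load the whole path as a single dataset.
resultDs.write()
        .format("parquet")
        .mode("append")
        .partitionBy("itemCol")
        .save(hdfsPath);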
Alfilercio
  • thank you, but you are mixing the Java and Scala APIs here.. I have another dataframe ... which is not in a loop and still has the same issue while writing to HDFS... by the way I am using hadoop-2.6.5.jar – BdEngineer May 27 '20 at 07:51
  • that error is gone but I am getting another error: 20/05/27 10:00:22 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from dev:7337 java.lang.NullPointerException at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100) – BdEngineer May 27 '20 at 10:12
  • All methods are valid Java 8 Stream or Optional methods. If you have more dataframes with the same schema and column order, use union with the returned Dataframe as well. https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#reduce-java.util.function.BinaryOperator- – Alfilercio May 27 '20 at 10:56
  • one doubt: how is the source_tab table, i.e. cleanedDs.createOrReplaceTempView("source_tab"), available in the map lambda function? Is this the right way to use source_tab inside a lambda function? givenItemList is a Java list, right ... how is cleanedDs, i.e. source_tab, accessible in this map function? If I convert givenItemList to a dataset, is cleanedDs, i.e. source_tab, still accessible in this map function? – BdEngineer May 27 '20 at 11:42
  • any advice how to solve this in spark ... https://stackoverflow.com/questions/62933135/dataframe-look-up-and-optimization – BdEngineer Jul 16 '20 at 10:59

The error is because you are trying to write the dataframe for every item in your givenItemList collection to the same location. Normally, doing that would give the error

OutputDirectory already exists

But since the forEach runs all the items in parallel threads, they race in the output commit and you get the rename error instead. You can give a separate directory for each item like this:

givenItemList.parallelStream().forEach( item -> {
    String query = "select '" + item + "' as itemCol, year, avg(" + item + ") as mean "
            + "from source_tab group by year";
    Dataset<Row> resultDs = sparkSession.sql(query);
    saveDsToHdfs(String.format("%s_%s", hdfsPath, item), resultDs);
});

Or you can have subdirectories under hdfsPath like this:

givenItemList.parallelStream().forEach( item -> {
    String query = "select '" + item + "' as itemCol, year, avg(" + item + ") as mean "
            + "from source_tab group by year";
    Dataset<Row> resultDs = sparkSession.sql(query);
    saveDsToHdfs(String.format("%s/%s", hdfsPath, item), resultDs);
});
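
If you go with separate per-item directories, a minimal sketch for reading them back into one DataFrame (assuming the /user/order_items/<item>/ layout written above; the glob path is only an illustration):

// Sketch: Parquet can be read from a glob path, so all per-item subdirectories
// are loaded back as one DataFrame in a single call.
Dataset<Row> allItems = sparkSession.read()
        .format("parquet")
        .load(hdfsPath + "/*");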