
I have a csv file that consists of data in this format:

id,  name,  surname,   morecolumns
5,   John,  Lok,       more
2,   John2, Lok2,      more
1,   John3, Lok3,      more
etc..

I want to sort my csv file using the id as key and store the sorted results in another file.

Here is what I've done so far to create JavaPairs of (id, rest_of_line):

    SparkConf conf = new SparkConf().setAppName.....;

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> file = sc.textFile("inputfile.csv");

    // extract the header
    String header = file.first();
    JavaRDD<String> lines = file.filter(s -> !s.equals(header));

    // create JavaPairs
    JavaPairRDD<Integer, String> pairRdd = lines.mapToPair(
      new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(final String line) {
            // split only once: the id, then the rest of the line
            String[] parts = line.split(",", 2);
            int id = Integer.parseInt(parts[0]);
            return new Tuple2<>(id, parts[1]);
        }
    });

    // sort and save the output
    pairRdd.sortByKey(true, 1);
    pairRdd.coalesce(1).saveAsTextFile("sorted.csv");

This works for small files. However, when I use bigger files, the output is not sorted properly. I think this happens because the sort takes place on different nodes, so the merge of the results from all the nodes doesn't produce the expected output.

So, the question is: how can I sort my csv file using the id as key and store the sorted results in another file?

Spithas
    Don't `coalesce(1)`? It is a good candidate for the most useless method call in Spark. – zero323 Mar 19 '16 at 13:28
  • Well, if I remove it, firstly it splits the results into parts while I want them in one file, and secondly they are still not sorted. – Spithas Mar 19 '16 at 13:41
    `sortByKey` range-partitions the data (each output file covers a specific range of values) and sorts the values within each partition (every file is sorted). Coalescing to one file is useless in general because it puts the load on a single machine, and with a distributed file system it transfers the data multiple times – to one node to coalesce, and from that node back to the distributed file system. If the data is small enough for that to be feasible and you really need a single file, then collecting or merging is a better choice. – zero323 Mar 19 '16 at 13:48
  • If you want one file, you can always `cat output/part* > ...` afterwards. – eliasah Mar 19 '16 at 14:13
    if you are storing in HDFS you can always store multiple parts and then `hdfs dfs -getmerge sorted.csv/part-* sorted.csv` – PinoSan Mar 19 '16 at 17:01
  • Might just be a mistake in copy-pasting into SO - but **there's a mistake here**: the result of `sortByKey` is _ignored_, and the original `pairRdd` (which is immutable, therefore not sorted) is coalesced and saved. Did you mean `JavaRDD sorted = pairRdd.sortByKey(true, 1); sorted.coalesce(1).saveAsTextFile("sorted.csv");` ? – Tzach Zohar Mar 19 '16 at 21:27
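The last comment hinges on RDD immutability: a transformation like `sortByKey` returns a *new*, sorted RDD, and the original is unchanged. Since Spark itself can't run here, the same contract can be illustrated with a plain-Java analogy (standard library only; class and method names are illustrative, not Spark API):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortedCopy {
    // Returns a NEW list sorted by the integer id before the first comma;
    // the input list is left untouched -- the same contract as sortByKey,
    // whose result must be captured rather than discarded.
    static List<String> sortById(List<String> lines) {
        return lines.stream()
                .sorted(Comparator.comparingInt((String l) -> Integer.parseInt(l.split(",", 2)[0])))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("5,John,Lok", "2,John2,Lok2", "1,John3,Lok3");
        List<String> sorted = sortById(lines);
        System.out.println(sorted);       // new, sorted copy
        System.out.println(lines.get(0)); // original order untouched: 5,John,Lok
    }
}
```

Discarding the return value, as the question's code does with `pairRdd.sortByKey(true, 1);`, therefore has no effect on what gets saved.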

2 Answers


The method coalesce is probably the one to blame, as it apparently does not contractually guarantee the ordering of the resulting RDD (see Which operations preserve RDD order?). So if you avoid such a coalesce, the resulting output files will be ordered. Since you want a single csv file, you could retrieve the part files from whatever file system you're using, taking care to preserve their order, and merge them. For example, if you're using HDFS (as stated by @PinoSan), this can be done using the command `hdfs dfs -getmerge <hdfs-output-dir> <local-file.csv>`.

mauriciojost

As pointed out by @mauriciojost, you should not do the coalesce. Instead, a better way to do this is `pairRdd.sortByKey(true, pairRdd.getNumPartitions()).saveAsTextFile(path)`, so that the maximum possible work is carried out on the partitions that hold the data.
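What this corrected pipeline computes – drop the header, key each line by its integer id, sort by key, write the lines back out – can be sketched locally in plain Java (a minimal sketch, standard library only, not the Spark API; names are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalSortSketch {
    // Local analogue of the Spark job: filter out the header, pair each
    // line as (id, rest_of_line), sort by the id key, and rebuild the
    // "id,rest" lines in sorted order.
    static List<String> sortCsv(List<String> fileLines) {
        String header = fileLines.get(0);
        return fileLines.stream()
                .filter(s -> !s.equals(header))
                .<Map.Entry<Integer, String>>map(line -> {
                    String[] parts = line.split(",", 2);
                    return new SimpleEntry<>(Integer.parseInt(parts[0]), parts[1]);
                })
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + "," + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> file = List.of(
                "id,name,surname",
                "5,John,Lok",
                "2,John2,Lok2",
                "1,John3,Lok3");
        sortCsv(file).forEach(System.out::println);
    }
}
```

In Spark the sort additionally range-partitions the keys across `getNumPartitions()` partitions, so each output part file holds a contiguous, internally sorted key range, and concatenating the parts in order yields the fully sorted result.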

Abhishek Kumar