0

My CSV File:

YEAR,UTILITY_ID,UTILITY_NAME,OWNERSHIP,STATE_CODE,AMR_METERING_RESIDENTIAL,AMR_METERING_COMMERCIAL,AMR_METERING_INDUSTRIAL,AMR_METERING_TRANS,AMR_METERING_TOTAL,AMI_METERING_RESIDENTIAL,AMI_METERING_COMMERCIAL,AMI_METERING_INDUSTRIAL,AMI_METERING_TRANS,AMI_METERING_TOTAL,ENERGY_SERVED_RESIDENTIAL,ENERGY_SERVED_COMMERCIAL,ENERGY_SERVED_INDUSTRIAL,ENERGY_SERVED_TRANS,ENERGY_SERVED_TOTAL
2011,34,City of Abbeville - (SC),M,SC,880,14,,,894,,,,,,,,,,
2011,84,A & N Electric Coop,C,MD,135,25,,,160,,,,,,,,,,
2011,84,A & N Electric Coop,C,VA,31893,2107,0,,34000,,,,,,,,,,
2011,97,Adams Electric Coop,C,IL,8334,190,,,8524,,,,,0,,,,,0
2011,108,Adams-Columbia Electric Coop,C,WI,33524,1788,709,,36021,,,,,,,,,,
2011,118,Adams Rural Electric Coop, Inc,C,OH,7457,20,,,7477,,,,,,,,,,
2011,122,Village of Arcade,M,NY,3560,498,100,,4158,,,,,,,,,,
2011,155,Agralite Electric Coop,C,MN,4383,227,315,,4925,,,,,,,,,,

Here down the Spark code to read the CSV file:

import org.apache.spark.api.java.JavaSparkContext;

public class RddCsv 
{
    public static void main(String[] args) 
    {
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> allRows = sc.textFile("file:///home/kumar/Desktop/Eletricaldata/file8_2011.csv");//read csv file
    System.out.println(allRows.take(5)); 
   }
}

I am new Learner sparkJava, How to Select Perticuler field value from that CsvDataset and How to Perform aggregation Operations, and how to use Transformations and Actions that given Dataset. and how to select perticular field value

kumar
  • 207
  • 1
  • 3
  • 9
  • Possible duplicate of [How to Parsing CSV or JSON File with Apache Spark](http://stackoverflow.com/questions/25362942/how-to-parsing-csv-or-json-file-with-apache-spark) – Jobin Dec 14 '16 at 07:43

1 Answers1

0
public static void main(String[] args)
{
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> allRows = sc.textFile("file:///home/abhishek/Desktop/file8_2011.csv");
    System.out.println(allRows.take(5));
    List<String> headers= Arrays.asList(allRows.take(1).get(0).split(","));
    String field="YEAR";
    //Skip Header
    JavaRDD<String>dataWithoutHeaders=allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field));
    //Take one field as integer
    JavaRDD<Integer> years=dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)]));
    //Aggregate operation getTotal aggregate() arguments are initial value for a partition,aggregating function for a partition
    //and aggregating function for results from different partition
    int total=years.aggregate(0,RddCsv::sum,RddCsv::sum);
    for (Integer i:years.collect()){
        System.out.println("year :: "+i);
    }
    System.out.println(total);
}

private static int sum(int a,int b){
    return a+b;
}

This is a basic program.You should read spark's java apis for detailed info.

Abhishek Kumar
  • 292
  • 3
  • 10
  • not working,compitime errors are comming this line .aggregate(0,RddCsv::sum,RddCsv::sum); – kumar Dec 14 '16 at 10:54
  • I have run it just now. – Abhishek Kumar Dec 14 '16 at 10:56
  • output : 16/12/14 16:26:31 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 25 ms on localhost (1/1) 16/12/14 16:26:31 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 16/12/14 16:26:31 INFO DAGScheduler: Job 3 finished: collect at RDDCsv.java:32, took 0.049729 s year :: 2011 year :: 2011 year :: 2011 year :: 2011 year :: 2011 year :: 2011 year :: 2011 year :: 2011 16088 16/12/14 16:26:31 INFO SparkContext: Invoking stop() from shutdown hook – Abhishek Kumar Dec 14 '16 at 10:57
  • Please check your configurations,filenames,etc. or paste your errors in comment. – Abhishek Kumar Dec 14 '16 at 10:58
  • this line : int total=years.aggregate(0,RddCsv::sum,RddCsv::sum);List headers= Arrays.asList(allRows.take(1).get(0).split(",")); – kumar Dec 14 '16 at 11:00
  • I am not asking for line,I am asking error(like build error) because this code is running fine,so it must be due to your project configuration.Please paste error somewhere and put link here. – Abhishek Kumar Dec 14 '16 at 11:03
  • Exception in thread "main" java.lang.Error: Unresolved compilation problems: Arrays cannot be resolved – kumar Dec 14 '16 at 11:05
  • Exception in thread "main" java.lang.Error: Unresolved compilation problems: Arrays cannot be resolved The method aggregate(U, Function2, Function2) in the type AbstractJavaRDDLike> is not applicable for the arguments (int, RddCsv::sum, RddCsv::sum) The type RddCsv does not define sum(U, Integer) that is applicable here The type RddCsv does not define sum(U, U) that is applicable here at com.inndata.EletricalData.a.main(a.java:17) – kumar Dec 14 '16 at 11:05
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/130596/discussion-between-abhishek-kumar-and-kumar). – Abhishek Kumar Dec 14 '16 at 11:08
  • join me in above chat link.[link]http://chat.stackoverflow.com/rooms/130596/discussion-between-abhishek-kumar-and-kumar – Abhishek Kumar Dec 14 '16 at 11:15