
I am new to Spark. I am following some of the basic examples in the documentation.

I have a CSV file like this (a simplified version; the real one has nearly 40,000 lines):

date,category
19900108,apples
19900108,apples
19900308,peaches
19900408,peaches
19900508,pears
19910108,pears
19910108,peaches
19910308,apples
19910408,apples
19910508,apples
19920108,pears
19920108,peaches
19920308,apples
19920408,peaches
19920508,pears

This bit of Scala code works fine for counting category totals:

val textFile = sc.textFile("sample.csv")
textFile.filter(line => line.contains("1990")).filter(line => line.contains("peaches")).count()
textFile.filter(line => line.contains("1990")).filter(line => line.contains("apples")).count()
textFile.filter(line => line.contains("1990")).filter(line => line.contains("pears")).count()

What is the best approach for going through each line and accumulating category totals by year, so that I end up writing a CSV file like this:

date,apples,peaches,pears
1990,2,2,1
1991,3,1,1
1992,1,2,2

Any help would be appreciated.

  • Possible duplicate of [Pivot Spark Dataframe](http://stackoverflow.com/questions/30244910/pivot-spark-dataframe) – zero323 Aug 05 '16 at 19:01

1 Answer

//Create the Spark SQL context
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

//Read the CSV with the spark-csv package (header row, inferred types)
var df = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("sample.csv")

//Take the first four characters of the date as the year
df = df.withColumn("year", df.col("date").substr(0, 4))

//Pivot categories into columns, counting rows per year and category
df = df.groupBy("year").pivot("category").agg("category" -> "count")

//Add a row total across the three category columns
df.withColumn("total", df.col("apples") + df.col("peaches") + df.col("pears")).show()

//Dependency required:
<dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.4.0</version>
</dependency>
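
If you want the totals written out as a CSV file rather than just shown on the console, one way (a sketch; the `result` name and output path are my own) is to keep the final frame in its own value and write that, not the raw input frame:

//Materialize the frame with the total column, then write it;
//save() produces a directory containing a part file
val result = df.withColumn("total", df.col("apples") + df.col("peaches") + df.col("pears"))
result.coalesce(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save("totals_by_year.csv")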
VenkatN
  • [Spark 2.0](https://spark.apache.org/releases/spark-release-2-0-0.html#new-features) was released recently; it now has native CSV support :) – NikoNyrh Aug 08 '16 at 00:30
  • Thanks VenkatN for your answer. I've been under the weather the past few days, so I haven't been able to look at this sooner. When I run this script I get a "reassignment to val" error at this line: `df = df.withColumn("year", df.col("date").substr(0,4))`. With Spark 2.0, does this mean I don't have to include the databricks csv package? – ronmac Aug 10 '16 at 16:44
  • I declared "df" as a var instead of a val for the same reason. So, to avoid the "reassignment to val" error you have to do the same thing, or you can initialize a new variable instead of reassigning to df, like: `val df2 = df.withColumn("year", df.col("date").substr(0,4))` `df2.groupBy("year").pivot("category").agg("category"->"count").show()` – VenkatN Aug 11 '16 at 15:31
  • Thanks, that seems to do it. My mistake was using "val df" instead of "var df". I have been fighting a head cold the last few days and I can't think straight. What if I wanted a total column, so my CSV file would look like this: date,total,apples,peaches,pears 1990,5,2,2,1 – ronmac Aug 12 '16 at 02:31
  • Please check the updated answer, that should help you. – VenkatN Aug 12 '16 at 16:32
  • Ok, thanks. When I run the code it seems to work fine: it shows year, apples, peaches, pears and the correct data on the console. Yet when I write a CSV file I get this (date,category,year 19900108,apples,1990 19900108,apples,1990 19900308,peaches,1990, etc.); it is taking the loaded CSV and appending the year column. I am using this: `df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv")` – ronmac Aug 12 '16 at 17:08
  • This is the output from the code above; I don't see what you are seeing:

    +----+------+-------+-----+-----+
    |year|apples|peaches|pears|total|
    +----+------+-------+-----+-----+
    |1990|     2|      2|    1|    5|
    |1991|     3|      1|    1|    5|
    |1992|     1|      2|    2|    5|
    +----+------+-------+-----+-----+

    – VenkatN Sep 01 '16 at 20:10
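
Following up on NikoNyrh's comment: on Spark 2.0+ the same pipeline needs no external package, since CSV support is built in. A minimal sketch, assuming a `SparkSession` named `spark` and an output directory name `totals_by_year` (both mine):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count}

val spark = SparkSession.builder().appName("CategoryTotals").getOrCreate()

//Spark 2.0+ reads CSV natively
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("sample.csv")

//Cast to string before substr, since inferSchema reads 19900108 as an integer
val pivoted = df
    .withColumn("year", col("date").cast("string").substr(1, 4))
    .groupBy("year")
    .pivot("category")
    .agg(count("category"))

//Row totals, then write with the native CSV writer (creates a directory)
pivoted.withColumn("total", col("apples") + col("peaches") + col("pears"))
    .orderBy("year")
    .coalesce(1)
    .write.option("header", "true")
    .csv("totals_by_year")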