
The program below attempts to print the price of the most expensive house, given a CSV file containing information about the houses. Unexpectedly, I get 0 as the max price instead of a non-zero integer, even though I have confirmed that non-zero prices exist (for example, the first entry in the CSV file).

Program

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkWordCounter {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("WordCounter").config("spark.master", "local").getOrCreate();

        String fileName = SparkWordCounter.class.getResource("/Sacramentorealestatetransactions.csv").toString();

        // Explicit schema matching the CSV header columns
        StructField[] structFields = {
            DataTypes.createStructField("street", DataTypes.StringType, false),
            DataTypes.createStructField("city", DataTypes.StringType, false),
            DataTypes.createStructField("zip", DataTypes.StringType, false),
            DataTypes.createStructField("state", DataTypes.StringType, false),
            DataTypes.createStructField("beds", DataTypes.ByteType, false),
            DataTypes.createStructField("baths", DataTypes.ByteType, false),
            DataTypes.createStructField("sqFt", DataTypes.ShortType, false),
            DataTypes.createStructField("type", DataTypes.StringType, false),
            DataTypes.createStructField("sale_data", DataTypes.StringType, false),
            DataTypes.createStructField("price", DataTypes.IntegerType, false),
            DataTypes.createStructField("latitude", DataTypes.StringType, false),
            DataTypes.createStructField("longitude", DataTypes.StringType, false)
        };

        StructType structType = DataTypes.createStructType(structFields);
        Dataset<Row> dataset = sparkSession.read().option("header", "true").schema(structType).csv(fileName);

        // Map each Row to a Building bean by column name
        Dataset<Building> buildingDataset = dataset.as(Encoders.bean(Building.class));

        // Extract each building's price and reduce to the maximum
        long price = buildingDataset
                .map(building -> building.price, Encoders.INT())
                .reduce(Integer::max);

        System.out.println("Price: " + price);
    }

    public static class Building implements Serializable {
        public String street;
        public String city;
        public String zip;
        public String state;
        public byte beds;
        public byte baths;
        public short sqFt;
        public String type;
        public String sale_date;
        public int price;
        public String latitude;
        public String longitude;
    }
}

CSV first entry (985 total)

street,city,zip,state,beds,baths,sqFt,type,sale_date,price,latitude,longitude
---,---,---,---,2,1,836,Residential,Wed May 21 00:00:00 EDT 2008,59222,38.---,---

--- represents redacted location information

The prices are guaranteed to be integers, thus an int is used (byte and short are used for values with smaller ranges).

Why is 0 being calculated as the max price?

Mario Ishac
  • Is that `reduce()` statement correct? – Rafael Oct 03 '18 at 05:04
  • @Rafael I would think so, something like `Stream.of(1, 2, 3, 4, 3, 2, 1).reduce(Integer::max)` gives 4. Though it's entirely possible that Spark's reduce functions differently to the point of changing behavior (one difference is that it doesn't return optional). – Mario Ishac Oct 03 '18 at 05:09
  • It's correct, just wondering... – Rafael Oct 03 '18 at 05:16
  • What does `buildingDataset.select("price").show()` output? Since you are using Spark SQL, try leveraging the SQL functionality. – mrsrinivas Oct 03 '18 at 06:44

1 Answer


Check whether the DataFrame was loaded properly from the CSV. You can inspect the data in it with:

buildingDataset.select("price").show(20)

If the data in the price column looks fine, then use either of the following ways to obtain the maximum.

SQL API

buildingDataset.createOrReplaceTempView("building");
sparkSession.sql("SELECT price FROM building ORDER BY price DESC LIMIT 1")
  .show();

Java

import static org.apache.spark.sql.functions.*;
buildingDataset.orderBy(col("price").desc())
    .limit(1)
    .show();
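
Alternatively (a minimal sketch, assuming the same buildingDataset), the aggregation API computes the maximum directly, without ordering every row:

import static org.apache.spark.sql.functions.*;
// Aggregate over the whole Dataset: max() of the price column, no sort needed.
buildingDataset.agg(max(col("price")).alias("max_price"))
    .show();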
mrsrinivas
  • Check this other post if you face any issue in [reading CSV](https://stackoverflow.com/a/39533431/1592191). – mrsrinivas Oct 03 '18 at 07:17
  • This solution works, however do you have any insight into why the one I provided does not work? The CSV is loaded correctly (I confirmed that); I'm assuming it's because each `Building` instance isn't properly initialized (prices default to 0, the default int value). – Mario Ishac Oct 03 '18 at 21:38
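
One way to test that assumption (a minimal sketch, assuming the public fields are readable on the driver):

// Collect a few rows through the bean encoder and print the mapped fields.
// If price prints as 0 here while select("price").show() looks correct,
// the bean deserialization (not the CSV read) is dropping the value.
buildingDataset.limit(5).collectAsList()
    .forEach(b -> System.out.println(b.street + " -> " + b.price));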