
I am using a third-party package for Spark that provides a "PointFeature" object. I am trying to take a csv file and put elements from each row into an Array of these PointFeature objects.

The PointFeature constructor for my implementation looks like this:

Feature(Point(_c1, _c2), _c3)

where _c1, _c2, and _c3 are the columns of my csv and represent doubles.

Here is my current attempt:

val points: Array[PointFeature[Double]] = for{
    line <- sc.textFile("file.csv")
    point <- Feature(Point(line._c1,line._c2),line._c3)
} yield point

My error shows up when referencing the columns:

<console>:36: error: value _c1 is not a member of String
   point <- Feature(Point(line._c1,line._c2),line._c3.toDouble)
                               ^
<console>:36: error: value _c2 is not a member of String
       point <- Feature(Point(line._c1,line._c2),line._c3.toDouble)
                                            ^
<console>:36: error: value _c3 is not a member of String
       point <- Feature(Point(line._c1,line._c2),line._c3.toDouble)
                                                      ^

This is obviously because I'm referencing a String as if it were an element of a DataFrame. I'm wondering if there is a way to work with DataFrames in this loop format, or a way to split each line into a List of doubles. Maybe I need an RDD?

Also, I'm not certain that this will yield an Array. Actually, I suspect it will return an RDD...

I am using Spark 2.1.0 on Amazon EMR.

Here are some other questions I have drawn from:

How to read csv file into an Array of arrays in scala

Splitting strings in Apache Spark using Scala

How to iterate records spark scala?

user306603

2 Answers


You could set up a Dataset[Feature] this way:

import org.apache.spark.sql.types.DoubleType
import sparkSession.implicits._  // for the 'x column syntax and the Feature encoder

case class Feature(x: Double, y: Double, z: Double)

val ds = sparkSession.read.csv("file.csv")
    .toDF("x", "y", "z")
    .withColumn("x", 'x.cast(DoubleType))
    .withColumn("y", 'y.cast(DoubleType))
    .withColumn("z", 'z.cast(DoubleType))
    .as[Feature]

Then you can consume your strongly-typed Dataset[Feature] as you see fit.
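
For example, here is a minimal sketch of getting back to the question's Array[PointFeature[Double]], assuming the third-party Feature/Point constructors behave as the question describes (the geotrellis.vector import below is an assumption about which package that is; adjust it to yours). The rename also sidesteps the clash with the case class Feature defined above:

// Assumed package; rename the library type to avoid clashing with
// the case class Feature defined above.
import geotrellis.vector.{Feature => LibFeature, Point, PointFeature}

// Bring the typed rows to the driver (fine for small files; avoid
// collect() on large data) and build the library's PointFeature objects.
val points: Array[PointFeature[Double]] =
  ds.collect().map(f => LibFeature(Point(f.x, f.y), f.z))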

Vidya

I suggest taking this on in smaller steps.

Step One

Get your rows as an Array/List/whatever of Strings.

val lines = sc.textFile("file.csv")

Note that textFile already splits the file on newlines, so lines is an RDD[String] with one element per row rather than an Array (there is no getLines on an RDD), but the same map steps below apply.

Step Two

Break your lines into their own lists of columns.

val splits = lines.map {l => l.split(",")}

Step Three

Extract your columns as vals that you can use:

val res = splits.map {
  case Array(col1, col2, col3) =>
    // Convert to doubles and build the Feature/Point structure
    Feature(Point(col1.toDouble, col2.toDouble), col3.toDouble)
  case _ =>
    // Handle the case where your csv is malformed
    sys.error("unexpected number of columns")
}

This can all be done in one go; I only split it up to show the logical steps from file → list of strings → list of lists of strings → list of Features.
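
For reference, here is a sketch of the one-go version, under the same assumption that the third-party Feature/Point constructors work as shown in the question; the collect() at the end is what finally turns the RDD into the Array the question asked for:

val points: Array[PointFeature[Double]] =
  sc.textFile("file.csv")
    .map(_.split(","))
    .flatMap {
      case Array(c1, c2, c3) =>
        // toDouble throws on non-numeric values; guard here if needed
        Some(Feature(Point(c1.toDouble, c2.toDouble), c3.toDouble))
      case _ => None  // drop malformed rows
    }
    .collect()  // RDD -> Array on the driver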

Charles