I would like to create an RDD with records in the following format:

(trip, (starting station details), (ending station details))

import org.apache.spark._

val input1 = sc.textFile("data/trips/*")
val header1 = input1.first // to skip the header row
val trips = input1.filter(_ != header1).map(_.split(","))

val input2 = sc.textFile("data/stations/*")
val header2 = input2.first // to skip the header row
val stations = input2.filter(_!=header2).map(_.split(",")).keyBy(_(0).toInt)

def pjoined (joined: (Array[String], Array[String], Array[String])) = {
    println(""+joined._1.deep.mkString(",")+"; "+joined._2.deep.mkString(",")+"; "+joined._3.deep.mkString(","))
}

val joinedtrips = trips.map(tup => (tup, (stations.filter(_._1==tup(4).toInt).first._2), (stations.filter(_._1==tup(7).toInt).first._2)))
joinedtrips.take(5).foreach(pjoined)

The second-to-last line fails with the following error:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation.

What would be the proper and efficient way to achieve this?

stations.csv:

station_id,name,lat,long,dockcount,landmark,installation,notes
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013,
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013,
...

trips.csv:

Trip ID,Duration,Start Date,Start Station,Start Terminal,End Date,End Station,End Terminal,Bike #,Subscription Type,Zip Code
4258,114,8/29/2013 11:33,San Jose City Hall,10,8/29/2013 11:35,MLK Library,11,107,Subscriber,95060
4265,151,8/29/2013 11:40,San Francisco City Hall,58,8/29/2013 11:42,San Francisco City Hall,58,520,Subscriber,94110
...

station_id in stations.csv matches Start Terminal (index 4) and End Terminal (index 7) in trips.csv.

  • Any reason you are not using the newer DataFrame API (it's both easier to use and clearer)? – Shaido Sep 18 '18 at 02:32
  • Hello Shaido, thank you for editing the question to a better format. It is from some learning material I am going through. It has not covered DataFrame yet, but I will look into it. Thank you for sharing this idea. – user3562462 Sep 18 '18 at 02:47
  • Whether you use RDDs or DataFrames, what you want here is `join`. Some examples of how it can be done on RDDs can be seen here: https://stackoverflow.com/questions/27437507/join-two-ordinary-rdds-with-without-spark-sql/32020165 – Shaido Sep 18 '18 at 03:03
  • Why don't you filter out the header lines with a condition like filter(x => !x.contains("station_id,name,lat,long,dockcount,landmark"))? It's a simple trick. – Balaji Reddy Sep 18 '18 at 05:30

1 Answer

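There are two ways to do it. Both collect the stations into a map on the driver and look each station up inside trips.map, so they assume the stations data is small enough to fit in memory. The first broadcasts the map explicitly; the second lets the closure capture it. In addition, please read Shaido's comments about using join or the DataFrame API.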

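// Option 1: collect the stations to the driver and broadcast the map to the executors.
val bcStations = sc.broadcast(stations.collectAsMap)

val joined = trips.map(trip => {
    // Default to an empty array (not Nil) so both lookups keep the type Array[String].
    (trip, bcStations.value.getOrElse(trip(4).toInt, Array.empty[String]), bcStations.value.getOrElse(trip(7).toInt, Array.empty[String]))
})

println(joined.toDebugString)

joined.take(1)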

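// Option 2: collect the stations to the driver and let the map closure capture the local map.
val mapStations = stations.collectAsMap

val joinedtrips = trips.map(trip => {
    // Default to an empty array (not Nil) so both lookups keep the type Array[String].
    (trip, mapStations.getOrElse(trip(4).toInt, Array.empty[String]), mapStations.getOrElse(trip(7).toInt, Array.empty[String]))
})

joinedtrips.take(1)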
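If the stations data were too large to collect on the driver, the `join` that Shaido mentions in the comments would be the alternative. The following is only a minimal sketch of that idea, not a tested drop-in. It assumes a spark-shell session where `sc` already exists, and the inline rows are shortened, made-up stand-ins for the real CSV files (only the id and name columns for stations).

```scala
// Assumes a spark-shell session where `sc` (SparkContext) is already defined.
// The inline rows below are illustrative stand-ins, not the real files.
import org.apache.spark.rdd.RDD

val stations: RDD[(Int, Array[String])] = sc.parallelize(Seq(
  "10,San Jose City Hall",
  "11,MLK Library"
)).map(_.split(",")).keyBy(_(0).toInt)

val trips: RDD[Array[String]] = sc.parallelize(Seq(
  "4258,114,8/29/2013 11:33,San Jose City Hall,10,8/29/2013 11:35,MLK Library,11,107,Subscriber,95060"
)).map(_.split(","))

// First join: key each trip by Start Terminal (index 4) and attach the start station.
val withStart: RDD[(Array[String], Array[String])] =
  trips.keyBy(_(4).toInt)
    .join(stations)
    .values                      // (trip, startStation)

// Second join: re-key by End Terminal (index 7) and attach the end station.
val joinedTrips: RDD[(Array[String], Array[String], Array[String])] =
  withStart
    .keyBy { case (trip, _) => trip(7).toInt }
    .join(stations)
    .map { case (_, ((trip, start), end)) => (trip, start, end) }

val result = joinedTrips.collect()
```

Note that `join` is an inner join, so a trip whose terminal has no matching station is dropped. Using `leftOuterJoin` instead would keep such trips and wrap the station side in an `Option`.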