
I thought it would work, but it actually failed.

import math._
import org.apache.spark.sql.SparkSession

object Position {
  def main(args: Array[String]): Unit = {
    // create a SparkSession
    val spark = SparkSession.builder().getOrCreate()
    // Read csv with DataFrame
    val file1 = spark.read.csv("file:///home/aaron/Downloads/taxi_gps.txt")
    val file2 = spark.read.csv("file:///home/aaron/Downloads/district.txt")
    //change the name
    val new_file1 = file1.withColumnRenamed("_c4","lat")
                         .withColumnRenamed("_c5","lon")
    val new_file2 = file2.withColumnRenamed("_c0","dis")
                         .withColumnRenamed("_1","lat")
                         .withColumnRenamed("_2","lon")
                         .withColumnRenamed("_c3","r")
    //geo code
    def haversine(lat1:Double, lon1:Double, lat2:Double, lon2:Double): Double ={
      val R = 6372.8  //radius in km
      val dLat=(lat2 - lat1).toRadians
      val dLon=(lon2 - lon1).toRadians
      val a = pow(sin(dLat/2),2) + pow(sin(dLon/2),2) * cos(lat1.toRadians) * cos(lat2.toRadians)
      val c = 2 * asin(sqrt(a))
      R * c
    }
    //count
    new_file2.foreach(row => {
      val district = row.getAs[Float]("dis")
      val lon = row.getAs[Float]("lon")
      val lat = row.getAs[Float]("lat")
      val distance = row.getAs[Float]("r")
      var temp = 0
      new_file1.foreach(taxi => {
        val taxiLon = taxi.getAs[Float]("lon")
        val taxiLat = taxi.getAs[Float]("lat")
        if(haversine(lat,lon,taxiLat,taxiLon) <= distance) {
          temp+=1
        }
      })
      println(s"district:$district temp=$temp")
    })
  }
}

Here are the results:

  20/06/07 23:04:11 ERROR SparkContext: Error initializing SparkContext.
  org.apache.spark.SparkException: A master URL must be set in your configuration
  ......
  20/06/07 23:04:11 ERROR Utils: Uncaught exception in thread main
  java.lang.NullPointerException
  ......
  20/06/07 23:04:11 INFO SparkContext: Successfully stopped SparkContext
  Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration

Since this seems to be Spark, I am not sure whether using a DataFrame inside another DataFrame is the only mistake in this program. I am not familiar with Scala and Spark, so this is quite a tough question for me. I hope you can help me, thanks!

limoli

1 Answer


Your exception says org.apache.spark.SparkException: A master URL must be set in your configuration, so you need to set the master URL using the master method on the builder.

I hope you are running the code in some IDE. If yes, please replace val spark = SparkSession.builder().getOrCreate() with val spark = SparkSession.builder().master("local[*]").getOrCreate() in your code.
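
For example, the start of the program would then look like this (the appName call and its "Position" label are optional additions for illustration):

import org.apache.spark.sql.SparkSession

object Position {
  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark locally with as many worker threads as
    // there are cores on the machine, which suits running from an IDE.
    // appName is optional; "Position" is just an illustrative label.
    val spark = SparkSession.builder()
      .appName("Position")
      .master("local[*]")
      .getOrCreate()

    // ... rest of the program unchanged
  }
}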

Or, if you are executing this code using spark-submit, try adding --master yarn.
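
A sketch of the submit command, assuming the application is packaged as position.jar (the jar name is hypothetical; the main class Position comes from your code):

spark-submit \
  --master yarn \
  --class Position \
  position.jar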

Srinivas