0

I want to save chekpoint tests in a location on amazon S3, this is the part of my scala code on DStream,using below format but getting the error..

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

Code:

val creatingFunc = { ()=>

// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
val ggsnLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\GGSN\\Files1",filterF,false)
val ccnLines= ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\CCN\\Files1",filterF,false)
val probeLines= ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\Probe\\Files1",filterF,false)

val ggssnArrays=ggsnLines.map(x=>(x._1,x._2.toString())).filter(!_._2.contains("ggsnIPAddress")).map(x=>(x._1,x._2.split(",")))
ggssnArrays.foreachRDD(s=> {
  s.collect().take(10).foreach(u=>println(u._2.mkString(",")))
})

ssc.remember(Minutes(1))  // To make sure data is not deleted by the time we query it interactively


ssc.checkpoint("s3n://probecheckpoints/checkpoints")

println("Creating function called to create new StreamingContext")
newContextCreated = true
ssc
}

def main(args:Array[String]): Unit =
{

//the minremeberduration is set to read the previous files from the directory
//the kyroclasses serialization needs to be enabled for the filestream

if (stopActiveContext) {
  StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
}

// Get or create a streaming context

val hadoopConfiguration:Configuration=new Configuration()
hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAIOPSJVBDTEUHUJCQ")
hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "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")


//val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
val ssc=StreamingContext.getActiveOrCreate("s3n://probecheckpoints/SparkCheckPoints",creatingFunc,hadoopConfiguration,false)



if (newContextCreated) {
  println("New context created from currently defined creating function")
} else {
  println("Existing context running or recovered from checkpoint, may not be running currently defined creating function")
}

// Start the streaming context in the background.
ssc.start()
Alex Lipov
  • 13,503
  • 5
  • 64
  • 87
Mahdi
  • 787
  • 1
  • 8
  • 33
  • Possible duplicate of [Spark Streaming checkpoint to amazon s3](http://stackoverflow.com/questions/33475931/spark-streaming-checkpoint-to-amazon-s3) – Yuval Itzchakov Aug 02 '16 at 06:05
  • Just edited my original question based on solution on above thread but still getting the error. – Mahdi Aug 02 '16 at 10:24
  • 1
    Can you try "fs.s3n.awsAccessKeyId" and "fs.s3n.awsSecretAccessKey" instead of fs.s3 ? – Knight71 Aug 02 '16 at 11:40
  • Hi, Thanks but I get another error instead, [main] WARN - Your hostname, AUDDT1300014W resolves to a loopback/non-reachable address: fe80:0:0:0:f80f:7c7d:afe0:41f4%21, but we couldn't find any external IP address! Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/SparkCheckPoints' - ResponseCode=403, ResponseMessage=Forbidden – Mahdi Aug 02 '16 at 22:51

0 Answers0