
I have a use case where I need to call a REST API from Spark Streaming after messages are read from Kafka, perform some calculations, and save the results back to HDFS and a third-party application.

I have a few doubts here:

  • How can we call a REST API directly from Spark Streaming?
  • How do we manage the REST API timeout relative to the streaming batch interval?
Matthias J. Sax
nilesh1212

1 Answer


This code will not compile as-is, but it outlines the approach for the given use case.

val conf = new SparkConf().setAppName("App name").setMaster("yarn")
val ssc = new StreamingContext(conf, Seconds(1))

val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

dstream.foreachRDD { rdd =>

  // Write the RDD to HDFS directly
  rdd.saveAsTextFile("hdfs/location/to/save")

  // Loop through each partition in the RDD
  rdd.foreachPartition { partitionOfRecords =>

    //1. Create the HttpClient object here (once per partition)
    //2.a POST the partition's data to the API

    // Use this if you want record-level control within the partition
    partitionOfRecords.foreach { record =>
      //2.b POST each record to the API
      record.toString
    }
  }
  // Use 2.a or 2.b to POST data as per your requirements
}

ssc.start()
ssc.awaitTermination()
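The per-partition logic above (steps 1 and 2.b) can be sketched independently of Spark as a plain function over an `Iterator[String]`, which is what `foreachPartition` hands you. `postToApi` here is a hypothetical stand-in for the real HTTP POST call, injected so the loop can be exercised without a cluster:

```scala
// Hedged sketch of the body you might pass to rdd.foreachPartition.
// In a real job, create one HttpClient here, once per partition,
// rather than once per record.
def postPartition(records: Iterator[String], postToApi: String => Boolean): Int = {
  var posted = 0
  records.foreach { record =>
    // Step 2.b: POST each record; count successful posts
    if (postToApi(record)) posted += 1
  }
  posted
}

// Exercised with a fake POST that always succeeds:
val sent = postPartition(Iterator("a", "b", "c"), _ => true)
```

Creating the client once per partition (not per record) matters because connection setup is expensive and the client is typically not serializable, so it cannot be created on the driver and shipped to executors.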

Most HTTP clients (for REST calls) support a request timeout.

Sample HTTP POST call with a timeout, using Apache HttpClient:

val CONNECTION_TIMEOUT_MS = 20000 // timeout in millis (20 sec)

val requestConfig = RequestConfig.custom()
  .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
  .setConnectTimeout(CONNECTION_TIMEOUT_MS)
  .setSocketTimeout(CONNECTION_TIMEOUT_MS)
  .build()

val client: CloseableHttpClient = HttpClientBuilder.create().build()

val url = "https://selfsolve.apple.com/wcResults.do"
val post = new HttpPost(url)

// Apply the timeout config to the request
post.setConfig(requestConfig)

post.setEntity(EntityBuilder.create().setText("some text to post to API").build())

val response: HttpResponse = client.execute(post)
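For comparison, here is a minimal sketch of the same timeout settings using the JDK's built-in `java.net.http.HttpClient` (Java 11+), which needs no external dependency. The URL is a placeholder, and note that the timeout should stay below the streaming batch interval to avoid a backlog:

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest}
import java.time.Duration

val timeout = Duration.ofSeconds(20)

// connectTimeout plays the role of setConnectTimeout above
val client = HttpClient.newBuilder()
  .connectTimeout(timeout)
  .build()

// The per-request timeout is roughly analogous to setSocketTimeout above
val request = HttpRequest.newBuilder(URI.create("https://example.com/api"))
  .timeout(timeout)
  .POST(HttpRequest.BodyPublishers.ofString("some text to post to API"))
  .build()

// client.send(request, HttpResponse.BodyHandlers.ofString()) would then
// execute the call, throwing HttpTimeoutException if the timeout elapses.
```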
mrsrinivas
  • the catch now is that the cluster is Kerberos-enabled, so when I curl the REST API URL on any of the nodes I receive the expected output, but the same URL fails in the Spark program with an HTTP 503/504 error. – nilesh1212 Jan 31 '17 at 14:15
  • I'm really glad, that helped you. – mrsrinivas Feb 03 '17 at 07:33
  • Hi Nilesh, I am also trying to execute the similar case, however that will be using Spark Java. In your case just wanted to understand where and you are calling the HttpRest call in the Spark Streaming application. Also, does Spark provide any class like a Receiver class where you will be receiving the event streams from a URL. In my case I want to post the events to an URL. – Avinash May 30 '19 at 12:45
  • @Avinash Hey Avinash, I was calling the REST API in foreachPartition to execute the REST API calls in a distributed fashion. As this is a streaming application, please make sure your HTTP timeout is < batch interval, else you will have a lot of backlog messages to process. – nilesh1212 Dec 07 '20 at 06:14