
I have a use case I am trying to solve using Spark. I have to call an API that expects a batchSize and a token, and it returns a list of JSON objects along with the token for the next page. I have to keep calling this API until all the results are returned and write them all to S3 in Parquet format. The number of returned objects can range from 0 to 100 million.

My approach is to first fetch, say, a batch of 1 million objects, convert them into a Dataset, and then write to Parquet using

dataSet.repartition(1).write.mode(SaveMode.Append)
      .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
      .parquet(s"s3a://somepath/")

and then repeat the process until the API says there is no more data, i.e. the token is null.

So the API calls have to run on the driver, sequentially, and once I have a million records I write them to S3.
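The driver loop looks roughly like this (fetchBatch, ApiPage, and the SparkSession setup are placeholders standing in for my actual API client):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Placeholder client for the real API: returns one page of JSON strings
// plus the token for the next page, or None when there is no more data.
case class ApiPage(records: Seq[String], nextToken: Option[String])
def fetchBatch(token: Option[String], batchSize: Int): ApiPage = ???

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

var token: Option[String] = None
do {
  val page = fetchBatch(token, batchSize = 1000000)
  if (page.records.nonEmpty) {
    spark.read.json(page.records.toDS)   // parse the JSON strings into a DataFrame
      .repartition(1)
      .write.mode(SaveMode.Append)
      .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
      .parquet(s"s3a://somepath/")
  }
  token = page.nextToken
} while (token.isDefined)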

I have been seeing memory issues like this on the driver:

Application application_1580165903122_19411 failed 1 times due to AM Container for appattempt_1580165903122_19411_000001 exited with exitCode: -104
Diagnostics: Container [pid=28727,containerID=container_1580165903122_19411_01_000001] is running beyond physical memory limits. Current usage: 6.6 GB of 6.6 GB physical memory used; 16.5 GB of 13.9 GB virtual memory used. Killing container.
Dump of the process-tree for container_1580165903122_19411_01_000001 :

The behavior is inconsistent: sometimes 30 million records work fine, and sometimes even 1 million fails with this error.

Am I making some very silly mistake, or is there a better approach for this?

KingJames

1 Answer


This design is not scalable and puts a lot of pressure on the driver, so it is expected to crash. Additionally, a lot of data is accumulated in driver memory before it is written to S3.

I recommend using Spark Streaming to read the data from the API. This way many executors will do the work and the solution will be much more scalable. Here is an example: RestAPI service call from Spark Streaming

In those executors you can accumulate the API responses in a balanced way: buffer, say, 20,000 records rather than waiting for 5 million. Once you have 20,000, write them to S3 in "append" mode. Append mode lets multiple processes work in tandem without stepping on each other's output; a sketch of that flushing step follows.
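This is a minimal sketch of the flushing step (flushChunk, the threshold, and the path are illustrative names, not a fixed API):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Illustrative helper: flush one small buffered chunk of JSON records to S3.
// SaveMode.Append only adds new Parquet files under the path, so several
// writers targeting the same location do not overwrite each other's output.
def flushChunk(spark: SparkSession, buffer: Seq[String], path: String): Unit = {
  import spark.implicits._
  if (buffer.nonEmpty) {
    spark.read.json(buffer.toDS)
      .write.mode(SaveMode.Append)
      .parquet(path)
  }
}

// Call flushChunk(spark, buffer, "s3a://somepath/") each time the buffer
// reaches ~20,000 records, then clear the buffer and keep consuming.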

Salim
  • Thanks for the suggestion. In my case the 2nd call to the REST API depends on the response from the first. How does Spark Streaming come into the picture, since this implementation would need multiple executors to fetch different data based on the API tokens? – KingJames Jan 31 '20 at 23:01
  • Can you make the 2nd call immediately after the 1st response? If so, make both calls and then write both responses to S3. If you cannot make the 2nd call immediately after the 1st, then write 2 processes: one makes the 1st call and writes the response; another reads the response from S3 and makes the 2nd call (see the sketch below) – Salim Feb 01 '20 at 13:30
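A rough sketch of the two-process handoff described in the last comment (the paths, the placeholder client, and the assumption that the stored response carries the next token are all illustrative):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Placeholder client, as in the question.
case class ApiPage(records: Seq[String], nextToken: Option[String])
def fetchBatch(token: Option[String], batchSize: Int): ApiPage = ???

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Process 1: make the first call and persist the raw response.
def firstCall(): Unit = {
  val page = fetchBatch(token = None, batchSize = 20000)
  spark.read.json(page.records.toDS)
    .write.mode(SaveMode.Append)
    .parquet("s3a://bucket/responses/")
}

// Process 2, a separate job: read what process 1 wrote, derive the next
// token from it, and make the follow-up call.
def secondCall(): Unit = {
  val previous = spark.read.parquet("s3a://bucket/responses/")
  // Assumes the stored response includes the continuation token as a column.
  val nextToken = previous.select("nextToken").head.getString(0)
  val page = fetchBatch(Some(nextToken), batchSize = 20000)
  // ... persist this response the same way and repeat.
}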