
I am running a PySpark script in a Jupyter notebook to write a dataframe to a CSV, as below:

df.coalesce(1).write.csv('Data1.csv',header = 'true')

After an hour of runtime I am getting the below error.

Error: Invalid status code from http://.....session isn't active.

My config is:

spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("shuffle.service.enabled","true")
spark.conf.set("spark.dynamicAllocation.minExecutors",6)
spark.conf.set("spark.executor.heartbeatInterval","3600s")
spark.conf.set("spark.cores.max", "4")
spark.conf.set("spark.sql.tungsten.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.app.id", "Logs")
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.rdd.compress", "true")
spark.conf.set("spark.executor.instances", "6")
spark.conf.set("spark.executor.memory", '20g')
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.driver.allowMultipleContexts", "true")
spark.conf.set("spark.master", "yarn")
spark.conf.set("spark.driver.memory", "20G")
spark.conf.set("spark.executor.instances", "32")
spark.conf.set("spark.executor.memory", "32G")
spark.conf.set("spark.driver.maxResultSize", "40G")
spark.conf.set("spark.executor.cores", "5")

I have checked the container nodes and the error there is:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed:container_e836_1556653519610_3661867_01_000005 on host: ylpd1205.kmdc.att.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

I am not able to figure out the issue.

help-info.de
  • Not sure about PySpark, but header = 'true' should not be in single quotes; it should be something like df.coalesce(1).write.option("header","true").csv('Data1.csv') – Aaron Jun 20 '19 at 19:08
  • That's not an issue. I am able to fetch data using this. –  Jun 20 '19 at 19:23
  • How big is your DF? coalesce(1) is not something that should be used unless you have enough resources. If you are using HDFS, what I would suggest is to write out your header file, write out the CSV file in a partitioned manner, and then use HDFS copyMerge to get a single file. – Aaron Jun 20 '19 at 19:30
  • My DF is large. I understand where the problem is, and you are correct that I need to write this in partitions. Can you share some sample scripts on how to write out the files in a partitioned manner? –  Jun 21 '19 at 11:27
  • Do you still want to have one CSV file at the end? – Aaron Jun 21 '19 at 12:49
  • Yes, I want a single CSV. –  Jun 21 '19 at 17:33
  • Possible duplicate of [java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0](https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0) – user10938362 Jun 21 '19 at 19:54
  • @Alan does the answer address your question? Let me know. – Aaron Jun 25 '19 at 16:23

2 Answers


Judging by the output, if your application is not finishing with a FAILED status, this sounds like a Livy timeout error: your application is likely taking longer than the defined timeout for a Livy session (which defaults to 1h), so even though the Spark app itself succeeds, your notebook will receive this error once the Livy session times out.

If that's the case, here's how to address it:

  1. Edit the /etc/livy/conf/livy.conf file (on the cluster's master node).
  2. Set livy.server.session.timeout to a higher value, like 8h (or larger, depending on your app); see the example after this list.
  3. Restart Livy to pick up the setting: sudo restart livy-server on the cluster's master node.
  4. Test your code again.
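
For reference, assuming the default config location, the change from step 2 would look something like this in /etc/livy/conf/livy.conf (8h is just an example value; pick one that comfortably covers your longest run):

livy.server.session.timeout = 8h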
jmng

I am not well versed in PySpark, but in Scala the solution would involve something like this.

First, we need a method for creating a header file (the imports below cover all of the snippets that follow):

import java.io.PrintWriter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

def createHeaderFile(headerFilePath: String, colNames: Array[String]): Unit = {

  // format the header file path
  val fileName = "dfheader.csv"
  val headerFileFullName = "%s/%s".format(headerFilePath, fileName)

  // write the header to HDFS as a single comma-separated line
  val hadoopConfig = new Configuration()
  val fileSystem = FileSystem.get(hadoopConfig)
  val output = fileSystem.create(new Path(headerFileFullName))
  val writer = new PrintWriter(output)

  writer.write(colNames.mkString(","))
  writer.write("\n")
  writer.close()
}

You will also need a method that calls Hadoop to merge the part files written by the df.write method:

def mergeOutputFiles(sourcePaths: String, destLocation: String): Unit = {

  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)

  // in case of an Array[String], iterate over the multiple source paths with
  // for (sourcePath <- sourcePaths) { ... } instead of the code below

  // get the path under the destination where the partitioned files are temporarily stored
  val pathText = sourcePaths.split("/")
  val destPath = "%s/%s".format(destLocation, pathText.last)

  // merge the header file and the part files into one file
  FileUtil.copyMerge(hdfs, new Path(sourcePaths), hdfs, new Path(destPath), true, hadoopConfig, null)

  // delete the temporary partitioned-output folder once the merge is complete
  hdfs.delete(new Path(sourcePaths).getParent, true)
}

Here is a method for generating the output files, i.e. the df.write step, where you pass your huge DF to be written out to HDFS:

def generateOutputFiles(processedDf: DataFrame, opPath: String, tempOutputFolder: String,
                        spark: SparkSession): String = {

  val fileName = "%s%sNameofyourCsvFile.csv".format(opPath, tempOutputFolder)

  // write the DF as CSV into the temporary output directory, then drop the header file
  // into the same directory so it gets picked up by the merge
  processedDf.write.mode("overwrite").csv(fileName)
  createHeaderFile(fileName, processedDf.columns)

  // if the output needs to be split into multiple files based on some parameter,
  // change the return type to Array[String] and collect each fileName in a loop
  val outputFilePathList = fileName
  outputFilePathList
}

With all the methods defined, here is how to use them. First, a placeholder for your own transformation logic:

def processyourlogic(/* your parameters, if any */): DataFrame = {
  // your logic to do whatever needs to be done to your data
}

Assuming the above method returns a DataFrame, here is how you can put everything together:

val yourbigDf = processyourlogic(/* your parameters */) // returns a DataFrame
yourbigDf.cache() // caching just in case you need it
val outputPathFinal = "location where you want your file to be saved" // should end with "/"
val tempOutputFolderLocation = "temp/"
val partFiles = generateOutputFiles(yourbigDf, outputPathFinal, tempOutputFolderLocation, spark)
mergeOutputFiles(partFiles, outputPathFinal)
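
Since your original code is PySpark, here is a rough, untested sketch of the same partition-then-merge idea from the Python side (assuming Hadoop 2.x, where FileUtil.copyMerge is still available; temp_dir and final_file are placeholder paths):

# rough PySpark sketch (untested); temp_dir and final_file are placeholder paths
temp_dir = "hdfs:///tmp/Data1_parts"     # temporary directory for the part files
final_file = "hdfs:///data/Data1.csv"    # single merged output file

# write the data without a header; a header file named so that it sorts before
# the part-* files can be added to temp_dir first (as createHeaderFile does above)
df.write.mode("overwrite").csv(temp_dir)

# call Hadoop's FileUtil.copyMerge through the JVM gateway to concatenate
# everything under temp_dir into one file, deleting the source afterwards
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
jvm.org.apache.hadoop.fs.FileUtil.copyMerge(
    fs, jvm.org.apache.hadoop.fs.Path(temp_dir),
    fs, jvm.org.apache.hadoop.fs.Path(final_file),
    True, hadoop_conf, None)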

Let me know if you have any other questions relating to that. If the answer you seek is different, then it should be asked as a separate question.

Aaron