
I have a Spark Streaming job (Spark 2.1.1 on Cloudera 5.12) that reads from Kafka and writes to HDFS in Parquet format. The problem is that I get a LeaseExpiredException at random (not in every mini-batch):

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/qoe_fixe/data_tv/tmp/cleanData/_temporary/0/_temporary/attempt_20180629132202_0215_m_000000_0/year=2018/month=6/day=29/hour=11/source=LYO2/part-00000-c6f21a40-4088-4d97-ae0c-24fa463550ab.snappy.parquet (inode 135532024): File does not exist. Holder DFSClient_attempt_20180629132202_0215_m_000000_0_-1048963677_900 does not have any open files.

I'm using the Dataset API to write to HDFS:

      if (!InputWithDatePartition.rdd.isEmpty()) {
        InputWithDatePartition
          .repartition(1)
          .write
          .partitionBy("year", "month", "day", "hour", "source")
          .mode("append")
          .parquet(cleanPath)
      }

My job fails after a few hours because of this error.

  • Are you sure that no other job is trying to update/delete the path `"cleanPath"`? – pushpavanthar Jul 03 '18 at 17:34
  • I had two streaming jobs writing to this folder, but I added "source" as a partition column so that they would write to different partitions (folders). When I changed cleanPath (the parent folder) to be different for the two jobs, I no longer faced this problem. – Yassine Jouini Jul 06 '18 at 10:07
  • The path in the error is a temporary location. Can you try writing to a concrete path and see whether the problem still occurs? – pushpavanthar Jul 09 '18 at 05:15

1 Answer


Two jobs that write to the same directory share the same _temporary folder.

So when the first job finishes, this code is executed (FileOutputCommitter class):

  public void cleanupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
      FileSystem fs = pendingJobAttemptsPath
          .getFileSystem(context.getConfiguration());
      // if job allow repeatable commit and pendingJobAttemptsPath could be
      // deleted by previous AM, we should tolerate FileNotFoundException in
      // this case.
      try {
        fs.delete(pendingJobAttemptsPath, true);
      } catch (FileNotFoundException e) {
        if (!isCommitJobRepeatable(context)) {
          throw e;
        }
      }
    } else {
      LOG.warn("Output Path is null in cleanupJob()");
    }
  }

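It deletes pendingJobAttemptsPath (_temporary) while the second job is still running, so the second job's in-progress files disappear and its write fails with the LeaseExpiredException.

This may be helpful: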

Multiple spark jobs appending parquet data to same base path with partitioning
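
To make the mechanism concrete, here is a minimal sketch that imitates the race on the local filesystem using only java.nio. It is not Hadoop or Spark code: the class name SharedTemporaryDemo, the method inFlightFileSurvives, and the attempt_A/attempt_B directory names are hypothetical, and the recursive delete only stands in for the fs.delete(pendingJobAttemptsPath, true) call shown above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SharedTemporaryDemo {

    // Stand-in for fs.delete(path, true): remove a directory tree, children first.
    static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts files and subdirectories before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Job B has a task file in flight under its output directory's _temporary
    // folder when job A finishes and cleans up its own _temporary folder.
    // Returns true if job B's in-flight file still exists afterwards.
    static boolean inFlightFileSurvives(Path jobAOutput, Path jobBOutput) throws IOException {
        Path attemptA = jobAOutput.resolve("_temporary").resolve("0").resolve("attempt_A");
        Path attemptB = jobBOutput.resolve("_temporary").resolve("0").resolve("attempt_B");
        Files.createDirectories(attemptA);
        Files.createDirectories(attemptB);

        Path fileB = attemptB.resolve("part-00000.parquet");
        Files.write(fileB, new byte[] {1, 2, 3});

        // Job A finishes first: its cleanup removes <jobAOutput>/_temporary entirely.
        deleteRecursively(jobAOutput.resolve("_temporary"));

        return Files.exists(fileB);
    }

    public static void main(String[] args) throws IOException {
        Path shared = Files.createTempDirectory("cleanData");
        System.out.println("shared base path, file survives: "
                + inFlightFileSurvives(shared, shared));

        Path outA = Files.createTempDirectory("cleanDataA");
        Path outB = Files.createTempDirectory("cleanDataB");
        System.out.println("separate base paths, file survives: "
                + inFlightFileSurvives(outA, outB));
    }
}
```

With a shared base path the second job's file is deleted along with _temporary; with separate base paths it is left alone. This matches the comment above, where giving each job its own cleanPath made the error go away.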

Alex