
When I run these three commands in a Unix shell/terminal, they all work fine, returning exit status 0:

unix_shell> ls -la
unix_shell> hadoop fs -ls /user/hadoop/temp
unix_shell> s3-dist-cp --src ./abc.txt --dest s3://bucket/folder/

Now I am trying to run the same commands through the Scala Process API as external processes; the sample code is below:

import scala.sys.process._

val cmd_1 = "ls -la"
val cmd_2 = "hadoop fs -ls /user/hadoop/temp/"
val cmd_3 = "/usr/bin/s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/"
val cmd_4 = "s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/"

val exitCode_1 = (stringToProcess(cmd_1)).! // works fine and produces result
val exitCode_2 = (stringToProcess(cmd_2)).! // works fine and produces result
val exitCode_3 = (stringToProcess(cmd_3)).! // **it just hangs, yielding nothing**
val exitCode_4 = (stringToProcess(cmd_4)).! // **it just hangs, yielding nothing**
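For reference, attaching a ProcessLogger captures whatever the child process prints while it appears to hang. This is only a diagnostic sketch, reusing the same hypothetical command as cmd_4:

import scala.sys.process._

// collect stdout and stderr separately so the hang can be diagnosed
val out = new StringBuilder
val err = new StringBuilder
val log = ProcessLogger(line => out.append(line).append('\n'),
                        line => err.append(line).append('\n'))

val exitCode = "s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/".!(log)
println(s"exit=$exitCode\nstdout:\n$out\nstderr:\n$err")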

The only difference between cmd_3 and cmd_4 above is the absolute path. I am also passing the relevant dependency explicitly in the spark-submit script, like below:

--jars hdfs:///user/hadoop/s3-dist-cp.jar

Your inputs/suggestions would be helpful. Thanks!

TheCodeCache
  • Are you using cluster mode or client mode? If cluster, are you copying that to the workers? In that case, first try client mode; if it works, then try cluster mode. – Ram Ghadiyaram Jun 01 '19 at 06:38
  • I am running in client mode only, and I only intend to run the s3-dist-cp command on the Spark driver node itself. – TheCodeCache Jun 01 '19 at 08:10
  • Any idea which Hadoop application jars I need to include in the --jars option? I tried hdfs:///user/hadoop/s3-dist-cp.jar but it didn't work. Is this jar enough, or do I need to include more jars on the driver classpath? – TheCodeCache Jun 01 '19 at 08:11

2 Answers


Actually, the Scala process runs outside the Spark context, so in order to run that s3-dist-cp command successfully all I did was stop the Spark context before launching the Scala process that wraps the s3-dist-cp command. The full working code is as follows:

    logger.info("Moving ORC files from HDFS to S3 !!")

    import scala.sys.process._

    logger.info("stopping spark context..##")
    val spark = IngestionContext.sparkSession
    spark.stop()
    logger.info("spark context stopped..##")
    logger.info("sleeping for 10 secs")
    Thread.sleep(10000) // this sleep is not required, this was just for debugging purpose, you can remove this in your final code.
    logger.info("woke up after sleeping for 10 secs")

    try {
      /**
       * following is the Java ProcessBuilder version; of course you need to take care of a few imports
       */
      //val pb = new java.lang.ProcessBuilder("s3-dist-cp", "--src", INGESTED_ORC_DIR, "--dest", "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp", "--srcPattern", ".*\\.orc")
      //val pb = new java.lang.ProcessBuilder("hadoop", "jar", "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar", "--src", INGESTED_ORC_DIR, "--dest", "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp", "--srcPattern", ".*\\.orc")
      //pb.directory(new File("/tmp"))
      //pb.inheritIO()
      //pb.redirectErrorStream(true)
      //val process = pb.start()
      //val is = process.getInputStream()
      //val isr = new InputStreamReader(is)
      //val br = new BufferedReader(isr)
      //logger.info("printing lines:")
      //var line = br.readLine()
      //while (line != null) {
      //  logger.info("line=[{}]", line)
      //  line = br.readLine()
      //}

      //logger.info("process goes into waiting state")
      //logger.info("Waited for: " + process.waitFor())
      //logger.info("Program terminated!")

      /**
       * following is the scala version
       */
      val S3_DIST_CP = "s3-dist-cp"
      val INGESTED_ORC_DIR = S3Util.getSaveOrcPath()

      // listing out all the files
      //val s3DistCpCmd = S3_DIST_CP + " --src " + INGESTED_ORC_DIR + " --dest " + paramMap(Storage_Output_Path).substring(4) + "_temp --srcPattern .*\\.orc"
      //-Dmapred.child.java.opts=-Xmx1024m -Dmapreduce.job.reduces=2
      val cmd = S3_DIST_CP + " --src " + INGESTED_ORC_DIR + " --dest " + "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp --srcPattern .*\\.orc"

      //val cmd = "hdfs dfs -cp -f " + INGESTED_ORC_DIR + "/* " + "s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp/"
      //val cmd = "hadoop distcp " + INGESTED_ORC_DIR + "/ s3:/" + paramMap(Storage_Output_Path).substring(4) + "_temp_2/"

      logger.info("full hdfs to s3 command : [{}]", cmd)

      // command execution
      val exitCode = (stringToProcess(cmd)).!

      logger.info("s3_dist_cp command exit code: {} and s3 copy got " + (if (exitCode == 0) "SUCCEEDED" else "FAILED"), exitCode)
    } catch {
      case ex: Exception =>
        logger.error(
          "there was an exception while copying orc file to s3 bucket. {} {}",
          "", ex.getMessage, ex)
        throw new IngestionException("s3 dist cp command failure", null, Some(StatusEnum.S3_DIST_CP_CMD_FAILED))
    }

Though the above code works exactly as expected, I have another observation as well, which is weird:

Instead of using this

val exitCode = (stringToProcess(cmd)).!

if you use this

val exitCode = (stringToProcess(cmd)).!!

Notice the difference between the single ! and the double !!: the single ! only returns the exit code, whereas the double !! returns the output of the process execution (and throws an exception if the command exits with a non-zero code).

So in the case of the single !, the above code works just fine; in the case of the double !!, it works too, but it generates far more files and copies in the S3 bucket than the number of original files.
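To make the difference concrete, here is a minimal sketch on a harmless stand-in command (ls is just a placeholder); a ProcessLogger is a third option that keeps the exit code while still capturing the output:

import scala.sys.process._

val exitCode: Int  = "ls -la".!    // exit code only; output goes to the console
val output: String = "ls -la".!!   // stdout as a String; throws if the exit code is non-zero

// capture the output and keep the exit code at the same time
val buf = new StringBuilder
val code = "ls -la".!(ProcessLogger(line => buf.append(line).append('\n')))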

And as for the spark-submit command, there is no need to worry about the --driver-class-path or even the --jars option, as I did not pass any dependency.

TheCodeCache
  • Thanks a lot, @Ram Ghadiyaram. Although I tried your solution and it didn't work, if you had not given me a slight push by sharing that link, I may not have strived further and arrived at a solution. Thanks a ton, keep helping :) – TheCodeCache Jun 02 '19 at 08:01
  • Good effort, buddy! :-) +1... One thing: you had not shared the code that points out what exactly went wrong; I would not even know that you were not stopping the Spark context. Next time, share the code snippet and the versions of the software to get an elegant answer. All the best! But in general my answer is still valid; I did a lot of research for you and posted the answer there. – Ram Ghadiyaram Jun 03 '19 at 21:10
  • Oh, I appreciate your effort; it was really helpful. Yes, I will take care to share an optimal code snippet for more insight going forward; I am still new to SO and learning these things. Thanks!! – TheCodeCache Jun 04 '19 at 11:11

It seems like what you have done is right. See here: https://github.com/gorros/spark-scala-tips/blob/master/README.md

import scala.sys.process._

def s3distCp(src: String, dest: String): Unit = {
    s"s3-dist-cp --src $src --dest $dest".!
}
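A hedged usage example with placeholder paths (the HDFS directory and bucket below are made up):

s3distCp("hdfs:///user/hadoop/temp/orc", "s3://my-bucket/folder/")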

Please check this note... I wonder whether this is the case with you.

Regarding your --jars /usr/lib/hadoop/client/*.jar:

You can append the jars related to s3-dist-cp using the tr command like this (see my answer):

--jars $(echo /dir_of_jars/*.jar | tr ' ' ',')

Note: To be able to use this method, you need the Hadoop application to be added, and you need to run Spark in client or local mode, since s3-dist-cp is not available on slave nodes. If you want to run in cluster mode, then copy the s3-dist-cp command to the slaves during bootstrap.

Ram Ghadiyaram
  • Thanks, Ram, for the reply. Yes, I am using Spark in client mode only, and I intend to run this command only on the Spark driver node itself in client mode. "You need the Hadoop application to be added": does that mean I need to include hadoop-aws.jar or hadoop-client.jar, or do you know which set of jars I need to include precisely? I am experimenting with this solution; if I get any breakthrough, I will let you know. – TheCodeCache Jun 01 '19 at 08:09
  • --jars /usr/lib/hadoop/client/*.jar,/usr/lib/hadoop/*.jar: I am providing all the jars, and they appear on the driver classpath, yet s3-dist-cp still hangs; not sure what else needs to be done. Even when I run the spark-submit command directly from the machine, it hangs after the map job completes 100%; the reduce phase does not seem to start. – TheCodeCache Jun 01 '19 at 10:03