I'm migrating a Spark job from a private data center to GCP. Previously I used FileSystem from org.apache.hadoop.fs to transfer and delete files within MapR-FS, but with a GCS bucket this no longer works. I need to transfer files from directory A to directory B within the same bucket and then delete the files from directory A after the transfer. The two methods below don't work on GCP. Can you suggest any other Scala libraries that support transferring and deleting files in a Google Cloud Storage bucket?
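To make the inputs concrete, this is roughly how the two methods shown below are called. The bucket and directory names here are made up for illustration, and I'm assuming FILE_SEPERATOR is a comma:

val filesToMove = "gs://my-bucket/dirA/part-00000,gs://my-bucket/dirA/part-00001" // FILE_SEPERATOR = ","
moveFiles(sc, filesToMove, "gs://my-bucket/dirB/") // copy into directory B
deleteFiles(sc, filesToMove)                       // then remove the originals from directory A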
Delete method:
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.{FileSystem, Path}

def deleteFiles(sc: SparkContext, hdfsFiles: String): Unit = {
  val files = hdfsFiles.split(FILE_SEPERATOR)
  var fs: FileSystem = null
  try {
    // Resolve the FileSystem from the first path; all paths share the same scheme/bucket
    fs = new Path(files.head).getFileSystem(sc.hadoopConfiguration)
    for (file <- files) {
      val p = new Path(file)
      if (fs.exists(p)) {
        fs.delete(p, true)
        println("Deleted: " + file)
      } else {
        println("Error, unable to delete (path does not exist): " + file)
      }
    }
  } catch {
    case e: Exception => println("Exception while deleting files: " + e.getMessage)
  } finally {
    if (fs != null) fs.close()
  }
}
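For comparison, this is a rough, untested sketch of what I imagine the delete would look like if I went through the google-cloud-storage Java client from Scala instead of the Hadoop FileSystem API. The bucket/object-name split is an assumption on my part:

import com.google.cloud.storage.{BlobId, Storage, StorageOptions}

// Deletes the given objects from one bucket; objectNames are plain object names
// such as "dirA/part-00000" (assumption: the gs://bucket/ prefix is already stripped).
def deleteGcsObjects(bucket: String, objectNames: Seq[String]): Unit = {
  val storage: Storage = StorageOptions.getDefaultInstance.getService
  objectNames.foreach { name =>
    val deleted = storage.delete(BlobId.of(bucket, name))
    if (deleted) println("Deleted: gs://" + bucket + "/" + name)
    else println("Error, unable to delete (object not found): gs://" + bucket + "/" + name)
  }
}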
File transfer method:
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

def moveFiles(sc: SparkContext, hdfsFiles: String, toLoc: String): Unit = {
  val files = hdfsFiles.split(FILE_SEPERATOR)
  var fs: FileSystem = null
  try {
    fs = new Path(toLoc).getFileSystem(sc.hadoopConfiguration)
    if (!fs.exists(new Path(toLoc))) fs.mkdirs(new Path(toLoc))
    for (file <- files) {
      val p = new Path(file)
      if (fs.exists(p)) {
        // Copy into the target directory, keeping the original file name
        val dst = new Path(toLoc, file.substring(file.lastIndexOf('/') + 1))
        // fs.rename(p, dst) was also tried here instead of the copy
        println("Copy successful: " + FileUtil.copy(fs, p, fs, dst, false, sc.hadoopConfiguration))
      } else {
        println("Error, file does not exist: " + file)
      }
    }
  } catch {
    case e: Exception => println("Exception while moving files: " + e.getMessage)
  } finally {
    if (fs != null) fs.close()
  }
}
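And this is a rough sketch of the whole "move directory A to directory B within one bucket" with the same google-cloud-storage client: a server-side copy followed by a delete of the source object. The prefix handling and the copyTo/CopyWriter usage are assumptions based on the library docs, not something I have run (it uses scala.jdk.CollectionConverters, i.e. Scala 2.13; older Scala would need JavaConverters):

import com.google.cloud.storage.{BlobId, Storage, StorageOptions}
import scala.jdk.CollectionConverters._

// Moves every object under srcPrefix (e.g. "dirA/") to dstPrefix (e.g. "dirB/")
// within the same bucket: copy server-side first, then delete the original.
def moveGcsPrefix(bucket: String, srcPrefix: String, dstPrefix: String): Unit = {
  val storage: Storage = StorageOptions.getDefaultInstance.getService
  val blobs = storage.list(bucket, Storage.BlobListOption.prefix(srcPrefix)).iterateAll().asScala
  blobs.foreach { blob =>
    val targetName = dstPrefix + blob.getName.stripPrefix(srcPrefix)
    blob.copyTo(BlobId.of(bucket, targetName)).getResult() // server-side copy
    blob.delete()                                          // remove the source object
    println("Moved: gs://" + bucket + "/" + blob.getName + " -> gs://" + bucket + "/" + targetName)
  }
}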