
Unable to find a sparklyr built-in for listing the contents of a directory via Spark, I am attempting to use invoke:

sc <- spark_connect(master = "yarn", config=config)
path <- 'gs:// ***path to bucket on google cloud*** '
spath <- sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', path) 
fs <- sparklyr::invoke(spath, 'getFileSystem')
list <- sparklyr::invoke(fs, 'listLocatedStatus') 
Error: java.lang.Exception: No matched method found for class org.apache.hadoop.fs.Path.getFileSystem
    at sparklyr.Invoke.invoke(invoke.scala:134)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66) ...

Note: Are there guidelines for reproducible examples with distributed code? I don't know how to make an example others could follow, given I am running against a particular Spark environment.

  • _Are there guidelines..._ - [How to make good reproducible Apache Spark Dataframe examples](https://stackoverflow.com/q/48427185/6910411) – zero323 Oct 22 '18 at 15:06

1 Answer


The getFileSystem method takes an org.apache.hadoop.conf.Configuration object as its first argument:

public FileSystem getFileSystem(Configuration conf)
                     throws IOException

Return the FileSystem that owns this Path.

Parameters:

conf - the configuration to use when resolving the FileSystem

So the code to retrieve the FileSystem instance should look more or less like this:

# Retrieve Spark's Hadoop configuration
hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
fs <- sparklyr::invoke(spath, 'getFileSystem', hconf)
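
As a quick sanity check (a sketch; getUri is a standard method on the Hadoop FileSystem class, and toString on the returned java.net.URI), you can confirm which filesystem the handle resolves to:

# Should print the filesystem URI, e.g. the gs:// scheme of the bucket
invoke(fs, "getUri") %>% invoke("toString")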

Additionally, listLocatedStatus takes either a Path

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                     throws FileNotFoundException,
                                                                            IOException

or a Path and a PathFilter (note that this implementation is protected):

protected org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
                                                                                    PathFilter filter)
                                                                     throws FileNotFoundException,
                                                                            IOException

So if you want to structure your code as shown above, you'll have to provide at least a Path:

sparklyr::invoke(fs, "listLocatedStatus", spath)
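
This call returns a handle to a Hadoop RemoteIterator rather than a plain R list. A minimal sketch for draining it, assuming the standard hasNext()/next() methods of org.apache.hadoop.fs.RemoteIterator and that each element is a LocatedFileStatus:

it <- sparklyr::invoke(fs, "listLocatedStatus", spath)
paths <- character(0)
# Pull elements one by one through the iterator handle
while (invoke(it, "hasNext")) {
  status <- invoke(it, "next")
  paths <- c(paths, status %>% invoke("getPath") %>% invoke("toString"))
}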

In practice, it might be easier to just get the FileSystem directly:

fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get", hconf)

and use globStatus

lls <- invoke(fs, "globStatus", spath)

where spath is a path with a wildcard, like:

sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', "/some/path/*")

The result will be an R list, which can be easily iterated:

lls  %>%
    purrr::map(function(x) invoke(x, "getPath") %>% invoke("toString"))
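
If you prefer a plain character vector of paths instead of a list, a small variation with map_chr (assuming purrr is available) does the trick:

paths <- lls %>%
    purrr::map_chr(function(x) invoke(x, "getPath") %>% invoke("toString"))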

Credits:

The answer to How can one list all csv files in an HDFS location within the Spark Scala shell? by @jaime

Notes:

  • In general, if you interact with a non-trivial Java API, it makes much more sense to write your code in Java or Scala and provide a minimal R interface.
  • For interactions with a specific object store it might be easier to use a dedicated package. For Google Cloud Storage you can take a look at googleCloudStorageR (see the sketch below).
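
For example, a minimal sketch with googleCloudStorageR, assuming authentication has already been configured (e.g. via gcs_auth()) and that "my-bucket" stands in for the real bucket name:

library(googleCloudStorageR)

# Assumes credentials are already set up (e.g. via gcs_auth());
# "my-bucket" is a placeholder for the actual bucket name
objects <- gcs_list_objects(bucket = "my-bucket")
objects$name   # object names (paths) within the bucket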
  • How do I get the contents out of `flist`? `> flist <- sparklyr::invoke(fs, "listLocatedStatus", spath)` `> flist` prints `org.apache.hadoop.fs.FileSystem$4 org.apache.hadoop.fs.FileSystem$4@71ab657` – justin cress Oct 22 '18 at 15:06
  • This is good advice, but I don't currently know/work in either Java or Scala. Is there a better recommended way for a sparklyr user to list the contents of directories? I don't find this functionality built into sparklyr. – justin cress Oct 22 '18 at 15:28
  • As far as I know there isn't a built-in helper for that. And about your first point - if you don't know Scala or Java, then working with `invoke` directly will be a nightmare. Using a JVM language directly is a much better choice - a decent IDE can help you with the majority of the boilerplate and help you determine and troubleshoot errors. – zero323 Oct 22 '18 at 15:34
  • All this works until I get the result of listLocatedStatus. It's an Environment in R, not an iterable. It has class attributes `spark_jobj` `shell_jobj` – justin cress Oct 22 '18 at 15:43
  • Only the basic objects (arrays, primitives, boxed primitives) are converted to equivalent R objects. The rest is provided as-is, as handles to Java objects. Additionally, it looks like `sparklyr` does something unexpected under the hood, trying to access a non-public member, though I don't see why that would be the case. – zero323 Oct 22 '18 at 15:52