
Today I began using the Dask and Paramiko packages, partly as a learning exercise, and partly because I'm beginning a project that will require dealing with large datasets (tens of GB) that can only be accessed from a remote VM (i.e. they cannot be stored locally).

The following piece of code belongs to a short helper program that builds a Dask dataframe from a large CSV file hosted on the VM. I want to later pass its output (a reference to the Dask dataframe) to a second function that will perform some overview analysis on it.

import dask.dataframe as dd
import paramiko as pm

def remote_file_to_dask_dataframe(remote_path):

    if isinstance(remote_path, str):
        try:
            client = pm.SSHClient()
            client.load_system_host_keys()
            client.connect('#myserver', username='my_username', password='my_password')
            sftp_client = client.open_sftp()
            remote_file = sftp_client.open(remote_path)
            df = dd.read_csv(remote_file)  # <- the only line that fails
            remote_file.close()
            sftp_client.close()
            return df
        except Exception:
            print("An error occurred.")
            remote_file.close()
            sftp_client.close()
    else:
        raise ValueError("Path to remote file as string required")

The code is neither nice nor complete, and I will replace the username and password with SSH keys in time, but this is not the issue. In a Jupyter notebook, I've previously opened the SFTP connection with a path to a file on the server and read it into a dataframe with a regular Pandas `read_csv` call. However, here the equivalent line, using Dask, is the source of the problem: `df = dd.read_csv(remote_file)`.
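For reference, the Pandas variant that worked looks roughly like this (a minimal sketch; the server name, credentials, and path are placeholders):

import pandas as pd
import paramiko as pm

client = pm.SSHClient()
client.load_system_host_keys()
client.connect('#myserver', username='my_username', password='my_password')
sftp_client = client.open_sftp()

# Pandas accepts a file-like object, so an SFTP file handle works directly
remote_file = sftp_client.open('/remote/path/file.csv')
df = pd.read_csv(remote_file)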

I've looked at the documentation online (here), but I can't tell whether what I'm trying above is possible. It seems that for networked options Dask wants a URL. The parameter-passing options for, e.g., S3 appear to depend on that infrastructure's backend. I unfortunately cannot make any sense of the dask-ssh documentation (here).

I've poked around with print statements, and the only line that fails to execute is the one stated. The error raised is:

raise TypeError('url type not understood: %s' % urlpath)
TypeError: url type not understood:

Can anybody point me in the right direction for achieving what I'm trying to do? I'd expected Dask's read_csv to function as Pandas' does, since it's based on the same.

I'd appreciate any help, thanks.

p.s. I'm aware of Pandas' read_csv chunksize option, but I would like to achieve this through Dask, if possible.
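For completeness, the chunked Pandas route I'm setting aside would look roughly like this (a sketch only; the chunk size is arbitrary, and remote_file is an open SFTP file handle as in the snippet above):

import pandas as pd

# chunksize makes read_csv yield DataFrame pieces instead of one frame
for chunk in pd.read_csv(remote_file, chunksize=100_000):
    print(chunk.shape)  # stand-in for per-chunk analysis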

digital_hen
2 Answers


In the master version of Dask, file-system operations now use fsspec, which, along with the previous implementations (s3, gcs, hdfs), supports some additional file-systems; see the mapping of protocol identifiers in `fsspec.registry.known_implementations`.

In short, using a URL like "sftp://user:pw@host:port/path" should now work for you, if you install fsspec and Dask from master.
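A minimal sketch under those assumptions (the host, credentials, and path below are placeholders; fsspec's SFTP implementation is backed by Paramiko). Credentials can be embedded in the URL itself, or passed via storage_options, which Dask forwards to the file-system backend:

import dask.dataframe as dd

# storage_options is handed to fsspec's SFTP file-system (i.e. to Paramiko)
df = dd.read_csv(
    'sftp://myserver/remote/path/file.csv',
    storage_options={'username': 'my_username', 'password': 'my_password'},
)
print(df.head())  # triggers a real read of the first partition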

mdurant
  • Consider posting a similar answer to [Passing a Paramiko connection SFTPFile as input to a dask.dataframe.read_parquet](https://stackoverflow.com/q/56735362/850848) as well. – Martin Prikryl Jul 24 '19 at 16:49

It seems that you would have to implement their "file system" interface.

I'm not sure what the minimal set of methods is that you need to implement to allow `read_csv`, but you definitely have to implement `open`.

import dask.bytes.core
import dask.dataframe as dd

class SftpFileSystem(object):
    def open(self, path, mode='rb', **kwargs):
        # Delegate to an already-connected Paramiko SFTP session
        return sftp_client.open(path, mode)

# Register the class for the 'sftp://' URL protocol
dask.bytes.core._filesystems['sftp'] = SftpFileSystem

df = dd.read_csv('sftp://remote/path/file.csv')
Martin Prikryl
  • There is exactly such a filesystem [here](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.sftp.SFTPFileSystem). I am planning to swap out dask's filesystem code for fsspec in the near future. – mdurant Jun 17 '19 at 13:02
  • Thanks @Martin; I'm still working on this. @mdurant, fsspec has handily allowed me to condense the first five lines of my try block to `fs = fsspec.filesystem('sftp', host=host, username=username, password=passwd)`; however, the exact problem persists: Pandas' `read_csv` will read the CSV from remote, whereas Dask's will not: `df = dd.read_csv(fs.open(file_in_dir))`. Error, as before: `url type not understood:` – digital_hen Jun 17 '19 at 14:36
  • `dd.read_csv` takes a URL, not a file-like object. I believe that fsspec only saves you from implementing the `SftpFileSystem`, but you still need to do the rest of what my answer shows (adding the file system to `_filesystems` and using a URL). And additionally you will need to specify the credentials and hostname in the URL. – Martin Prikryl Jun 17 '19 at 14:41
  • Indeed - dask's bytes code needs to be rewritten to use fsspec's registry of filesystems and functions, which is the work I refer to as "soon". Help appreciated! – mdurant Jun 17 '19 at 14:46
  • Thank you both. Unfortunately I'm too much of a novice to implement what's needed. I'll just work on the server. Hopefully somebody else will find your direction useful. – digital_hen Jun 18 '19 at 22:44
  • @mdurant It seems that you have done that already, so maybe you could post your own answer now. – Martin Prikryl Jul 24 '19 at 06:17