0

How to read a CSV file from SFTP and use CSVHelper to parse the content without saving CSV locally?

Is this possible, or do we have to save it locally, parse and delete the file?

I am using SSH.Net and CSVHelper.

  • You will need to use a memory stream. Then fill memory stream from SFTP. I would write my own code to read the csv rather then use CSVHelper. – jdweng May 13 '19 at 19:17
  • Is there any downside of using memory stream rather than saving the file local then open it? :) –  May 13 '19 at 19:18
  • No. Memory stream will be more efficient since you are only doing one step instead of two (writing to file and then reading file). The memory stream is equivalent to Stream Writer or Stream Reader. – jdweng May 13 '19 at 19:29
  • @jdweng I want to do this method async. Would it be possible? Because there are many directory for many country, and I want each country to download file and parse the CSV parallel. –  May 13 '19 at 19:31
  • Some servers limit the number of connections from the same IP so it may not be possible. – jdweng May 13 '19 at 21:55

1 Answers1

2

It needs to rely on Stream-processing of file:

    public async Task ProcessRemoteFilesAsync()
    {
        var credentials = new Credentials("host", "username", "password");
        var filePaths = new List<string>();

        // initializing filePaths ..

        var tasks = filePaths
            .Select(f => ParseRemoteFileAsync(credentials, f))
            .ToArray();

        var results = await Task.WhenAll(tasks).ConfigureAwait(false);

        // traverse through results..
    }

    public async Task<FileContent> ParseRemoteFileAsync(Credentials credentials, string filePath)
    {
        using (var sftp = new SftpClient(credentials.host, credentials.username, credentials.password))
        {
            sftp.Connect();

            try
            {
                using (var remoteFileStream = sftp.OpenRead(filePath))
                {
                    using (var reader = new StreamReader(remoteFileStream))
                    {
                        using (var csv = new CsvReader(reader))
                        {
                            /*
                            // Example of CSV parsing:
                            var records = new List<Foo>();
                            csv.Read();
                            csv.ReadHeader();
                            while (csv.Read())
                            {
                                var record = new Foo
                                {
                                    Id = csv.GetField<int>("Id"),
                                    Name = csv.GetField("Name")
                                };
                                records.Add(record);
                            }
                            */
                        }
                    }
                }
            }
            finally {
                sftp.Disconnect();
            }
        }
    }

Modified version that uses pool of SftpClient

See C# Object Pooling Pattern implementation.

Implementation of pool borrowed from How to: Create an Object Pool by Using a ConcurrentBag:

/// <summary>
///     Implementation borrowed from [How to: Create an Object Pool by Using a
///     ConcurrentBag](https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/how-to-create-an-object-pool).
/// </summary>
/// <typeparam name="T"></typeparam>
public class ObjectPool<T> : IDisposable
    where T : IDisposable
{
    private readonly Func<T> _objectGenerator;
    private readonly ConcurrentBag<T> _objects;

    public ObjectPool(Func<T> objectGenerator)
    {
        _objectGenerator = objectGenerator ?? throw new ArgumentNullException(nameof(objectGenerator));
        _objects = new ConcurrentBag<T>();
    }

    public void Dispose()
    {
        while (_objects.TryTake(out var item))
        {
            item.Dispose();
        }
    }

    public T GetObject()
    {
        return _objects.TryTake(out var item) ? item : _objectGenerator();
    }

    public void PutObject(T item)
    {
        _objects.Add(item);
    }
}

The simplest Pool-based implementation (it doesn't care about exception processing, retry-policies):

internal class SftpclientTest
{
    private readonly ObjectPool<SftpClient> _objectPool;


    public SftpclientTest(Credentials credentials)
    {
        _objectPool = new ObjectPool<SftpClient>(() =>
        {
            var client = new SftpClient(credentials.host, credentials.username, credentials.password);
            client.Connect();

            return client;
        });
    }


    public void GetDirectoryList()
    {
        var client = _objectPool.GetObject();

        try
        {
            // client.ListDirectory() ..
        }
        finally
        {
            if (client.IsConnected)
            {
                _objectPool.PutObject(client);
            }
        }
    }


    public async Task ProcessRemoteFilesAsync()
    {
        var filePaths = new List<string>();

        // initializing filePaths ..

        var tasks = filePaths
            .Select(f => ParseRemoteFileAsync(f))
            .ToArray();

        var results = await Task.WhenAll(tasks).ConfigureAwait(false);

        // traverse through results..
    }

    public Task<FileContent> ParseRemoteFileAsync(string filePath)
    {
        var client = _objectPool.GetObject();

        try
        {
            using (var remoteFileStream = client.OpenRead(filePath))
            {
                using (var reader = new StreamReader(remoteFileStream))
                {
                    using (var csv = new CsvReader(reader))
                    {
                        // ..
                    }
                }

                return Task.FromResult(new FileContent());
            }
        }
        finally
        {
            if (client.IsConnected)
            {
                _objectPool.PutObject(client);
            }
        }
    }
}
vladimir
  • 13,428
  • 2
  • 44
  • 70
  • Is it possible for me to put the ReadAndParseCSV logic in its own function and call it inside the using csvreader? –  May 13 '19 at 19:22
  • Yes you can move it to separate method. – vladimir May 13 '19 at 19:26
  • I want to do this method async. Would it be possible? Because there are many directory for many country, and I want each country to download file and parse the CSV parallel. Would this be possible? –  May 13 '19 at 19:31
  • I added example for multi file processing, it should help you made a start *by yourself* ;) – vladimir May 13 '19 at 19:56
  • Thanks, I'm curious about one thing. I got a function called GetDirectoryListFromSFTP() which creates a connection to the same SFTP as ReadRemoteDirectory, is it fine to create this connection -> get list of directory -> close connection -> open connection in another functiion ProcessRemoteFilesAsync and prosess on all the directory? –  May 14 '19 at 08:01
  • Good question - I added to the answer the example that uses pooling of clients. It should give you the right direction to move next. – vladimir May 14 '19 at 20:41
  • Could you explain what pooling of clients does? :) –  May 15 '19 at 07:40
  • Btw, should I move the Try Catch to outside the using SFTP client? –  May 15 '19 at 08:59
  • if I use this, should I use "using" the object pool client? Or is it disposed automatically? –  May 16 '19 at 08:21
  • How can I do exception processing and retry policy in this? Like if it try to use a objectPool with the client which has expired / timedout or closed the session? –  May 16 '19 at 11:14