I am working on a data pipeline on Azure that in the end produces a file on a Blob Container. This file needs to be then uploaded on a SFTP on a given schedule. I am using Azure Data Factory for the pipeline scheduling, but ADF doesn't yet support SFTP as a sink. The following code is an attempt to use Azure Functions to solve this part of the pipeline. This Azure Function would then be called from within ADF, which is why I am not using a Blob Trigger but an HTTP Trigger. It is mostly inspired by https://github.com/GinSiuCheng/azure-sample-function-sftp-upload/blob/master/azure-sample-function-sftp-upload/sftpUpload.cs
I came up with the following code, and am struggling to understand the Stream aspect. The code produces a file on the SFTP but it is empty.
namespace Company.Function
{
public static class HttpTriggerSFTP
{
private static CloudBlockBlob cloudBlockBlobToTransfer;
private static System.IO.Stream fileStream ;
[FunctionName("HttpTriggerSFTP")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string name = req.Query["name"];
// Storage Account Parameters
string storageAccountKey = req.Headers["storageAccountKey"];
string storageAccountContainer = req.Headers["storageAccountContainer"];
string storageAccountPath = req.Headers["storageAccountPath"];
string sourceFileName = req.Headers["sourceFileName"];
string storageConnectionString = req.Headers["storageConnectionString"];
string sourceFileAbsolutePath = storageAccountPath + '/' + sourceFileName;
log.LogInformation(sourceFileAbsolutePath);
// SFTP Parameters
string sftpAddress = req.Headers["sftpAddress"];
string sftpPort = req.Headers["sftpPort"];
string sftpUsername = req.Headers["sftpUsername"];
string sftpPassword = req.Headers["sftpPassword"];
string sftpPath = req.Headers["sftpPath"];
string targetFileName = req.Headers["targetFileName"];
var memoryStream = new MemoryStream();
CloudStorageAccount storageAccount;
if (CloudStorageAccount.TryParse(storageConnectionString, out storageAccount))
{
// If the connection string is valid, proceed with operations against Blob storage here.
CloudBlobClient cloudBlobClient = storageAccount.CreateCloudBlobClient();
// Create a container called 'quickstartblobs' and append a GUID value to it to make the name unique.
CloudBlobContainer cloudBlobContainer = cloudBlobClient.GetContainerReference(storageAccountContainer);
log.LogInformation("looking for file",sourceFileAbsolutePath);
cloudBlockBlobToTransfer = cloudBlobContainer.GetBlockBlobReference(sourceFileAbsolutePath);
await cloudBlockBlobToTransfer.DownloadToStreamAsync(memoryStream);
log.LogInformation("Blob connection Ok");
}
var methods = new List<AuthenticationMethod>();
methods.Add(new PasswordAuthenticationMethod(sftpUsername, sftpPassword));
// Connect to SFTP Server and Upload file
Renci.SshNet.ConnectionInfo con = new Renci.SshNet.ConnectionInfo(sftpAddress, sftpPort, new PasswordAuthenticationMethod(sftpUsername, sftpPassword));
using (var client = new SftpClient(con) )
{
client.Connect();
log.LogInformation("About to upload to sftp");
client.UploadFile(memoryStream, $"/{sftpPath}/{targetFileName}");
var files = client.ListDirectory(sftpPath);
foreach (var file in files)
{
log.LogInformation(file.Name);
}
client.Disconnect();
}
}
}
}
I'd like to understand why my stream is empty ? How can I ensure that I am loading data in it ?
I am also planning on using Key Vault to store credentials instead of putting it in headers.
Thanks for your time.