
I was recently asked a question in an interview and it really got me thinking.

I am trying to understand and learn more about multithreading, parallelism and concurrency, and performance.

The scenario is that you have a list of file paths. The files are saved on your HDD or in blob storage. You have to read the files and store their contents in a database. How would you do it in the most optimal manner?

The following are some of the ways that I could think of:

The simplest way is to loop through the list and perform this task sequentially.

foreach (var filePath in filePaths)
{
  ProcessFile(filePath);
}

public void ProcessFile(string filePath)
{
  var file = readFile(filePath);
  storeInDb(file);
}

The 2nd way I could think of is creating multiple threads:

foreach (var filePath in filePaths)
{
  Thread t = new Thread(() => ProcessFile(filePath));
  t.Start();
}

(I am not sure if this approach is correct.)

The 3rd way is using async await:

List<Task> listOfTasks = new List<Task>();
foreach (var filePath in filePaths)
{
  var task = ProcessFile(filePath);
  listOfTasks.Add(task);
}
Task.WhenAll(listOfTasks);

public async void ProcessFile(string filePath)
{
  var file = readFile(filePath);
  storeInDb(file);
}

The 4th way is Parallel.For:

Parallel.For(0, filePaths.Count, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        ProcessFile(filePaths[i]);
    });
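A 5th option worth mentioning: on .NET 6 and later there is `Parallel.ForEachAsync`, which combines the throttling of `Parallel.For` with genuinely asynchronous I/O. A minimal sketch, where `StoreInDbAsync` is a hypothetical stand-in for the real database write:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Hypothetical stand-in for the real database write; collects the
    // file contents so the effect is observable.
    public static ConcurrentBag<string> Stored { get; } = new ConcurrentBag<string>();

    public static Task StoreInDbAsync(string contents)
    {
        Stored.Add(contents);
        return Task.CompletedTask;
    }

    public static async Task RunAsync(IEnumerable<string> filePaths)
    {
        // At most 10 files are in flight at once, and no thread is
        // blocked while a file read is pending.
        await Parallel.ForEachAsync(filePaths,
            new ParallelOptions { MaxDegreeOfParallelism = 10 },
            async (filePath, cancellationToken) =>
            {
                var contents = await File.ReadAllTextAsync(filePath, cancellationToken);
                await StoreInDbAsync(contents);
            });
    }
}
```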

What are the differences between them? Which one would be better suited for the job, and is there anything better?

SamuraiJack
  • Are you just moving the file itself to the database, or are you parsing the file for its contents, such as a .csv, and sending the contents to a database? – DekuDesu Jul 28 '21 at 08:57
  • @DekuDesu yes, you are parsing the file for its content. – SamuraiJack Jul 28 '21 at 09:06
  • 1
    You could take a look at this question: [Parallel.ForEach vs Task.Run and Task.WhenAll](https://stackoverflow.com/questions/19102966/parallel-foreach-vs-task-run-and-task-whenall). It might have direct answers to your questions. Btw none of the options you mentioned in the question is the optimal one. You won't get the best performance with data-parallelism alone. You also need task-parallelism. There is an example [here](https://stackoverflow.com/questions/62602684/c-sharp-process-files-concurrently-and-asynchronously/62613098#62613098). – Theodor Zoulias Jul 28 '21 at 09:11
  • 1
    Your 3rd way, the "async await" approach is coded very poorly. It's not actually using `await` and the `async` is `async void` when it should be `async Task`. And it would be better to run your own `Task.Run` call to ensure it gets pushed off to a background task. – Enigmativity Jul 28 '21 at 23:47
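A corrected sketch of the 3rd approach, applying the `async Task` and `await` fixes from the comment above; `StoreInDbAsync` is a hypothetical stand-in for the real database write:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public class FileProcessor
{
    // Hypothetical stand-in for the real database write.
    public ConcurrentBag<string> Stored { get; } = new ConcurrentBag<string>();

    public async Task ProcessAllAsync(IEnumerable<string> filePaths)
    {
        var tasks = new List<Task>();
        foreach (var filePath in filePaths)
            tasks.Add(ProcessFileAsync(filePath));
        // Awaiting WhenAll waits for every task and propagates any
        // exceptions instead of silently dropping them.
        await Task.WhenAll(tasks);
    }

    // async Task instead of async void, and the I/O is actually awaited.
    private async Task ProcessFileAsync(string filePath)
    {
        var contents = await File.ReadAllTextAsync(filePath);
        await StoreInDbAsync(contents);
    }

    private Task StoreInDbAsync(string contents)
    {
        Stored.Add(contents);
        return Task.CompletedTask;
    }
}
```

Note this still starts one task per file, with no upper bound on concurrency.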

2 Answers


You could also use Microsoft's Reactive Framework (aka Rx) - NuGet System.Reactive and add using System.Reactive.Linq; - then you can do this:

IObservable<string> query =
    from filePath in filePaths.ToObservable()
    from file in Observable.Start(() => ReadFile(filePath))
    from db in Observable.Start(() => StoreInDb(file))
    select filePath;

IDisposable subscription =
    query
        .Subscribe(
            filePath => Console.WriteLine($"{filePath} Processed."),
            () => Console.WriteLine("Done."));
Enigmativity

I wrote a simple extension method to help start async tasks, limit the amount of concurrency, and wait for them all to complete:

public static async Task WhenAll(this IEnumerable<Task> tasks, int batchSize)
{
    var started = new List<Task>();

    foreach(var t in tasks)
    {
        started.Add(t);
        if (started.Count >= batchSize)
        {
            var ended = await Task.WhenAny(started);
            started.Remove(ended);
        }
    }
    await Task.WhenAll(started);
}

Then you'd want a method to stream the file contents directly into the database. For example:

async Task Process(string filename)
{
    using var stream = File.OpenRead(filename);

    // TODO connect to the database
    var sqlCommand = ...;
    sqlCommand.CommandText = "update [table] set [column] = @stream";
    sqlCommand.Parameters.Add(new SqlParameter("@stream", SqlDbType.VarBinary)
    {
        Value = stream
    });
    await sqlCommand.ExecuteNonQueryAsync();
}
IEnumerable<string> files = ...;
await files.Select(f => Process(f)).WhenAll(20);

Is this the best approach? Probably not, since it's too easy to misuse this extension: accidentally starting tasks multiple times, or starting them all at once.

Jeremy Lakeman
  • For this to work, the `IEnumerable tasks` should not be a materialized collection. You could add some argument validation code like `if (tasks is ICollection) throw...`, but overall this is not a good solution for the reasons you mentioned. There is also a bug at the final `Task.WhenAll`, where only the last `batchSize` tasks will be awaited, and any exception thrown by the previous tasks will be swallowed. In general the correct tool to use for throttling is the `SemaphoreSlim`, not the `Task.WhenAny`. – Theodor Zoulias Jul 29 '21 at 06:10
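A minimal sketch of the `SemaphoreSlim` throttling approach mentioned in the comment; the names here are illustrative, not from the answer:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttled
{
    // Run an async operation over each item, at most maxConcurrency at a time.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> body)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        // Every task is created up front, but each one waits on the
        // semaphore before doing real work.
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();
            try { await body(item); }
            finally { gate.Release(); }
        }).ToList();
        // All tasks are awaited here, so no exception is swallowed.
        await Task.WhenAll(tasks);
    }
}
```

Usage would look like `await Throttled.ForEachAsync(filePaths, 20, Process);`, with `Process` being the file-to-database method.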