I'm writing a .NET Core console app that needs to continuously read data from multiple WebSockets. My current approach is to create a new Task (via Task.Run) per WebSocket that runs an infinite while loop and blocks until data arrives on the socket. However, since the data is pushed at a rather low frequency, the threads just block most of the time, which seems quite inefficient.

From my understanding, the async/await pattern should be ideal for blocking I/O operations. However, I'm not sure how to apply it to my situation, or even whether async/await can improve this in any way, especially since it's a console app.

I've put together a proof of concept (doing an HTTP GET instead of reading from a WebSocket for simplicity). The only way I was able to achieve this was without actually awaiting. Code:

static void Main(string[] args)
{
    Console.WriteLine($"ThreadId={ThreadId}: Main");

    Task task = Task.Run(() => Process("https://duckduckgo.com", "https://stackoverflow.com/"));

    // Do main work.

    task.Wait();
}

private static void Process(params string[] urls)
{
    Dictionary<string, Task<string>> tasks = urls.ToDictionary(x => x, x => (Task<string>)null);
    HttpClient client = new HttpClient();

    while (true)
    {
        foreach (string url in urls)
        {
            Task<string> task = tasks[url];
            if (task == null || task.IsCompleted)
            {
                if (task != null)
                {
                    string result = task.Result;
                    Console.WriteLine($"ThreadId={ThreadId}: Length={result.Length}");
                }
                tasks[url] = ReadString(client, url);
            }
        }
        Thread.Yield();
    }
}

private static async Task<string> ReadString(HttpClient client, string url)
{
    var response = await client.GetAsync(url);
    Console.WriteLine($"ThreadId={ThreadId}: Url={url}");
    return await response.Content.ReadAsStringAsync();
}

private static int ThreadId => Thread.CurrentThread.ManagedThreadId;

This seems to work, and it executes on various worker threads on the ThreadPool. However, it definitely doesn't look like typical async/await code, which makes me think there has to be a better way.

Is there a more proper / more elegant way of doing this?

McGuireV10
loodakrawa
  • I think observable streams fit your use case nicely. Try: http://reactivex.io/ – Rosdi Kasim Mar 25 '18 at 03:18
  • So basically... You have a list of URLs, you want to kick off a request to each, and then when each request completes you want to handle it and then send another request? – Rawling Mar 25 '18 at 09:56
  • @Rawling - something very similar, just with WebSockets. I'm waiting for data to be pushed through, then I read it, convert it, store it and then read again. And the push frequencies range from 1 minute to 1 day. – loodakrawa Mar 25 '18 at 10:05

3 Answers

You've basically written a version of Task.WhenAny that uses a CPU loop to check for completed tasks rather than... whatever magic the framework method uses behind the scenes.

A more idiomatic version might look like this. (Although it might not - I feel like there should be an easier method of "re-run the completed task" than the reverse dictionary I've used here.)

static void Main(string[] args)
{
    Console.WriteLine($"ThreadId={ThreadId}: Main");

    // No need for Task.Run here.
    var task = Process("https://duckduckgo.com", "https://stackoverflow.com/");
    task.Wait();
}

private static async Task Process(params string[] urls)
{
    // Set up initial dictionary mapping task (per URL) to the URL used.
    HttpClient client = new HttpClient();
    var tasks = urls.ToDictionary(u => client.GetAsync(u), u => u);

    while (true)
    {
        // Wait for any task to complete, get its URL and remove it from the current tasks.
        var firstCompletedTask = await Task.WhenAny(tasks.Keys);
        var firstCompletedUrl = tasks[firstCompletedTask];
        tasks.Remove(firstCompletedTask);

        // Do work with completed task.
        try
        {
            Console.WriteLine($"ThreadId={ThreadId}: URL={firstCompletedUrl}");
            using (var response = await firstCompletedTask)
            {
                var content = await response.Content.ReadAsStringAsync();
                Console.WriteLine($"ThreadId={ThreadId}: Length={content.Length}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"ThreadId={ThreadId}: Ex={ex}");
        }

        // Queue the task again.
        tasks.Add(client.GetAsync(firstCompletedUrl), firstCompletedUrl);
    }
}

private static int ThreadId => Thread.CurrentThread.ManagedThreadId;
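
The behavior the loop above relies on, that Task.WhenAny hands back whichever task finished first without any thread polling, can be seen in isolation. This is only a minimal sketch: `WhenAnyDemo`, `DrainInCompletionOrderAsync`, `DemoAsync` and `SimulatedReadAsync` are made-up names, and Task.Delay stands in for a real network read.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Awaits the pending tasks one completion at a time and returns their
    // labels in the order they finished. Entries are removed as they complete.
    public static async Task<List<string>> DrainInCompletionOrderAsync(
        Dictionary<Task<string>, string> pending)
    {
        var order = new List<string>();
        while (pending.Count > 0)
        {
            // Completes as soon as any one of the tasks has finished.
            Task<string> finished = await Task.WhenAny(pending.Keys);
            order.Add(pending[finished]);
            pending.Remove(finished);
        }
        return order;
    }

    // Example wiring (hypothetical): two simulated reads of different duration.
    public static async Task DemoAsync()
    {
        var pending = new Dictionary<Task<string>, string>
        {
            [SimulatedReadAsync(300)] = "slow source",
            [SimulatedReadAsync(50)] = "fast source",
        };
        List<string> order = await DrainInCompletionOrderAsync(pending);
        Console.WriteLine(string.Join(", ", order));
    }

    // Stand-in for a network read; only the delay differs between calls.
    private static async Task<string> SimulatedReadAsync(int delayMs)
    {
        await Task.Delay(delayMs);
        return "payload";
    }
}
```

To try it, call `WhenAnyDemo.DemoAsync().Wait()` from Main, or await it from an `async Task Main`, which is available from C# 7.1.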
Rawling
  • Ah, Task.WhenAny returning the completed task was the bit I was missing. This solution is quite similar to what I've written but I'd rather rely on the framework magic, as you put it, than a CPU loop if I can avoid it. Thanks! – loodakrawa Mar 25 '18 at 23:58

I've accepted Rawling's answer - I believe it is correct for the exact scenario I described. However, with a bit of inverted logic, I ended up with something way simpler - leaving it in case anyone needs something like this:

static void Main(string[] args)
{
    string[] urls = { "https://duckduckgo.com", "https://stackoverflow.com/" };
    HttpClient client = new HttpClient();

    var tasks = urls.Select(async url =>
    {
        while (true) await ReadString(client, url);
    });
    Task.WhenAll(tasks).Wait();
}

private static async Task<string> ReadString(HttpClient client, string url)
{
    var response = await client.GetAsync(url);
    string data = await response.Content.ReadAsStringAsync();
    Console.WriteLine($"Fetched data from url={url}. Length={data.Length}");
    return data;
}
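
One thing to watch with the loop above: if a read throws, that URL's loop ends, and because Task.WhenAll keeps waiting on the other loops the failure is never observed. There is also no way to stop it. A possible variation, sketched under my own assumptions (`PollLoop`, `RunAsync`, `RunAllAsync` and the 5-second retry delay are all made up for illustration), adds a CancellationToken and retries after a failure:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class PollLoop
{
    // Repeatedly awaits read() and passes each result to handle() until the
    // token is cancelled. A failed iteration is logged and retried after
    // retryDelay instead of ending the loop.
    public static async Task RunAsync(
        Func<CancellationToken, Task<string>> read,
        Action<string> handle,
        TimeSpan retryDelay,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                string data = await read(token);
                handle(data);
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                break; // normal shutdown
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Iteration failed, retrying: {ex.Message}");
                try { await Task.Delay(retryDelay, token); }
                catch (OperationCanceledException) { break; }
            }
        }
    }

    // Hypothetical wiring for the HTTP proof of concept: one loop per URL.
    public static Task RunAllAsync(
        HttpClient client, IEnumerable<string> urls, CancellationToken token)
    {
        return Task.WhenAll(urls.Select(url => RunAsync(
            async ct =>
            {
                using (HttpResponseMessage response = await client.GetAsync(url, ct))
                    return await response.Content.ReadAsStringAsync();
            },
            data => Console.WriteLine($"Fetched data from url={url}. Length={data.Length}"),
            TimeSpan.FromSeconds(5),
            token)));
    }
}
```

From Main, this could be started with a CancellationTokenSource whose Cancel method is called from a Console.CancelKeyPress handler, so that Ctrl+C ends all loops cleanly.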
loodakrawa

Maybe a better question is: do you really need a thread per socket in this case? You should think of threads as a system-wide resource and take that into consideration when spawning them, especially if you don't really know how many threads your application will be using. This is a good read: What's the maximum number of threads in Windows Server 2003?

A few years ago, the .NET team introduced asynchronous sockets.

...The client is built with an asynchronous socket, so execution of the client application is not suspended while the server returns a response. The application sends a string to the server and then displays the string returned by the server on the console.

Asynchronous Client Socket Example

There are a lot more examples out there showcasing this approach. While it is a bit more complicated and "low level", it lets you be in control.
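
For the WebSocket case in the question, the same non-blocking idea is also available one level up: System.Net.WebSockets.ClientWebSocket exposes ReceiveAsync, which can be awaited directly. The following is a rough sketch only, assuming UTF-8 text messages. `WebSocketReader`, `ReadMessageAsync` and `ListenAsync` are made-up names, and passing the receive call in as a delegate is my own choice to keep the frame-assembly logic separate from the socket.

```csharp
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class WebSocketReader
{
    // Reads one complete message, which may arrive split across several
    // frames. Returns null when the peer sends a close frame.
    public static async Task<string> ReadMessageAsync(
        Func<ArraySegment<byte>, CancellationToken, Task<WebSocketReceiveResult>> receive,
        CancellationToken token)
    {
        var buffer = new ArraySegment<byte>(new byte[4096]);
        using (var message = new MemoryStream())
        {
            WebSocketReceiveResult result;
            do
            {
                // No thread is blocked while this await is pending.
                result = await receive(buffer, token);
                if (result.MessageType == WebSocketMessageType.Close)
                    return null;
                message.Write(buffer.Array, buffer.Offset, result.Count);
            } while (!result.EndOfMessage);

            // Assumption: payloads are UTF-8 text.
            return Encoding.UTF8.GetString(message.ToArray());
        }
    }

    // Connects to one endpoint and handles messages until the peer closes
    // the connection or the token is cancelled. A fuller client would also
    // answer the close frame with CloseAsync.
    public static async Task ListenAsync(
        Uri uri, Action<string> handle, CancellationToken token)
    {
        using (var socket = new ClientWebSocket())
        {
            await socket.ConnectAsync(uri, token);
            string message;
            while ((message = await ReadMessageAsync(socket.ReceiveAsync, token)) != null)
                handle(message);
        }
    }
}
```

Starting one `ListenAsync` call per endpoint and awaiting them together with Task.WhenAll keeps every connection open without dedicating a thread to each.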

Igor