
I need to understand how `SemaphoreSlim` works with the `ThreadPool`.

I need to read a file from the local disk, read each line, and send each line as an `HttpWebRequest` to 2 different servers, getting back 2 responses per line.

Let's say the file has 100 requests (in a real scenario this number can be in the thousands or more). Once all of these requests have been sent, I should get back 200 responses (as mentioned, each request goes to 2 different servers and fetches a response from each). Here is my code:

      static void Main(string[] args)
      {
          try
          {
              SendEntriesInFile(someFileOnTheLocaldisk);
              Console.WriteLine();
          }
          catch (Exception e)
          {
              Console.WriteLine("Regression Tool Error: Major Unspecified Error:\n" + e);
          }
      }

       public class MyParameters
       {
       }
  

       // Declared as a field so that Send() can release it; a static local is not valid C#.
       static SemaphoreSlim threadSemaphore = new SemaphoreSlim(5, 10);

       private void SendEntriesInFile(FileInfo file)
       {
          

           using (StreamReader reader = file.OpenText())
           {
           
            string entry = reader.ReadLine();
            
            while (!String.IsNullOrEmpty(entry))
            {
              MyParameters myParams = new MyParameters(entry, totalNumberOfEntries, serverAddresses, requestType, fileName);
              threadSemaphore.Wait();
              ThreadPool.QueueUserWorkItem(new WaitCallback(Send), myParams);

              entry = reader.ReadLine();
            }
         }
      }


     private void Send(object MyParameters)
     {
        MyParameters myParams = (MyParameters)MyParameters;

        for(int i=0; i < myParams.ServerAddresses.Count; i++)
        {
            byte[] bytesArray = null;
            bytesArray = Encoding.UTF8.GetBytes(myParams.Request);
            HttpWebRequest webRequest = null;

            if (myParams.TypeOfRequest == "tlc")
            {
                 webRequest = (HttpWebRequest)WebRequest.Create(string.Format("http://{0}:{1}{2}", myParams.ServerAddresses[i], port, "/SomeMethod1"));
            }
            else
            {
                 webRequest = (HttpWebRequest)WebRequest.Create(string.Format("http://{0}:{1}{2}", myParams.ServerAddresses[i], port, "/SomeMethod2"));
            }
            if (webRequest != null)
            {
                webRequest.Method = "POST";
                webRequest.ContentType = "application/x-www-form-urlencoded";
                webRequest.ContentLength = bytesArray.Length;
                webRequest.Timeout = responseTimeout;
                webRequest.ReadWriteTimeout = transmissionTimeout;
                webRequest.ServicePoint.ConnectionLimit = maxConnections;
                webRequest.ServicePoint.ConnectionLeaseTimeout = connectionLifeDuration;
                webRequest.ServicePoint.MaxIdleTime = maxConnectionIdleTimeout;
                webRequest.ServicePoint.UseNagleAlgorithm = nagleAlgorithm;
                webRequest.ServicePoint.Expect100Continue = oneHundredContinue;
               

                using (Stream requestStream = webRequest.GetRequestStream())
                {
                    //Write the request through the request stream
                    requestStream.Write(bytesArray, 0, bytesArray.Length);
                    requestStream.Flush();
                }

                string response = "";
                using (HttpWebResponse httpWebResponse = (HttpWebResponse)webRequest.GetResponse())
                {
                    if (httpWebResponse != null)
                    {
                        using (Stream responseStream = httpWebResponse.GetResponseStream())
                        {
                            using (StreamReader stmReader = new StreamReader(responseStream))
                            {
                                response = stmReader.ReadToEnd();
                                string fileName = "";

                                if(i ==0)
                                {
                                    fileName = Name is generated through some logic here;
                                }
                                else
                                {
                                    fileName =  Name is generated through some logic here;
                                }

                                using (StreamWriter writer = new StreamWriter(fileName))
                                {
                                    writer.WriteLine(response);
                                }
                            }
                        }
                    }
                }
             }
        }
      Console.WriteLine("   Release semaphore: ---- " + threadSemaphore.Release());
    }

What confuses me is this: as I understand it, with the code above my semaphore allows 5 threads to execute the Send() method concurrently while 5 other threads wait in a queue for their turn. Since my file contains 100 requests, I should get back 200 responses, but every time I end up with only 107, 108, or 109. Why don't I get all 200 responses back? Using different code (not shown here) that sends, say, 10 requests in parallel on 10 threads created on demand (rather than taken from the ThreadPool), I do get all 200 responses back.

Mauricio Gracia Gutierrez
user2913184
  • I don't know the answer to your question, but personally I would avoid working with the `ThreadPool` directly. I would prefer using a higher level abstraction like the [TPL Dataflow](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library, as in [this](https://stackoverflow.com/a/60930992/11178549) answer. This library contains components ("blocks") that can be linked to each other, and form a processing pipeline with the data flowing from the first block to the last. Each block can be configured with a specific degree of parallelism. – Theodor Zoulias Apr 29 '21 at 18:36
  • Or, don’t use HttpWebRequest (which is obsolete for almost 10 years now), use HttpClient, and do async requests. This way you wouldn’t need to explicitly use the ThreadPool. – ckuri Apr 29 '21 at 18:56
  • Lots of good advice in the comments above. Besides that, you don't seem to catch any exceptions. In general, when using the thread pool directly, exceptions that are uncaught wind up completely unobserved. It seems likely to me that some number of your web requests are simply failing, and you don't bother to notice. If you need actual help beyond that, please fix your question so it includes a proper [mcve]. – Peter Duniho Apr 29 '21 at 22:15
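
The point about failing work items can be sketched as follows. This is a minimal illustration, not the asker's code, and it does not claim to identify the actual cause of the missing responses. `ProcessEntry` is a hypothetical stand-in for `Send()` that fails on every 10th entry, and the class name, entry count, and concurrency limit are arbitrary assumptions. Each work item catches its own exception, releases the semaphore in `finally`, and signals a `CountdownEvent`, so the caller waits for every item and can count the failures. Thread-pool threads are background threads, so without such a wait the process may exit while items are still running.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BoundedQueueDemo
{
    // Hypothetical stand-in for Send(): every 10th entry fails.
    static void ProcessEntry(int entry)
    {
        if (entry % 10 == 0)
            throw new InvalidOperationException("entry " + entry + " failed");
        Thread.Sleep(5); // simulate network I/O
    }

    // Queues entries 1..entryCount with at most maxConcurrency running at once,
    // then reports how many succeeded and how many threw.
    public static (int Succeeded, int Failed) RunAll(int entryCount, int maxConcurrency)
    {
        int succeeded = 0;
        var errors = new ConcurrentQueue<Exception>();

        using (var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        using (var allDone = new CountdownEvent(entryCount))
        {
            for (int i = 1; i <= entryCount; i++)
            {
                gate.Wait(); // block the producer until a slot is free
                ThreadPool.QueueUserWorkItem(state =>
                {
                    try
                    {
                        ProcessEntry((int)state);
                        Interlocked.Increment(ref succeeded);
                    }
                    catch (Exception ex)
                    {
                        errors.Enqueue(ex); // record the failure instead of losing it
                    }
                    finally
                    {
                        gate.Release();   // always give the slot back
                        allDone.Signal(); // always count the item as finished
                    }
                }, i);
            }

            allDone.Wait(); // keep the caller alive until every item has finished
        }

        return (succeeded, errors.Count);
    }

    public static void Main()
    {
        var result = RunAll(100, 5);
        Console.WriteLine("succeeded: " + result.Succeeded + ", failed: " + result.Failed);
    }
}
```

With the stand-in above, 10 of the 100 entries throw, so the counts add up to 100 and the failures are visible rather than silently missing.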

1 Answer


Lots of very helpful comments. To add to them, I would recommend using purely asynchronous I/O for this.

I would recommend applying the semaphore inside a `DelegatingHandler`.

You want to register this class:

public class LimitedConcurrentHttpHandler
   : DelegatingHandler
{
    private readonly SemaphoreSlim _concurrencyLimit = new(8);

    public LimitedConcurrentHttpHandler(){ }
    public LimitedConcurrentHttpHandler(HttpMessageHandler inner) : base(inner) {}

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        await _concurrencyLimit.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _concurrencyLimit.Release();
        }
    }       
}

This way, you can concentrate on your actual business logic within your actual code.
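
One way to wire such a handler into an `HttpClient` is sketched below, under stated assumptions. The handler is repeated so the example is self-contained. `StubServerHandler` is a hypothetical in-memory replacement for the real servers (it only records peak concurrency), and the host names and `ThrottledClientDemo` class are placeholders. In real use the inner handler would be something like `HttpClientHandler` instead of the stub.

```csharp
using System;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// The handler from the answer, repeated here so the sketch compiles on its own.
public class LimitedConcurrentHttpHandler : DelegatingHandler
{
    private readonly SemaphoreSlim _concurrencyLimit = new SemaphoreSlim(8);

    public LimitedConcurrentHttpHandler(HttpMessageHandler inner) : base(inner) { }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        await _concurrencyLimit.WaitAsync(cancellationToken);
        try { return await base.SendAsync(request, cancellationToken); }
        finally { _concurrencyLimit.Release(); }
    }
}

// Hypothetical stub standing in for the real servers; no network traffic occurs.
public class StubServerHandler : HttpMessageHandler
{
    private readonly object _lock = new object();
    private int _inFlight;
    public int Peak { get; private set; }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        lock (_lock) { _inFlight++; if (_inFlight > Peak) Peak = _inFlight; }
        await Task.Delay(20, cancellationToken); // simulate server latency
        lock (_lock) { _inFlight--; }
        return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("ok") };
    }
}

public static class ThrottledClientDemo
{
    // Sends each entry to two placeholder servers and awaits every response.
    public static async Task<(int Responses, int Peak)> RunAsync(int entries)
    {
        var stub = new StubServerHandler();
        using (var client = new HttpClient(new LimitedConcurrentHttpHandler(stub)))
        {
            string[] servers = { "http://server-a.example", "http://server-b.example" };
            var tasks = Enumerable.Range(0, entries)
                .SelectMany(i => servers.Select(s =>
                    client.PostAsync(s + "/SomeMethod1", new StringContent("entry " + i))))
                .ToList();

            HttpResponseMessage[] responses = await Task.WhenAll(tasks);
            int count = responses.Length;
            foreach (var response in responses) response.Dispose();
            return (count, stub.Peak);
        }
    }

    public static async Task Main()
    {
        var result = await RunAsync(100);
        Console.WriteLine(result.Responses + " responses, peak concurrency " + result.Peak);
    }
}
```

Because every request is awaited through `Task.WhenAll`, a failed request surfaces as an exception at the await instead of disappearing, and the caller never touches the `ThreadPool` or the semaphore directly.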

Aron