2

I have used the Dns.BeginGetHostEntry method to get the FQDN for each host based on its host name (the list of host names is stored in a SQL Server database). This asynchronous method completes the run in less than 30 minutes for nearly 150k records and updates the FQDN in the same SQL table where the host name is stored.

This solution runs too fast (exceeding the threshold of 300 requests per second). Since the number of requests a server is permitted to generate is limited, my server was listed as a top talker and I was asked to stop running this application. I had to rebuild the application to run synchronously, which now takes more than 6 hours to complete.

//// TotalRecords are fetched from SQL database with the Hostname (referred as host further)
// Issues one asynchronous DNS lookup per row, waits until every lookup has finished,
// then pushes the updated rows back to the database.
// requestCounter tracks lookups still in flight; GetHostEntryCallback (not shown) is
// presumably what decrements it and writes the FQDN into the row — TODO confirm.
// NOTE(review): nothing here limits the request rate; lookups are issued as fast as the loop runs.
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    bool inFlight = false;
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        string[] arr = new string[] { i.ToString(), host };
        Interlocked.Increment(ref requestCounter);
        inFlight = true;
        Dns.BeginGetHostEntry(host, GetHostEntryCallback, arr);
    }
    catch (Exception ex)
    {
        // If BeginGetHostEntry throws synchronously (e.g. an invalid host name), its callback
        // never runs, so undo the increment; otherwise the wait loop below would never exit.
        if (inFlight)
        {
            Interlocked.Decrement(ref requestCounter);
        }
        log.Error("Unknown error occurred\n ", ex);
    }
}

// Poll with a short sleep rather than Thread.Sleep(0), which keeps a core busy spinning.
// Volatile.Read prevents the read of the counter (updated on callback threads) from being cached.
while (Volatile.Read(ref requestCounter) > 0)
{
    Thread.Sleep(10);
}

ListAdapter.Update(TotalRecords);

Questions:

  1. Is there any way the number of requests generated by this method can be limited per second?

  2. I understand that ParallelOptions.MaxDegreeOfParallelism limits the number of concurrent operations rather than the number of requests per second. Is there any way TPL could be a better option here? Can it limit the number of requests per second?

VMAtm
  • 27,943
  • 17
  • 79
  • 125
  • Have you considered a queue which is set to only process an item every x, so at most it runs 300 per second (300 per second is quite a lot after all) – BugFinder May 22 '17 at 07:49
  • Possible duplicate of [Simple way to rate limit HttpClient requests](http://stackoverflow.com/questions/35493925/simple-way-to-rate-limit-httpclient-requests) – bradgonesurfing May 22 '17 at 08:19
  • Yes SemaphoreSlim and a timer can solve this – Sir Rufo May 22 '17 at 08:22
  • Please don't add more unrelated questions to the original question. Stack Overflow works best with focused questions. Create separate questions about rate limiting / SQL optimization / IPv6 vs IPv4. – bradgonesurfing May 22 '17 at 15:14
  • @bradgonesurfing Since I'm new to Stack Overflow, I created new questions and kept the original post as is, with focused queries. Thanks for your answers, will surely try! – swati gupta May 22 '17 at 16:19
  • Welcome to stack overflow. I hope you find it useful. Also please mark answers that are helpful or accepted. – bradgonesurfing May 22 '17 at 17:39

3 Answers

0

Use a SemaphoreSlim with a Timer to limit the requests per period.

/// <summary>
/// Wraps a <see cref="System.Threading.SemaphoreSlim"/> so that released permits are not
/// returned immediately but are handed back in bulk on each tick of an internal timer.
/// Starting with N permits and a period P, this roughly limits successful waits to N per period.
/// </summary>
[DebuggerDisplay( "Current Count = {_semaphore.CurrentCount}" )]
public class TimedSemaphoreSlim : IDisposable
{
    private readonly System.Threading.SemaphoreSlim _semaphore;
    private readonly System.Threading.Timer _timer;

    // Permits released since the last timer tick; OnTimer returns them to the semaphore.
    private int _releaseCount;

    // Set before the timer and semaphore are disposed so a concurrently running OnTimer bails out.
    private bool disposedValue;

    /// <summary>Creates a limiter whose semaphore has no upper bound on its count.</summary>
    /// <param name="initialcount">Permits available immediately.</param>
    /// <param name="period">Interval at which released permits are handed back.</param>
    public TimedSemaphoreSlim( int initialcount, TimeSpan period )
        : this( initialcount, int.MaxValue, period )
    {
    }

    /// <summary>Creates a limiter whose semaphore count never exceeds <paramref name="maxCount"/>.</summary>
    /// <param name="initialCount">Permits available immediately.</param>
    /// <param name="maxCount">Maximum semaphore count.</param>
    /// <param name="period">Interval at which released permits are handed back.</param>
    public TimedSemaphoreSlim( int initialCount, int maxCount, TimeSpan period )
    {
        _semaphore = new System.Threading.SemaphoreSlim( initialCount, maxCount );
        _timer = new System.Threading.Timer( OnTimer, null, period, period );
    }

    private void OnTimer( object state )
    {
        // Timer.Dispose() does not wait for a callback that is already running, so this can
        // race with Dispose(). An exception escaping a timer callback would crash the process.
        if ( Volatile.Read( ref disposedValue ) )
            return;

        var releaseCount = Interlocked.Exchange( ref _releaseCount, 0 );
        if ( releaseCount <= 0 )
            return;

        try
        {
            _semaphore.Release( releaseCount );
        }
        catch ( ObjectDisposedException )
        {
            // Disposed between the check above and the release; nothing left to hand back.
        }
    }

    public WaitHandle AvailableWaitHandle => _semaphore.AvailableWaitHandle;
    public int CurrentCount => _semaphore.CurrentCount;

    /// <summary>Queues one permit to be returned on the next timer tick.</summary>
    /// <exception cref="ObjectDisposedException">The limiter has been disposed.</exception>
    public void Release()
    {
        Release( 1 );
    }

    /// <summary>Queues <paramref name="releaseCount"/> permits to be returned on the next timer tick.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="releaseCount"/> is less than 1.</exception>
    /// <exception cref="ObjectDisposedException">The limiter has been disposed.</exception>
    public void Release( int releaseCount )
    {
        // Mirrors SemaphoreSlim.Release(int); a non-positive count would silently eat queued permits.
        if ( releaseCount < 1 )
            throw new ArgumentOutOfRangeException( nameof( releaseCount ), releaseCount, "Release count must be at least 1." );
        CheckDisposed();
        Interlocked.Add( ref _releaseCount, releaseCount );
    }

    public void Wait()
    {
        _semaphore.Wait();
    }

    public void Wait( CancellationToken cancellationToken )
    {
        _semaphore.Wait( cancellationToken );
    }

    public bool Wait( int millisecondsTimeout )
    {
        return _semaphore.Wait( millisecondsTimeout );
    }

    public bool Wait( TimeSpan timeout )
    {
        return _semaphore.Wait( timeout );
    }

    public bool Wait( int millisecondsTimeout, CancellationToken cancellationToken )
    {
        return _semaphore.Wait( millisecondsTimeout, cancellationToken );
    }

    public bool Wait( TimeSpan timeout, CancellationToken cancellationToken )
    {
        return _semaphore.Wait( timeout, cancellationToken );
    }

    public Task WaitAsync()
    {
        return _semaphore.WaitAsync();
    }

    public Task WaitAsync( CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( cancellationToken );
    }

    public Task<bool> WaitAsync( int millisecondsTimeout )
    {
        return _semaphore.WaitAsync( millisecondsTimeout );
    }

    public Task<bool> WaitAsync( TimeSpan timeout )
    {
        return _semaphore.WaitAsync( timeout );
    }

    public Task<bool> WaitAsync( int millisecondsTimeout, CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( millisecondsTimeout, cancellationToken );
    }

    public Task<bool> WaitAsync( TimeSpan timeout, CancellationToken cancellationToken )
    {
        return _semaphore.WaitAsync( timeout, cancellationToken );
    }

    #region IDisposable Support
    private void CheckDisposed()
    {
        if ( Volatile.Read( ref disposedValue ) )
        {
            throw new ObjectDisposedException( nameof( TimedSemaphoreSlim ) );
        }
    }

    protected virtual void Dispose( bool disposing )
    {
        // To detect redundant calls.
        if ( disposedValue )
            return;

        // Publish the flag first so OnTimer stops touching the semaphore as early as possible.
        Volatile.Write( ref disposedValue, true );

        if ( disposing )
        {
            _timer.Dispose();
            _semaphore.Dispose();
        }
    }

    public void Dispose()
    {
        Dispose( true );
    }
    #endregion
}

Sample usage

IEnumerable<string> bunchOfHosts = GetBunchOfHosts();
IList<IPHostEntry> result;

// Roughly 300 lookups are admitted per second; the limiter's timer hands permits back each second.
using ( var limiter = new TimedSemaphoreSlim( 300, 300, TimeSpan.FromSeconds( 1 ) ) )
{
    result = bunchOfHosts.AsParallel()
        // PLINQ does not preserve source order by default; keep it so result[i] belongs to host i.
        .AsOrdered()
        // Dns.GetHostEntry blocks, so the default (about one worker per core) would cap
        // throughput far below the limiter; allow more lookups to be in flight at once.
        .WithDegreeOfParallelism( 32 )
        .Select( e =>
        {
            limiter.Wait();
            try
            {
                return Dns.GetHostEntry( e );
            }
            catch ( System.Net.Sockets.SocketException )
            {
                // Unresolvable host: record null instead of aborting the whole query.
                return null;
            }
            finally
            {
                limiter.Release();
            }
        } )
        .ToList();
}
Sir Rufo
  • 18,395
  • 2
  • 39
  • 73
  • This doesn't work really well when you replace `Console.WriteLine` with something like a DnsRequest which takes a long time to run and if you want to maintain a certain rate you need to run the requests in parallel. – bradgonesurfing May 22 '17 at 10:06
  • @bradgonesurfing I was thinking about that too and extended that class to behave as a SemaphoreSlim (Wait(); Act(); Release();) but the real release is done by the internal timer – Sir Rufo May 22 '17 at 10:20
0

A purely async solution.

It uses two NuGet packages, Nito.AsyncEx and System.Reactive. It performs error handling and provides the results of the DNS lookups as they occur as an IObservable<IPHostEntry>.

There is a lot going on here. You will need to understand Reactive Extensions as well as standard async programming. There are probably many ways to achieve the result below, but it is an interesting solution.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

public static class EnumerableExtensions
{
    /// <summary>
    /// Lazily turns each element of <paramref name="source"/> into a parameterless delegate that,
    /// when invoked, applies <paramref name="selector"/> to that element. The selector is not
    /// called until one of the returned delegates is invoked.
    /// </summary>
    public static IEnumerable<Func<U>> Defer<T, U>( this IEnumerable<T> source, Func<T, U> selector )
    {
        Func<T, Func<U>> wrap = item => () => selector( item );
        return source.Select( wrap );
    }
}


public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained.
    /// </summary>
    /// <param name="desiredRateLimit">Target rate in items per second.</param>
    /// <param name="currentItemCount">1-based number of the item about to be started.</param>
    /// <param name="elapsedTotalSeconds">Seconds elapsed since processing began.</param>
    /// <returns>Seconds to wait; a value &lt;= 0 means the item may start immediately.</returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        // Item k must not start before k / rate seconds have elapsed.
        var earliestStart = currentItemCount / desiredRateLimit;
        return earliestStart - elapsedTotalSeconds;
    }

    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <remarks>
    /// Item k is started no earlier than k / <paramref name="rateLimit"/> seconds after
    /// subscription; started tasks run concurrently. Failures are collected and reported
    /// together as an <see cref="AggregateException"/> once every started task has finished;
    /// otherwise the sequence completes. Disposing the subscription stops further scheduling.
    /// </remarks>
    /// <typeparam name="T">Result type of each task.</typeparam>
    /// <param name="tasks">Lazily enumerated task factories, invoked once per subscription.</param>
    /// <param name="rateLimit">Target average number of task starts per second.</param>
    /// <returns>A cold observable; each subscription runs the sequence independently.</returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit)
    {
        return Observable.Create<T>
            ( observer =>
            {
                // Created per subscription so the rate clock starts when the subscriber attaches
                // (no catch-up burst) and separate subscriptions never share counters.
                var stopwatch = Stopwatch.StartNew();
                var started = 0;
                var pending = new AsyncCountdownEvent(1); // the initial 1 represents the producer loop
                var errors = new ConcurrentBag<Exception>();

                // Rx requires observer notifications to be serialized, but results arrive
                // from concurrently running tasks.
                var gate = new object();
                var cts = new CancellationTokenSource();

                Task.Run
                    ( async () =>
                    {
                        try
                        {
                            foreach (var taskFn in tasks)
                            {
                                started++;
                                cts.Token.ThrowIfCancellationRequested();

                                var delay = Delay( rateLimit, started, stopwatch.Elapsed.TotalSeconds );
                                if (delay > 0)
                                    await Task.Delay( TimeSpan.FromSeconds( delay ), cts.Token );

                                pending.AddCount( 1 );
                                Task.Run
                                    ( async () =>
                                    {
                                        try
                                        {
                                            var value = await taskFn();
                                            lock (gate)
                                            {
                                                observer.OnNext( value );
                                            }
                                        }
                                        catch (Exception e)
                                        {
                                            errors.Add( e );
                                        }
                                        finally
                                        {
                                            pending.Signal();
                                        }
                                    }
                                    , cts.Token );
                            }

                            // Release the producer's own count, then wait for all started tasks.
                            pending.Signal();
                            await pending.WaitAsync( cts.Token );

                            lock (gate)
                            {
                                if (errors.Count > 0)
                                    observer.OnError( new AggregateException( errors ) );
                                else
                                    observer.OnCompleted();
                            }
                        }
                        catch (OperationCanceledException) when (cts.IsCancellationRequested)
                        {
                            // Unsubscribed: no further notifications may be sent.
                        }
                        catch (Exception e)
                        {
                            // E.g. enumerating `tasks` threw. Without this the task would fault
                            // unobserved and the subscriber would wait forever.
                            lock (gate)
                            {
                                observer.OnError( e );
                            }
                        }
                    }
                    , cts.Token );

                return Disposable.Create( () => cts.Cancel() );
            } );
    }

    #region hosts

    // Sample host list; replace with the hosts to resolve.
    public static string [] Hosts = new [] { "google.com" };

    #endregion


    public static void Main()
    {
        var s = System.Diagnostics.Stopwatch.StartNew();

        var rate = 25;

        var n = Hosts.Length;

        var expectedTime = n/rate;

        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync( host );
            }
            catch (Exception e)
            {
                throw new Exception($"Can't resolve {host}", e);
            }
        } );

        IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );

        results
            .Subscribe( result =>
            {
                // A resolved entry may carry no addresses; don't index into an empty list.
                var address = result.AddressList.Length > 0
                    ? result.AddressList[0].ToString()
                    : "(no addresses)";
                Console.WriteLine( "result " + DateTime.Now + " " + address );
            },
            onCompleted: () =>
            {
                Console.WriteLine( "Completed" );

                PrintTimes( s, expectedTime );
            },
            onError: e =>
            {
                Console.WriteLine( "Errored" );

                PrintTimes( s, expectedTime );

                if (e is AggregateException ae)
                {
                    Console.WriteLine( e.Message );
                    foreach (var innerE in ae.InnerExceptions)
                    {
                        Console.WriteLine( $"     " + innerE.GetType().Name + " " + innerE.Message );
                    }
                }
                else
                {
                    Console.WriteLine( $"got error " + e.Message );
                }
            }

            );

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }

    /// <summary>Prints the actual elapsed time next to the expected duration.</summary>
    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine( "Done" );
        Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
        Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
    }
}

The last few lines of output are

result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
     Exception Can't resolve adv758968.ru
     Exception Can't resolve fr.a3dfp.net
     Exception Can't resolve ads.adwitserver.com
     Exception Can't resolve www.adtrader.com
     Exception Can't resolve trak-analytics.blic.rs
     Exception Can't resolve ads.buzzcity.net

I couldn't paste the full code so here is a link to the code with the hosts list.

https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190

bradgonesurfing
  • 30,949
  • 17
  • 114
  • 217
  • Thanks for the input. Is there a possible solution without including the Nuget package ? I have added few more questions to the original post which seems bottlenecks in my application. Please review and help me with a more complete solution. – swati gupta May 22 '17 at 15:01
  • What's wrong with including a nuget package? The included framework CountDownEvent does not have a `WaitAsync`. An oversight perhaps. If you don't include it you have to build your own counter from SemaphoreSlim and other fiddly things. – bradgonesurfing May 22 '17 at 15:06
  • (1 and 2) are already answered by my response. (3) Ask another question on Stack Overflow about this. It is only vaguely related to the issue of rate limiting. (4) Easy: (number of successful requests / time since start). Print it to the console, log it, whatever. (5) Ask another Stack Overflow question. Unrelated to the rate-limiting problem. – bradgonesurfing May 22 '17 at 15:10
  • The source code for CountDownEvent is [here](https://github.com/StephenCleary/AsyncEx/blob/0aaae920de495040aae9539775e93c7a063854e3/src/Nito.AsyncEx.Coordination/AsyncCountdownEvent.cs) if you feel like recoding it without the nuget package. – bradgonesurfing May 22 '17 at 15:17
  • bradgonesurfing - Could you please provide some description about the flow of this code ? When using Dns.GetHostEntryAsync method where exactly should it be called as there is no call back required in this method and the FQDN can directly be assigned to the dataset. Some small description about all the delays and async method could be helpful to understand the program as a whole. Thanks in advance ! – swati gupta May 23 '17 at 10:53
  • There could be a possibility of finding some exceptions while searching the host. Where can the exception handling be added if using the Dns.GetHostEntryAsync inside (Func)(async () =>..) – swati gupta May 23 '17 at 10:58
  • You would have to collect them yourself. – bradgonesurfing May 23 '17 at 11:25
  • Replace Enumerable.Range with your list of host names to look up. Replace the Task.Delay with the call to GetHostEntryAsync. Collecting all the results is another challenge for you :) – bradgonesurfing May 23 '17 at 11:28
  • It's your lucky day. I've updated the solution with a full example of processing real DNS requests. Code is [here](https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190) The code pasted in the above answer is missing the long hosts list for testing. Please mark the answer as accepted or vote it up if it is useful. You probably won't understand it fully at first look. Load it in a debugger and step through it and read the links about reactive extensions and async await. – bradgonesurfing May 23 '17 at 13:32
  • This seems useful to me, but with such complex code, it's hard to understand for someone who has not worked with TPL,async or threads in past. I would like to understand before implementing anything. My requirement is pretty simple, a dedicated console application(even the console window is hidden for this solution, so no output required) using GetHostEntry/GetHostEntryAsync with limited no. of requests per second. I still don't understand the need for using Subscribe/ defer etc. I am still spending time to understand but any simpler solution could be really helpful – swati gupta May 24 '17 at 12:59
  • You want to do complex stuff like threading, rate limiting and concurrent operations you will need to take time to learn stuff. – bradgonesurfing May 24 '17 at 13:00
  • Best thing I can suggest is get the code I presented to you working as I gave it and start playing with it in the debugger. For each bit you don't understand look up the documentation / google it and read/read/read – bradgonesurfing May 24 '17 at 13:01
  • I am finding an error in the definition of this class: static class EnumerableExtensions for Invalid token '=>' – swati gupta May 24 '17 at 13:27
0

Have you considered using the TPL Dataflow library? It has a very convenient way to limit concurrent operations of the same type, and it can also throttle the whole pipeline by limiting buffer sizes.

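Basically, all you need to create is a pipeline with: a buffer block holding the host names, a transform block performing the DNS lookups, a batch block grouping the results, and an action block saving each batch to the database.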

So your code could be like this:

// buffer limited to 30 items in queue
// all other items would be postponed and added to queue automatically
// order in queue is preserved
var hosts = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 30 });

// get a host and perform a dns search operation;
// an unresolvable host yields no output instead of faulting the whole pipeline
var handler = new TransformManyBlock<string, IPHostEntry>(host =>
    {
        try
        {
            return new[] { Dns.GetHostEntry(host) };
        }
        catch (System.Net.Sockets.SocketException)
        {
            return new IPHostEntry[0];
        }
    },
    // no more than 10 simultaneous requests at a time.
    // The handler's own input queue must be bounded too: with the default (unbounded)
    // capacity it would drain the buffer block immediately and its bound would never apply.
    // NOTE: this limits concurrency, not requests per second.
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, BoundedCapacity = 30 });

// gather results in an array of size 500
var batchBlock = new BatchBlock<IPHostEntry>(500);

// get the resulting array and save it to database
var batchSave = new ActionBlock<IPHostEntry[]>(r => GetHostEntryCallback(r));

// link all the blocks to automatically propagate items along the pipeline
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
hosts.LinkTo(handler, linkOptions);
handler.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(batchSave, linkOptions);

// provide the data to pipeline
for (var i = 0; i < TotalRecords.Rows.Count; ++i)
{
    var host = TotalRecords.Rows[i].ItemArray[0].ToString();
    // async wait for item to be sent to pipeline;
    // once both bounded queues are full, this waits for the DNS lookups to catch up
    await hosts.SendAsync(host);
}

// pipeline is complete now, just wait it finishes
hosts.Complete();

// wait for the last block to finish its execution
await batchSave.Completion;

// notify user that update is over

// notify user that update is over

I encourage you to read the whole How-to section on MSDN to get a better understanding of what you can do with this library, and then continue with the official documentation.

By the way, you may use the SqlBulkCopy class to update the database, if it will fit your requirements, usually it's faster than a regular update with SqlDataAdapter.

VMAtm
  • 27,943
  • 17
  • 79
  • 125
  • The requirement is to throttle the requests based on time(per second), maxdegreeofparallelism would not help here. – swati gupta May 24 '17 at 13:01
  • You can implement a simple block to control throughput, but there is nothing inside TPL that provides such functionality out of the box. – VMAtm May 24 '17 at 14:27