0

I am using a RESTful API that returns a maximum of 50 records per call; if you need more than this, you must make multiple calls and pass an offset to each one.

There are times when we require 'all' of the results to be loaded. We are using something that resembles the code below - it makes one request after another and adds the results to a list, stopping when either the maximum is reached or the amount returned by any one call is less than the amount requested.

How can I refactor this (using tasks/parallel/threads) to load this data with multiple requests in flight at any one time and still get the exact same results? I have looked at creating multiple Tasks and awaiting them, but the problem is that the number of records to load is unknown until the point of 'no more being available' or hitting the max.

/// <summary>
/// Loads items from the service page by page, one request after another, until either
/// <paramref name="maxAmount"/> items have been collected or the service returns a short page.
/// </summary>
/// <param name="maxAmount">
/// Upper bound on the number of items to return. Any negative value (the default is -1) means
/// "no limit": keep paging until the service runs out of data.
/// </param>
/// <returns>The collected items, in the order the service returned them.</returns>
public IEnumerable<T> GetItems(int maxAmount = -1)
{
    // The service returns at most this many records per call.
    const int batchAmount = 50;

    var unlimited = maxAmount < 0;
    var result = new List<T>();

    // Stop as soon as the cap is reached; comparing against the running total (not the size of
    // the last page) prevents the loop from spinning on zero-sized requests at the cap.
    while (unlimited || result.Count < maxAmount)
    {
        // Without a cap always ask for a full page; with a cap never ask for more than is still needed.
        var requestAmount = unlimited
            ? batchAmount
            : Math.Min(batchAmount, maxAmount - result.Count);

        // The number of items collected so far is also the offset of the next page.
        var items = new List<T>(GetItemsFromService(requestAmount, result.Count));
        result.AddRange(items);

        // A short (or empty) page means the service has no more data.
        if (items.Count < requestAmount)
        {
            break;
        }
    }

    return result;
}


/// <summary>
/// Placeholder for the call that fetches a single page of items from the REST service.
/// The body is intentionally left out in this snippet.
/// </summary>
/// <param name="batchAmount">Maximum number of items the service should return for this call.</param>
/// <param name="offset">Offset passed to the service to select where the page starts.</param>
/// <returns>The items of the requested page (assumed to contain at most <paramref name="batchAmount"/> entries).</returns>
private IEnumerable<T>  GetItemsFromService(int batchAmount,int offset)
{
    // Let's assume that this gets data from a REST service that returns a maximum of batchAmount
    // and offsets using the offset variable.
}
Richard Friend
  • 15,800
  • 1
  • 42
  • 60
  • Are you sure that the server hosting the RESTful API will allow you to do multiple concurrent requests? It might throttle you. – Jim Mischel Jul 30 '14 at 14:29
  • Look at Reactive Extensions. This might be related: http://stackoverflow.com/questions/24966019/async-with-huge-data-streams – noseratio Jul 31 '14 at 01:33

2 Answers

0

Unfortunately you can't use async here as you are relying on the number of items from the previous request. This must be synchronous unless you want to do some asynchronous operations on the data that you've received.

It must be a badly designed API that returns paged result without total pages or total number of items.

0

I managed to get this working. Basically, I keep sending paged requests until one of the requests comes back with nothing - since they are started in order, once a response comes back with nothing we do not need to make any more requests, just allow the existing requests to finish.

My working code looks like this.

 /// <summary>
 /// Fetches pages of entities using <c>NumberOfThreads</c> concurrent worker tasks that share one
 /// paging context, and blocks until every worker has finished.
 /// </summary>
 /// <param name="type">Entity type handed to the single-request helper.</param>
 /// <param name="apiPath">API path handed to the single-request helper.</param>
 /// <param name="parameters">Request parameters handed to the single-request helper.</param>
 /// <param name="startPosition">Offset at which paging starts.</param>
 /// <param name="maxAmount">Row limit given to the paging context.</param>
 /// <returns>All entities collected by the workers.</returns>
 private IEnumerable<object> GetEntitiesInParallel(Type type, string apiPath, Dictionary<string, string> parameters, int startPosition, int maxAmount)
         {
             var context = new TaskThreadingContext(maxAmount, startPosition);

             // One worker: keep claiming the next offset and fetching that page until the
             // shared context says to stop.
             Action fetchPages = () =>
             {
                 while (context.Continue)
                 {
                     var payload = String.Empty;
                     var pageOffset = context.NextAmount();
                     var page = GetEntitiesSingleRequest(type, parameters, pageOffset, apiPath, out payload);

                     if (!page.Any())
                     {
                         // An empty page tells every worker that there is nothing further to request.
                         context.NoResult();
                         continue;
                     }

                     context.AddResult(page.Cast<object>(), payload);
                 }
             };

             var workers = Enumerable.Range(0, NumberOfThreads)
                                     .Select(_ => Task.Factory.StartNew(fetchPages))
                                     .ToArray();

             // Requests already in flight are allowed to complete before results are read.
             Task.WaitAll(workers);

             return context.GetResults<object>();
         }


        /// <summary>
        /// Executes one GET request for a page of up to 50 entities starting at <paramref name="offset"/>
        /// and returns the deserialized entities.
        /// </summary>
        /// <param name="type">Element type of the entities; the response is deserialized as a list of this type.</param>
        /// <param name="parameters">Additional parameters passed to the request factory.</param>
        /// <param name="offset">Offset of the page to request.</param>
        /// <param name="apiPath">API path passed to the request factory.</param>
        /// <param name="rawData">
        /// Receives the response content with "\n" replaced by <see cref="Environment.NewLine"/>,
        /// or <c>null</c> when the request was treated as a 404.
        /// </param>
        /// <returns>
        /// The non-null entities of the page, or an empty sequence when an exception whose message
        /// (or any inner exception's message) contains "404" was raised.
        /// </returns>
        private IEnumerable<object> GetEntitiesSingleRequest(Type type,Dictionary<string,string> parameters,
            int offset,string apiPath, out string rawData)
        {
            var request = Utility.CreateRestRequest(apiPath, Method.GET,ApiKey,50,offset,parameters);

            // The entity type is only known at run time, so the generic Execute method is closed
            // over List<type> through reflection.
            var listType = typeof(List<>).MakeGenericType(type);
            var method = Client.GetType().GetMethods().Single(m => m.IsGenericMethod && m.Name == "Execute").MakeGenericMethod(listType);
            try
            {
                dynamic response = (IRestResponse)method.Invoke(Client, new object[] { request });
                var data = response.Data as IEnumerable;

                // OfType<object> drops null entries; materialize once.
                var dataList = data.OfType<object>().ToList();
                rawData = response.Content.Replace("\n", Environment.NewLine);
                return dataList;
            }
            catch (Exception ex)
            {
                // MethodInfo.Invoke wraps exceptions thrown by the target in a
                // TargetInvocationException, so the "404" marker may only be present on an
                // inner exception. Walk the whole chain instead of checking the outer message only.
                for (var current = ex; current != null; current = current.InnerException)
                {
                    if (current.Message.IndexOf("404", StringComparison.Ordinal) != -1)
                    {
                        rawData = null;
                        return Enumerable.Empty<object>();
                    }
                }
                throw;
            }
        }
        /// <summary>
        /// Shared state for a group of worker tasks that page through a service concurrently.
        /// It hands out the offset of the next page to request, collects the results and tells
        /// the workers when to stop.
        /// </summary>
        private class TaskThreadingContext
        {
            // Page size used to advance the offset.
            private const int BatchAmount = 50;

            private readonly object offsetLock = new object();
            private readonly object resultLock = new object();
            private readonly List<object> result = new List<object>();
            private readonly List<string> raw = new List<string>();

            // Row limit relative to 'start'; zero or less means "no limit".
            private readonly int max;
            private readonly int start;

            // Read by workers without a lock, hence volatile.
            private volatile bool cont = true;

            // Only touched while holding offsetLock.
            private int offset;

            /// <summary>True while workers should keep requesting pages.</summary>
            public bool Continue { get { return cont; } }

            /// <summary>Creates a context with an optional row limit and start offset.</summary>
            /// <param name="maxRows">Maximum number of rows to cover; 0 (the default) means no limit.</param>
            /// <param name="startPosition">Offset of the first page.</param>
            public TaskThreadingContext(int maxRows = 0,int startPosition = 0)
            {
                max = maxRows;
                offset = start = startPosition;
            }

            /// <summary>Creates a context without a row limit, starting at offset 0.</summary>
            public TaskThreadingContext() : this(0, 0)
            {
            }

            /// <summary>
            /// Returns the offset of the next page to request and advances the internal offset by
            /// one batch. When a row limit is set and this page reaches it, the internal offset is
            /// clamped to the end of the allowed range and <see cref="Continue"/> becomes false.
            /// </summary>
            /// <returns>The offset the caller should request.</returns>
            public int NextAmount()
            {
                lock (offsetLock)
                {
                    var current = offset;
                    var next = current + BatchAmount;

                    if (max > 0 && next - start >= max)
                    {
                        // Clamp to the end of the requested range. The offset must never move
                        // backwards, otherwise already-issued offsets are handed out again.
                        next = start + max;
                        cont = false;
                    }

                    offset = next;
                    return current;
                }
            }

            /// <summary>Stores the items and the raw payload of one successfully fetched page.</summary>
            /// <param name="items">Items of the page; appended to the collected results.</param>
            /// <param name="rawData">Raw payload of the page.</param>
            public void AddResult(IEnumerable<object> items,string rawData)
            {
                lock (resultLock)
                {
                    result.AddRange(items);
                    raw.Add(rawData);
                }
            }

            /// <summary>Returns a snapshot of all collected items cast to <typeparamref name="T"/>.</summary>
            /// <remarks>Items appear in the order the pages were added, which need not be offset order.</remarks>
            public IEnumerable<T> GetResults<T>()
            {
                // Same lock as AddResult so a snapshot is never taken in the middle of an append.
                lock (resultLock)
                {
                    return this.result.Cast<T>().ToList();
                }
            }

            /// <summary>Signals that a page came back empty, so no further pages should be requested.</summary>
            public void NoResult()
            {
                cont = false;
            }

        }
Richard Friend
  • 15,800
  • 1
  • 42
  • 60
  • What was the purpose of checking `temp - start > max && max > 0` and `temp = max - offset` in TaskThreadingContext. It just decreases offset and leads to an endless loop. – KAMAEL May 11 '23 at 11:29