0

I have a stateful service that uses IReliableConcurrentQueue to store state. I am using a Stateless ASP.NET Core service to expose two REST endpoints.

The GET endpoint gets specified number of items from the named IReliableConcurrentQueue. It also writes to a separate reliable concurrent queue in the same transaction before returning. The POST endpoint receives a list of items and adds them to IReliableConcurrentQueue.

When I run the service from Visual Studio locally in debug mode, when I hit one endpoint (GET or POST) repeatedly, it works fine without any issues.

When I hit both endpoints simultaneously, after 3 or 4 hits (sometimes 10), the service stops responding and I see Timeout exceptions.

REST Endpoints (Stateless Service)

[HttpGet]
[Route("{number}/{queueName}")]
public async Task<IEnumerable<QueueItem>> Get(int number, string queueName)
{        
    IEnumerable<QueueItem> items = await statefulServiceProxy.GetItems(number, queueName);
    return items;
}

[HttpPost]
[Route("item")]
public async Task Set([FromBody]IEnumerable<Item> items)
{
    await statefulServiceProxy.SetItem(items);
}

Stateful Service

Always gets stuck in CommitAsync function in either ReadAndLockItems() or SetItem().

public async Task<IEnumerable<QueueItem>> GetItems(int number, string queueName)
{
    var items = await this.ReadAndLockItems(number, queueName);
    return items;
}

private async Task<IEnumerable<QueueItem>> ReadAndLockItems(int number, string queueName)
{
    var itemQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<QueueItem>>(queueName);
    var lockQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<QueueItem>>("LockQueue");
    List<QueueItem> results = new List<QueueItem>();

    using (var tx = this.StateManager.CreateTransaction())
    {
        for (int i = 0; i < number; i++)
        {
            var element = await itemQueue.TryDequeueAsync(tx);
            if (element.HasValue)
            {
                results.Add(element.Value);
            }
        }

        results.ToList().ForEach(async r =>
        {
            await lockQueue.EnqueueAsync(tx, r);
        });

        await tx.CommitAsync();     // This is where it gets stuck.
    }

    return results;
}

public async Task SetItem(IEnumerable<Product> products)
{ 
    var productQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<Product>>("ProductQueue");
    using (var tx = this.StateManager.CreateTransaction())
    {
        foreach (Product p in products)
        {
            await productQueue.EnqueueAsync(tx, p);
        }

        await tx.CommitAsync(); // Sometimes it gets stuck here. It's not consistent. Either in this function or the above one.
    }
}

I checked this Post and tried the changes mentioned there. It doesn't still work.

Exception details:

System.TimeoutException: This can happen if message is dropped when service is busy or its long running operation and taking more time than configured Operation Timeout.
   at Microsoft.ServiceFabric.Services.Communication.Client.ServicePartitionClient`1.<InvokeWithRetryAsync>d__24`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

Is there anything wrong with the way I am using the collections and the related transactions?

vrcks
  • 419
  • 1
  • 5
  • 23
  • Can you try changing this code: `results.ToList().ForEach(async r => { await lockQueue.EnqueueAsync(tx, r); });` into: `foreach(var r in results) { await lockQueue.EnqueueAsync(tx, r); }` – LoekD Nov 03 '17 at 08:50
  • I tested using the foreach loop. The same issue still persists. Not sure where the call to CommitAsync is vanishing. When I step-debug it, when it hits the CommitAsync method, the control is lost. – vrcks Nov 03 '17 at 19:30
  • I switched from 5 node cluster to 1 node cluster and this issue does not occur. Maybe it is taking time to commit changes and replicate changes to the secondary replicas on my DEV machine.I will deploy to Azure and verify once. – vrcks Nov 03 '17 at 22:08

0 Answers0