
I'm implementing the MailChimp.NET wrapper in both synchronous and asynchronous ways, and the calls go through without a problem, BUT results tend to get lost in the synchronous methods. In other words, if I send 100 members to be added (in batches of 10 due to the simultaneous-connections limit of the MailChimp API), all 100 will indeed be visible in my MC audience, but I'll lose from 5 to 25% of the results on the code side. Here's the relevant part of my implementation:

public class MailChimpClient : IDisposable
{
    private MailChimpManager _mcm;
    private string _apiKey;
    private bool _isDisposed;
    private ConcurrentQueue<MailChimpMember> _updatedMembersQueue;
    private ConcurrentQueue<MailChimpBaseException> _exceptionsQueue;

    private const int BatchSize = 10;
    private const int TaskDelay = 100;

    private ConcurrentQueue<MailChimpMember> UpdatedMembersQueue
    {
        get { return _updatedMembersQueue = _updatedMembersQueue ?? new ConcurrentQueue<MailChimpMember>(); }
        set { _updatedMembersQueue = value; }
    }

    private ConcurrentQueue<MailChimpBaseException> ExceptionsQueue
    {
        get { return _exceptionsQueue = _exceptionsQueue ?? new ConcurrentQueue<MailChimpBaseException>(); }
        set { _exceptionsQueue = value; }
    }

    public MailChimpClient(string apiKey)
    {
        _apiKey = apiKey;
        _mcm = new MailChimpManager(apiKey);
    }

    private async Task AddOrUpdateMember(MailChimpMember member, string listId)
    {
        try
        {
            var model = member.ToApiMember();
            model = await _mcm.Members.AddOrUpdateAsync(listId, model);
            UpdatedMembersQueue.Enqueue(new MailChimpMember(model));
            await Task.Delay(TaskDelay);
        }
        catch (Exception ex)
        {
            var mccex = new MailChimpClientException($"Error adding/updating member \"{(member != null ? member.MailAddress.ToString() : "NULL")}\" to list with ID \"{listId}\".", ex);
            ExceptionsQueue.Enqueue(mccex);
        }
    }

    private MailChimpClientResult AddOrUpdateMemberRange(IEnumerable<MailChimpMember> members, string listId)
    {
        var batches = members.GetBatches(BatchSize);
        var result = new MailChimpClientResult();

        var i = 0;

        foreach (var batch in batches)
        {
            AddOrUpdateMemberBatch(batch, listId);
            i++;
            FlushQueues(ref result);
        }

        return result;
    }

    private void AddOrUpdateMemberBatch(MailChimpMember[] batch, string listId)
    {
        Task.WaitAll(batch.Select(async b => await AddOrUpdateMember(b, listId)).ToArray(), -1);
    }

    private void FlushQueues(ref MailChimpClientResult result)
    {
        result.UpdatedMembers.FlushQueue(UpdatedMembersQueue);
        result.Exceptions.FlushQueue(ExceptionsQueue);
    }

    public MailChimpClientResult AddOrUpdate(MailChimpMember member, string listId)
    {
        return AddOrUpdateMemberRange(new MailChimpMember[] { member }, listId);
    }

    public MailChimpClientResult AddOrUpdate(IEnumerable<MailChimpMember> members, string listId)
    {
        return AddOrUpdateMemberRange(members, listId);
    }
}

public static class CollectionExtensions
{
    public static T[][] GetBatches<T>(this IEnumerable<T> items, int batchSize)
    {
        var result = new List<T[]>();
        var batch = new List<T>();
        foreach (var t in items)
        {
            if (batch.Count == batchSize)
            {
                result.Add(batch.ToArray());
                batch.Clear();
            }
            batch.Add(t);
        }
        result.Add(batch.ToArray());
        batch.Clear();

        return result.ToArray();
    }

    public static void FlushQueue<T>(this IList<T> list, ConcurrentQueue<T> queue)
    {
        T item;
        while (queue.TryDequeue(out item))
            list.Add(item);
    }
}

MailChimpMember is a public copy of the MailChimp.NET Member. The problem seems to happen in the batch-processing method: the Task.WaitAll(...) call appears to return before all calls are complete, so not all results get queued. I tried delaying each individual operation with Task.Delay(), but with little to no result.

Does anyone have an idea what is failing in my implementation?

ZipionLive
  • *"Task.WaitAll(...) instruction firing its completion event before all calls are complete"* - How did you determine this? – Gabriel Luci Dec 16 '19 at 16:30
  • 3
    Side note: there is no point to lazily initializing the queues (the chance that *these* object instances are causing memory pressure is close to zero), and in fact this is a bad idea because you're inviting threading problems. Just do `private ConcurrentQueue _updatedMembersQueue = new ConcurrentQueue()`. – Jeroen Mostert Dec 16 '19 at 16:30
  • https://stackoverflow.com/a/32119616/3608792 – Dan Wilson Dec 16 '19 at 16:31
  • @Gabriel Luci By examining the queues after each batch treatment, I noticed sometimes less than 10 (the size of my batches) were added. – ZipionLive Dec 16 '19 at 16:33
  • 1
    @Dan Wilson, solution tried and failed, I had the exact same issue. – ZipionLive Dec 16 '19 at 16:34
  • Hmm, that's frustrating. – Dan Wilson Dec 16 '19 at 16:40
  • As well as the answer in the linked question (https://stackoverflow.com/a/19850142/117215), you may want to consider making this entire thing async (to get full advantages of it) and use a WhenAll rather than waitall – Paddy Dec 16 '19 at 16:43
  • @Jeroen Mostert It seems that what you pointed out was the actual source of my problem. Can you add it as an answer so I can accept it and close the question ? I wouldn't want to steal your glory :-p. And, obviously, thanks a lot for the solution ! – ZipionLive Dec 16 '19 at 16:44
  • @Paddy I actually implemented both a synchronous and an asynchronous version of all methods as stated in the description. I'm developping this wrapper as a part of a consulting mission, the goal being that the client can choose either of those implementations according to his needs. – ZipionLive Dec 16 '19 at 16:47
  • Interesting; while the threading problems are obvious I wouldn't actually have thought you'd run into them. You need some unlucky timing for that! (For good measure, add `readonly` to the members so you're sure they're not reassigned somewhere else.) I can't add an answer at the moment as the question is closed as a dupe. Certainly, avoiding solutions that use `.WaitAll` and/or try to wrap async over sync is preferable in general. – Jeroen Mostert Dec 16 '19 at 16:54
  • @JeroenMostert, `I wouldn't actually have thought you'd run into them` - for me it looks like the key problem here. Although the running platform is unknown, the `net-standard` tag implies with high probability that it's a .NET Core application, which significantly increases the chances that there is no synchronization context and that the `UpdatedMembersQueue.Enqueue(new MailChimpMember(model));` continuation is scheduled on a thread pool. That, and the fact that some (if not all) of the 10 identical requests can finish at the same time, makes the risk of them using more than one copy of the queue very high. – Dmytro Mukalov Dec 17 '19 at 12:07
  • @JeroenMostert - I was interested in this as well. This fiddle reproduces the issue every time and uncommenting the line of code in main fixes it. https://dotnetfiddle.net/p1hcc9 – Paddy Dec 17 '19 at 12:43
  • 1
    @Paddy, yeah, the `UpdatedMembersQueue` getter simply isn't atomic and sometimes you end up with multiple instances of the `ConcurrentQueue` because threads compete on the `UpdatedMembersQueue.Enqueue(memberId);` line. – Dmytro Mukalov Dec 17 '19 at 13:31
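The comments above pin the loss on the non-atomic lazy getters rather than on `Task.WaitAll` itself. The following is a minimal sketch of that race (analogous in spirit to Paddy's fiddle, not its exact code; the class and member names are illustrative): several tasks hit the lazy getter concurrently, more than one observes the backing field as `null`, and each creates its own `ConcurrentQueue` — items enqueued into the losing instances are silently lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class LazyQueueRace
{
    private static ConcurrentQueue<int> _queue;

    // Non-atomic lazy getter, same shape as in the question: two threads can
    // both see null, both allocate a queue, and one allocation wins the field
    // while items already went into the other instance.
    private static ConcurrentQueue<int> Queue
        => _queue = _queue ?? new ConcurrentQueue<int>();

    static void Main()
    {
        Task.WaitAll(Enumerable.Range(0, 1000)
            .Select(i => Task.Run(() => Queue.Enqueue(i)))
            .ToArray());

        // May print less than 1000: the missing items were enqueued into
        // queue instances that lost the race for the backing field.
        Console.WriteLine(Queue.Count);
    }
}
```

As Jeroen Mostert's comment suggests, eager `readonly` initialization removes the race entirely: `private readonly ConcurrentQueue<MailChimpMember> _updatedMembersQueue = new ConcurrentQueue<MailChimpMember>();` — no getter, no window in which two threads can each allocate.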

0 Answers