0

I am building a class to use parallel loop to access messages from message queue, in order to explain my issue I created a simplified version of code:

public class Worker
{
    private IMessageQueue mq;
    public Worker(IMessageQueue mq)
    {
        this.mq = mq;
    }

    public int Concurrency
    {
        get
        {
            return 5;
        }
    }

    public void DoWork()
    {
        int totalFoundMessage = 0;

        do
        {
            // reset for every loop
            totalFoundMessage = 0;

            Parallel.For<int>(
                0,
                this.Concurrency,
                () => 0,
                (i, loopState, localState) =>
                {
                    Message data = this.mq.GetFromMessageQueue("MessageQueueName");

                    if (data != null)
                    {
                        return localState + 1;
                    }
                    else
                    {
                        return localState + 0;
                    }
                },
                localState =>
                {
                    Interlocked.Add(ref totalFoundMessage, localState);
                });
        }
        while (totalFoundMessage >= this.Concurrency);
    }
}

The idea is to set the worker class a concurrency value to control the parallel loop. If after each loop the number of message to retrieve from message queue equals to the concurrency number I assume there are potential more messages in the queue and continue to fetch from queue until the message number is smaller than the concurrency. The TPL code is also inspired by TPL Data Parallelism Issue post.

I have the interface to message queue and message object.

public interface IMessageQueue
{
    Message GetFromMessageQueue(string queueName);
}

public class Message
{
}

Thus I created my unit test codes and I used Moq to mock the IMessageQueue interface

    [TestMethod()]
    public void DoWorkTest()
    {
        Mock<IMessageQueue> mqMock = new Mock<IMessageQueue>();

        Message data = new Message();

        Worker w = new Worker(mqMock.Object);

        int callCounter = 0;
        int messageNumber = 11;
        mqMock.Setup(x => x.GetFromMessageQueue("MessageQueueName")).Returns(() =>
        {
            callCounter++;
            if (callCounter < messageNumber)
            {
                return data;
            }
            else
            {
                // simulate MSMQ's behavior last call to empty queue returns null
                return (Message)null;
            }
        }
        );

        w.DoWork();

        int expectedCallTimes = w.Concurrency * (messageNumber / w.Concurrency);
        if (messageNumber % w.Concurrency > 0)
        {
            expectedCallTimes += w.Concurrency;
        }

        mqMock.Verify(x => x.GetFromMessageQueue("MessageQueueName"), Times.Exactly(expectedCallTimes));
    }

I used the idea from Moq to set up a function return based on called times to set up call times based response.

During the unit testing I noticed the testing result is unstable, if you run it multiple times you will see in most cases the test passes, but occasionally the test fails for various reasons.

I have no clue what caused the situation and look for some input from you. Thanks

Community
  • 1
  • 1
hardywang
  • 4,864
  • 11
  • 65
  • 101

2 Answers2

1

The problem is that your mocked GetFromMessageQueue() is not thread-safe, but you're calling it from multiple threads at the same time. ++ is inherently thread-unsafe operation.

Instead, you should use locking or Interlocked.Increment().

Also, in your code, you're likely not going to benefit from parallelism, because starting and stopping Parallel.ForEach() has some overhead. A better way would be to have a while (or do-while) inside the Parallel.ForEach(), not the other way around.

svick
  • 236,525
  • 50
  • 385
  • 514
0

My approach would be to restructure. When testing things like timing or concurrency, it is usually prudent to abstract your calls (in this case, use of PLINQ) into a separate class that accepts a number of delegates. You can then test the correct calls are being made to the new class. Then, because the new class is a lot simpler (only a single PLINQ call) and contains no logic, you can leave it untested.

I advocate not testing in this case because unless you are working on something super-critical (life support systems, airplanes, etc), it becomes more trouble than it's worth to test. Trust the framework will execute the PLINQ query as expected. You should only be testing those things which make sense to test, and that provide value to your project or client.

Camron B
  • 1,650
  • 2
  • 14
  • 30
  • Thanks for the input, I have omitted the code how to process a message from queue already, the code will be covered in separated unit tests. In my case the processing of each message may be slow thus I don't want to wait in sequence so I choose to use TPL. The purpose of this test is to make sure the parallelism is coded correctly. – hardywang Feb 15 '13 at 17:47