3

I'm working on a multithread application for an exercise used to simulate a warehouse (similar to the producer consumer problem) however I'm running into some trouble with the program where increasing the number of consumer threads makes the program behave in unexpected ways.

The code:

I'm creating a producer thread called buyer which has as a goal to order precisely 10 orders from the warehouse each. To do this they have a shared object called warehouse on which a buyer can place an order, the order is then stored in a buffer in the shared object. After this the buyer sleeps for some time until it either tries again or all packs have been bought. The code to do this looks like this:

public void run() {
    //Run until the thread has bought 10 packages, this ensures the thread 
    //will eventually stop execution automatically.
    while(this.packsBought < 10) {
        try {
            //Sleep for a random amount of time between 1 and 50 
            //milliseconds.
            Thread.sleep(this.rand.nextInt(49) + 1);
            //Catch any interruptExceptions.
        } catch (InterruptedException ex) {
            //There is no problem if this exception is thrown, the thread 
            //will just make an order earlier than planned. that being said 
            //there should be no manner in which this exception is thrown.
        }
        
        //Create a new order.
        Order order = new Order(this.rand.nextInt(3)+ 1, 
                                this, 
                                this.isPrime);
        
        //Set the time at which the order was placed as now.
        order.setOrderTime(System.currentTimeMillis());
        
        //place the newly created order in the warehouse.
        this.warehouse.placeOrder(order);
    }
    
    //Notify the thread has finished execution. 
    System.out.println("Thread: " + super.getName() + " has finished.");
}

As you can see the function placeOrder(Order order); is used to place an order at the warehouse. this function is responsible for placing the order in the queue based on some logic related to prime status. The function looks like this:

public void placeOrder(Order order) {
try{
    //halt untill there are enough packs to handle an order. 
    this.notFullBuffer.acquire();
    
    //Lock to signify the start of the critical section.
    this.mutexBuffer.lock();
    
    //Insert the order in the buffer depending on prime status. 
    if (order.isPrime()) {
        //prime order, insert behind all prime orders in buffer. 
        
        //Enumerate all non prime orders in the list. 
        for (int i = inPrime; i < sizeOrderList - 1; i++) {
            //Move the non prime order back 1 position in the list. 
            buffer[i + 1] = buffer[i]; 
        }
        
        // Insert the prime order. 
        buffer[inPrime++] = order;
    
    } else {
        //No prime order, insert behind all orders in buffer.
        buffer[inPrime + inNormal++] = order;
    }
    //Notify the DispatchWorkers that a new order has been placed. 
    this.notEmptyBuffer.release();
    
    //Catch any InterruptException that might occure. 
    } catch(InterruptedException e){
        //Even though this isn't expected behavior, there is no reason to 
        //notify the user of this event or to preform any other action as 
        //the thread will just return to the queue before placing another 
        //error if it is still required to do so.
    } finally {
        //Unlock and finalize the critical section.
        mutexBuffer.unlock();
    }
}

The orders are consumed by workers which act as the consumer thread. The thread itself contains very simple code looping until all orders have been processed. In this loop a different function handleOrder(); is called on the same warehouse object which handles a single order from the buffer. It does so with the following code:

public void handleOrder(){
    //Create a variable to store the order being handled.
    Order toHandle = null;
    
    try{
        //wait until there is an order to handle. 
        this.notEmptyBuffer.acquire();

        //Lock to signify the start of the critical section.
        this.mutexBuffer.lock();

        //obtain the first order to handle as the first element of the buffer
        toHandle = buffer[0];

        //move all buffer elementst back by 1 position. 
        for(int i = 1; i < sizeOrderList; i++){
            buffer[i - 1] = buffer[i];
        }
        //set the last element in the buffer to null
        buffer[sizeOrderList - 1] = null;

        //We have obtained an order from the buffer and now we can handle it. 
        if(toHandle != null) {
            int nPacks = toHandle.getnPacks();
            
            //wait until the appropriate resources are available. 
            this.hasBoxes.acquire(nPacks);
            this.hasTape.acquire(nPacks * 50);

            //Now we can handle the order (Simulated by sleeping. Although 
            //in real live Amazon workers also have about 5ms of time per 
            //package).                 
            Thread.sleep(5 * nPacks);
            
            //Calculate the total time this order took.
            long time = System.currentTimeMillis() - 
                        toHandle.getOrderTime();
            
            //Update the total waiting time for the buyer.
            toHandle.getBuyer().setWaitingTime(time + 
                            toHandle.getBuyer().getWaitingTime());   
            
            //Check if the order to handle is prime or not.
            if(toHandle.isPrime()) {
                //Decrement the position of which prime orders are 
                //inserted into the buffer.
                inPrime--;
            } else {
                //Decrement the position of which normal orders are 
                //inserted into the buffer.
                inNormal--;
            }

            //Print a message informing the user a new order was completed. 
            System.out.println("An order has been completed for: " 
                                + toHandle.getBuyer().getName());

            //Notify the buyer he has sucsessfully ordered a new package. 
            toHandle.getBuyer().setPacksBought(
                    toHandle.getBuyer().getPacksBought() + 1);
        }else {
            //Notify the user there was a critical error obtaining the 
            //error to handle. (There shouldn't exist a case where this 
            //should happen but you never know.)
            System.err.println("Something went wrong obtaining an order.");
        }
        
        //Notify the buyers that a new spot has been opened in the buffer. 
        this.notFullBuffer.release();
        
    //Catch any interrupt exceptions.
    } catch(InterruptedException e){
        //This is expected behavior as it allows us to force the thread to 
        //revaluate it's main running loop when notifying it to finish 
        //execution.
    } finally {
        //Check if the current thread is locking the buffer lock. This is 
        //done as in the case of an interrupt we don't want to execute this 
        //code if the thread interrupted doesn't hold the lock as that 
        //would result in an exception we don't want.
        if (mutexBuffer.isHeldByCurrentThread())
            //Unlock the buffer lock.
            mutexBuffer.unlock();
    }
} 

The problem:

To verify the functionallity of the program I use the output from the statement:

System.out.println("An order has been completed for: " 
                                    + toHandle.getBuyer().getName());

from the handleOrder(); function. I place the whole output in a text file, remove all the lines which aren't added by this println(); statement and count the number of lines to know how many orders have been handled. I expect this value to be equal to the amount of threads times 10, however this is often not the case. Running tests I've noticed sometimes it does work and there are no problems but sometimes one or more buyer threads take more orders than they should. with 5 buyer threads there should be 50 outputs but I get anywhere from 50 to 60 lines (orders places).

Turning the amount of threads up to 30 increases the problem and now I can expect an increase of up to 50% more orders with some threads placing up to 30 orders.

Doing some research this is called a data-race and is caused by 2 threads accessing the same data at the same time while 1 of them writes to the data. This basically changes the data such that the other thread isn't working with the same data it expects to be working with.

My attempt:

I firmly believe ReentrantLocks are designed to handle situations like this as they should stop any thread from entering a section of code if another thread hasn't left it. Both the placeOrder(Order order); and handleOrder(); function make use of this mechanic. I'm therefor assuming I didn't implement this correctly. Here is a version of the project which is compileable and executable from a single file called Test.java. Would anyone be able to take a look at that or the code explained above and tell me what I'm doing wrong?

EDIT

I noticed there was a way a buyer could place more than 10 orders so I changed the code to:

/*
 * The run method which is ran once the thread is started. 
 */
public void run() {
    //Run until the thread has bought 10 packages, this ensures the thread 
    //will eventually stop execution automatically.
    for(packsBought = 0; packsBought < 10; packsBought++)
    {
        try {
            //Sleep for a random amount of time between 1 and 50 
            //milliseconds.
            Thread.sleep(this.rand.nextInt(49) + 1);
            //Catch any interruptExceptions.
        } catch (InterruptedException ex) {
            //There is no problem if this exception is thrown, the thread 
            //will just make an order earlier than planned. that being said 
            //there should be no manner in which this exception is thrown.
        }
        
        //Create a new order.
        Order order = new Order(this.rand.nextInt(3)+ 1, 
                                this, 
                                this.isPrime);
        
        //Set the time at which the order was placed as now.
        order.setOrderTime(System.currentTimeMillis());
        
        //place the newly created order in the warehouse.
        this.warehouse.placeOrder(order);
    }
    
    //Notify the thread has finished execution. 
    System.out.println("Thread: " + super.getName() + " has finished.");
}

in the buyers run(); function yet I'm still getting some threads which place over 10 orders. I also removed the update of the amount of packs bought in the handleOrder(); function as that is now unnecessary. here is an updated version of Test.java (where all classes are together for easy execution) There seems to be a different problem here.

BRHSM
  • 854
  • 3
  • 13
  • 48
  • I just run your code from the last edit, and it generates less packets. With 5 threads I always get 49 lines, and with 100 I get about 998 – areus Jan 18 '21 at 08:03
  • @areus yes that's working for me too, I'm hard limiting each thread to 10 orders so that should be fixed, the problem now is that some threads seem to get more than 10 orders in the output and some don't get any, you can verify this by checking the average time which for some are 0MS (at least for me)... – BRHSM Jan 18 '21 at 08:28

3 Answers3

2

There are some concurrency issues with the code, but the main bug is not related to them: it's in the block starting in line 512 on placeOrder

            //Enumerate all non prime orders in the list. 
            for (int i = inPrime; i < sizeOrderList - 1; i++) {
                //Move the non prime order back 1 position in the list. 
                buffer[i + 1] = buffer[i]; 
            }

when there is only one normal order in the buffer, then inPrime value is 0, inNormal is 1, buffer[0] is the normal order and the rest of the buffer is null.

The code to move non primer orders, starts in index 0, and then does:

buffer[1] = buffer[0]  //normal order in 0 get copied to 1
buffer[2] = buffer[1]  //now its in 1, so it gets copied to 2
buffer[3] = buffer[2]  //now its in 2 too, so it gets copied to 3
....

so it moves the normal order to buffer[1] but then it copies the contents filling all the buffer with that order.

To solve it you should copy the array in reverse order:

            //Enumerate all non prime orders in the list.
            for (int i = (sizeOrderList-1); i > inPrime; i--) {
                //Move the non prime order back 1 position in the list.
                buffer[i] = buffer[i-1];
            }

As for the concurrency issues:

  • If you check a field on a thread, updated by another thread you should declare it as volatile. Thats the case of the run field in DispatcherWorker and ResourceSupplier. See: https://stackoverflow.com/a/8063587/11751648
  • You start interrupting the dispatcher threads (line 183) while they are still processing packages. So if they are stopped at 573, 574 or 579, they will throw an InterruptedException and not finish the processing (hence in the last code not always all packages are delivered). You could avoid this by checking that the buffer is empty before start interrupting dispatcher threads, calling warehouse.notFullBuffer.acquire(warehouse.sizeOrderList); on 175
  • When catching InterruptedException you should always call Thread.currentThread().interrupt(); the preserve the interrupted status of the Thread. See: https://stackoverflow.com/a/3976377/11751648
areus
  • 2,880
  • 2
  • 7
  • 17
  • 1
    that's it! thanks a lot. I'll implement the concurrency changes too but indeed the main problem with the output being weird is fixed now! – BRHSM Jan 18 '21 at 10:46
1

I believe you may be chasing ghosts. I'm not entirely sure why you're seeing more outputs than you're expecting, but the number of orders placed appears to be in order. Allow me to clarify:

I've added a Map<String,Integer> to the Warehouse class to map how many orders each thread places:

private Map<String,Integer> ordersPlaced = new TreeMap<>();


// Code omitted for brevity

public void placeOrder(Order order)
{
  try
  {
    //halt untill there are enough packs to handle an order.
    this.notFullBuffer.acquire();

    //Lock to signify the start of the critical section.
    this.mutexBuffer.lock();

    ordersPlaced.merge(Thread.currentThread().getName(), 1, Integer::sum);

    // Rest of method
}

I then added a for-loop to the main method to execute the code 100 times, and added the following code to the end of each iteration:

warehouse.ordersPlaced.forEach((thread, orders) -> System.out.printf("  %s - %d%n", thread, orders));

I placed a breakpoint inside the lambda expression, with condition orders != 10. This condition never triggered in the 100+ runs I executed. As far as I can tell, your code is working as intended. I've increased both nWorkers and nBuyers to 100 just to be sure.

I believe you're using ReentrantLock correctly, and I agree that it is probably the best choice for your use case.

Jeroen Steenbeeke
  • 3,884
  • 5
  • 17
  • 26
  • I've recreated the same test myself and I must say it's a very interesting way to verify this, thanks! That being said it makes me more confused. it would almost mean that the toHandle variable would be in race condition even though the lock should prevent this. How would the print statement and avarage calculation end up with these differences otherwise? this is something I can't understand but thanks for the help so far! – BRHSM Jan 14 '21 at 19:46
  • I don't see how `toHandle` can be in a race condition. The only two methods that access it only do so after acquiring the lock. It might be that your buffer array isn't being filled properly. I would suggest replacing that with either a List, or better yet, a Deque: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Deque.html (in your case probably LinkedBlockingDeque, since you want to limit the max amount) – Jeroen Steenbeeke Jan 15 '21 at 06:14
  • @JeroenSteenebeeke Would that work with the way I have to insert things: Prime orders before normal orders and both in FIFO? The reason I did it using the array is because that allows me to insert in any position and therefor also at the end of all prime orders or at the end of all (prime + normal) orders... – BRHSM Jan 15 '21 at 07:42
  • No, that would not be a suitable solution. You'd probably be better off using a PriorityQueue in that case, but the standard implementation is unbounded. – Jeroen Steenbeeke Jan 15 '21 at 07:54
  • @jeroenSteenebeeke we have to use the semaphores for the exercise so I can't use a premade queue... – BRHSM Jan 15 '21 at 09:36
0

referring at your code on pastebin THE GENERIC PROBLEM: In the function public void handleOrder() he sleep (line 582) Thread.sleep(5 * nPacks); is inside the lock(): unlock(): block. With this position of sleep, it has no sense to have many DispatchWorker because n-1 will wait at line 559 this.mutexBuffer.lock() while one is sleeping at line 582.

THE BUG: The bug is in line 173. You should remove it. In your main() you join all buyers and this is correct. Then you try to stop the workers. The workers at this time are already running to complete orders that will be completed seconds after. You should only set worker.runThread(false); and then join the thead (possibly in two separate loops). This solution really waits for workers to complete orders. Interrupting the thread that is sleeping at line 582 will raise an InterruptedException and the following lines are skipped, in particular line 596 or 600 that update inPrime and in Normal counters generating unpredictable behaviours.

moving line 582 after line 633 and removing line 173 will solve the problem

HOW TO TEST: My suggestion is to introduce a counter of all Packs boxes generated by supplier and a counter of all boxes ordered and finally check if generated boxes are equals at ordered plus that left in the whorehouse.

Andrea
  • 306
  • 2
  • 8
  • Unfortunately this did not fix the problem I've been having with the output not matching with the amount of orders each thread does (check EDIT + Jeroens answer!) Also I had problems before where if I didn't interrupt the threads they would never stop (that's why that's there). I think it's because they wait at the semaphore for orders while the queue is empty. this prevents them from stopping obviously. – BRHSM Jan 15 '21 at 17:16
  • @BRHSM ok. it could be there are some other fix to do but I think that the two change I suggested will improve the overall situation. If you agree, please post an updated version . There are many other improvement the can be done but I also think that your code is very interesting and can be fixed with few little changes. – Andrea Jan 15 '21 at 18:22
  • thanks for the help so far. I'll try to play with it but as I said I'm having trouble removing the interrupt from the threads. If you know a way to fix the output problem though that would be helpfull! – BRHSM Jan 17 '21 at 09:08
  • @BRHSM with the fix I suggested your code run fine and it stops with no problem. For me It is solved. Try yourself https://pastebin.com/hkZwr89C – Andrea Jan 17 '21 at 10:09