6

I have a queue of tasks that operate on a collection of objects (let's say the objects are entries in an address book, for the sake of example).

An example task might be "Update Joe's phone number to 888-555-1212".

It's possible to have multiple "Update Joe's phone number..." tasks in the queue simultaneously, but with different phone numbers. In this case, the updates must be applied in order to ensure the state is correct at the end (and no, for the sake of argument, it is not possible to put timestamps on the tasks and timestamps on the address book entries and throw away stale tasks).

It is safe to apply an update for Jane out-of-order with an update for Joe.

I would like to multithread processing of the queue, but I need to synchronize access by person.

Is there a handy library for this kind of thing? Or am I relegated to using an Executor and doing my own synchronization on "name" in the Runnable's run() method?

Jared
  • I think some pseudocode would make this easier to understand. But would it be possible to use multiple executors, such as one executor per person? I suggest that based on `It is safe to apply an update for Jane out-of-order with an update for Joe` – Jose Renato Aug 20 '13 at 19:10
  • Note that synchronizing on name inside the runnable will not guarantee sequential execution. An executor does not promise that tasks will be executed in the order of submission (unless the executor is single threaded). – Aurand Aug 20 '13 at 19:25
  • @Jose Renato: I would use one Thread per name, see my answer below. Each thread ensures order consistency. – Christian Fries Aug 20 '13 at 20:01
  • @Aurand: A PriorityBlockingQueue can be used to ensure this. – Christian Fries Aug 20 '13 at 20:08
  • One might also de-duplicate updates during insertion, if the time needed to search the queue is significantly less than the cost of the transaction. On insert, if another task for `update`/`Joe`/`phone-number` exists in the queue, destroy the earlier task. – BRPocock Aug 21 '13 at 17:33
  • Combine all such tasks into one. – S.D. Aug 24 '13 at 16:15

3 Answers

3

A straightforward, but not quite perfect, solution to this problem is to maintain an array of sub-queues, one per processing thread. A single master thread pulls items off your single main queue and adds each one to the sub-queue whose index is the object key's hashCode modulo the number of sub-queues (the key being whatever identifies and relates your tasks).

E.g.

int queueIndex = Math.floorMod(myEntity.getKey().hashCode(), queues.length); // floorMod keeps the index non-negative even when hashCode() is negative

Only one thread processes that queue, and all tasks for the same entity will be submitted to that queue, so there will be no race conditions.

This solution is imperfect since some threads may end up with larger queues than others. Practically, this is unlikely to matter but it is something to consider.
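As a minimal sketch of the sub-queue idea, assuming each sub-queue is backed by a single-threaded executor: the class name `KeyedExecutor` and its methods are hypothetical and not from any library. Per-key ordering holds only if tasks for the same key are submitted in order, for example from a single master thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: routes all tasks with equal keys to the same single-threaded lane.
final class KeyedExecutor {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedExecutor(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            // Each lane has exactly one worker thread and its own FIFO queue.
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // Call from a single submitting thread so that same-key tasks are enqueued in order.
    void submit(Object key, Runnable task) {
        // floorMod avoids a negative index when hashCode() is negative.
        int index = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(index).execute(task);
    }

    // Stops accepting tasks and waits for the already submitted ones to finish.
    void shutdownAndWait() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("lane did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With this sketch, `submit("Joe", ...)` always lands on the same lane, so Joe's updates run in submission order, while Jane's updates may run in parallel on another lane (or behind Joe's, if both names hash to the same lane).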

Issues with simple solution:

The simpler solution of pulling items off of a single queue and then locking on something distinct for the affected entity has a race condition (as Aurand pointed out). Given:

Master Queue [ Task1(entity1), Task2(entity1), ... ]

Where task1 and task2 both edit the same entity entity1, and there is thread1 and thread2 operating on the queue, then the expected / desired sequence of events is:

  • Thread1 takes task1
  • Thread1 locks on entity1
  • Thread1 edits entity1
  • Thread1 unlocks entity1
  • Thread2 takes task2
  • Thread2 locks entity1
  • Thread2 edits entity1
  • Thread2 unlocks entity1

Unfortunately, even if the lock is the first statement of the thread's run method, it is possible for the following sequence to occur:

  • Thread1 takes task1
  • Thread2 takes task2
  • Thread2 locks entity1
  • Thread2 edits entity1
  • Thread2 unlocks entity1
  • Thread1 locks entity1
  • Thread1 edits entity1
  • Thread1 unlocks entity1

To avoid this, each thread would have to lock on something (say, the queue) before taking a task from the queue, and then acquire a lock on the entity while still holding the parent lock. However, you do not want to block everything while holding the parent lock and waiting for the entity lock, so you can only try for the entity lock and must handle the case where the attempt fails (perhaps by putting the task into another queue). Overall, the situation becomes non-trivial.

Trevor Freeman
  • Your answer appears to be similar to mine. Your problem with the "taking of task and locking" can be improved by having the main thread (a single thread) distributing the task to the worker threads (editing entities) instead of worker threads pulling tasks from the queue concurrently. - This is what I meant in my answer, don't know why that triggered down votes. Sorry if it was a stupid idea. – Christian Fries Aug 23 '13 at 20:13
  • @ChristianFries I did not down vote your answer, so I cannot comment on what triggered any down votes. My comment about "taking the task and then locking" is in regards to the original poster's (question asker's) comment about locking on `name` after taking the object from the queue. Doing that has the subtle race condition pointed out by Aurand and elaborated on in my answer. – Trevor Freeman Aug 23 '13 at 20:58
  • Sorry, I did not realize that the original question explicitly brought up the idea of synchronizing on the entity within the runnable. Now I see why you brought that (unnecessarily faulty) solution up. (Unfortunately I cannot revert my vote unless your post is edited, but I will do so even if you change just a punctuation mark.) With respect to that synchronization, I avoid it in my solution by distributing the tasks in the main thread (synchronizing on the respective queues). Thus each worker thread sees only the tasks it has to process, and the main thread ensures order consistency. – Christian Fries Aug 24 '13 at 15:25
0

Such conflicts are commonly resolved by assigning a version to each object and incrementing it on every update. An update that arrives at the wrong time (carrying a version other than the one expected) can then be dismissed or delayed. Either way, you need some way to decide which update comes first and which comes second. This method is called optimistic locking.
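A minimal sketch of such a version check, assuming each update carries the version it expects to find: `VersionedEntry` and its methods are hypothetical names for illustration. What to do with a rejected update (dismiss it or retry later) is left to the caller.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical address-book entry guarded by a version number.
final class VersionedEntry {
    // Immutable snapshot of the entry: version and phone number change together.
    private static final class State {
        final long version;
        final String phone;

        State(long version, String phone) {
            this.version = version;
            this.phone = phone;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(new State(0, ""));

    // Applies the update only if the entry is still at expectedVersion.
    // Returns false for a stale or premature update, which the caller may dismiss or delay.
    boolean tryUpdate(long expectedVersion, String newPhone) {
        State current = state.get();
        if (current.version != expectedVersion) {
            return false;
        }
        // compareAndSet fails if another thread replaced the state in the meantime.
        return state.compareAndSet(current, new State(expectedVersion + 1, newPhone));
    }

    long version() {
        return state.get().version;
    }

    String phone() {
        return state.get().phone;
    }
}
```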

Mikhail
-1

One Possible Solution

Assume a task is described by some class

class Task {
  Integer taskGroup;
  // other
}

where taskGroup is an ID identifying tasks which have to be processed in the order of arrival (in your example, each "Name" could define its own taskGroup or, more generally, tasks for the same name belong to the same taskGroup).

Let mainTaskQueue denote a List of Task objects. Then

  • Create a Map<Integer,List<Task>>, say taskGroupsQueues
  • For each taskGroup create a thread which operates on taskGroupsQueues.get(taskGroup) sequentially.
  • A main thread removes a task from your main task list mainTaskQueue and appends it to taskGroupsQueues.get(task.taskGroup)
  • Moving tasks from the main queue to the single queues and fetching from the single queues has to be synchronized.

In other words: tasks which belong to the same name are executed on the same thread.
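The steps above can be sketched roughly as follows, assuming a single-threaded executor stands in for each per-group thread and its queue: `TaskGroupDispatcher` and its methods are hypothetical names. A `ConcurrentHashMap` replaces the plain `Map<Integer,List<Task>>`, so the hand-off between the main thread and the workers needs no manual synchronization.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher: one single-threaded executor (thread plus FIFO queue) per taskGroup.
final class TaskGroupDispatcher {
    private final Map<Integer, ExecutorService> taskGroupsQueues = new ConcurrentHashMap<>();

    // Called by the main thread for each task taken from the main queue.
    void dispatch(int taskGroup, Runnable work) {
        taskGroupsQueues
                .computeIfAbsent(taskGroup, g -> Executors.newSingleThreadExecutor())
                .execute(work);
    }

    // Stops accepting tasks and waits for every group to drain its queue.
    void shutdownAndWait() {
        for (ExecutorService worker : taskGroupsQueues.values()) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : taskGroupsQueues.values()) {
                if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("worker did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Note that this creates one thread per taskGroup, so it only suits a small number of groups.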

Note that if the main thread distributes the tasks, it can also perform some load balancing: if a task is not forced onto a specific queue for order consistency, it should go to the shortest queue. However, it is inherent in your problem that processing may become single-threaded, namely when all tasks belong to the same taskGroup (in your case, the same name).

Another Possible Solution (not tested, just a suggestion)

As pointed out in increment1's post and Aurand's comment, synchronizing on the taskGroup (name) within the thread has some problems. Basically, it is too late, because the executor may already have started two threads that try to sync on the same name. However, you may try to ensure order of execution at the executor level. See for example this post: Java Executors: how can I set task priority? (which references a PriorityBlockingQueue passed to the executor).

Christian Fries
  • If I understand your answer correctly, then it implies that for 1000 entries in an address book, you would have 1000 threads always running, and also 1000 queues (one for each thread). This may work for the specific question if it is limited to a small number of users, but otherwise it seems like it has scalability concerns. Furthermore, your map of "queues" should probably be a concurrent map of actual blocking queues and not a map of lists. – Trevor Freeman Aug 20 '13 at 20:10
  • This is a good point. A simple modification is to use a fixed number of threads and maintain a mapping from taskGroup to threadID. However, a better and even simpler solution would be to use a PriorityBlockingQueue. – Christian Fries Aug 20 '13 at 20:39
  • You are making the assumption that the order in which elements are retrieved from the queue is the order in which they are executed. That is invalid. – Aurand Aug 20 '13 at 21:34
  • I don't get the point. If it was about the mention of a PriorityBlockingQueue, I removed that. In the above, each thread works on its own FIFO job queue. You only need to ensure that "same names get on the same queue", which is what he would like to have. – Christian Fries Aug 21 '13 at 21:24
  • @Aurand: I am not making that assumption. For the worker threads (which may be submitted to an executor), the order of execution between threads does not matter. Each thread works through its queue in the order of its elements. – Christian Fries Aug 25 '13 at 07:08