
I have a flow of units of work, let's call them "Work Items", that are processed sequentially (for now). I'd like to speed up processing by doing the work multithreaded.

Constraint: the work items come in a specific order. During processing the order is not relevant, but once processing is finished the original order must be restored.

Something like this:

   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|

I would like to solve this problem in Java, preferably without Executor Services, Futures, etc., but with basic concurrency methods like wait(), notify(), etc.

The reason: my Work Items are very small and fine-grained; each one finishes processing in about 0.2 milliseconds. So I fear that using classes from java.util.concurrent.* might introduce way too much overhead and slow my code down.

The examples I have found so far all preserve the order during processing (which is irrelevant in my case) and don't care about the order after processing (which is crucial in my case).

Frizz
  • How about: thread A takes items from incoming queue, creates a worker thread, starts it and pushes it into a queue. Thread B takes the threads from that queue one by one, waits for each to complete, and then pushes the result into the outgoing queue? Does that satisfy your constraints? – RealSkeptic Apr 05 '16 at 18:30
  • The best of the solutions here all use at least some element of `java.util.concurrent` and there's a reason why. You may want to consider using it, either yourself or via one of these solutions, and see if the performance is acceptable before you reject `java.util.concurrent` on the basis of overhead. Try to avoid premature optimization and see if the easy way works fast enough first. – sparc_spread Apr 08 '16 at 13:21
  • Echoing @sparc_spread, you may find the concurrent library isn't as bad as you think. The real overhead is thread creation; with a fixed thread pool, that overhead stays constant regardless of the number of work items (you pay the same thread-creation cost for 5 work items as for 5k). If you really, really want an answer without the concurrent library, comment on this and I'll write you one. – kag0 Apr 13 '16 at 17:23
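RealSkeptic's suggestion can be sketched with plain threads and no java.util.concurrent at all: thread A starts one worker thread per item and records it in arrival order, and thread B joins the workers in that same order, so a slow item holds back the ones behind it. This is a minimal sketch under stated assumptions: the names `JoinInOrder`, `Job`, `submit` and `nextStarted` are hypothetical, work items are plain ints, and `work` stands in for the real processing. Starting a thread per 0.2 ms item likely costs more than the work itself, so treat it as an illustration of the ordering idea only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical names throughout; "work" stands in for the real processing of one item.
class JoinInOrder {
    // One started worker thread plus the slot it writes its result into.
    static final class Job extends Thread {
        final int input;
        final IntUnaryOperator work;
        int result; // safely visible to whoever calls join() on this thread

        Job(int input, IntUnaryOperator work) { this.input = input; this.work = work; }

        @Override public void run() { result = work.applyAsInt(input); }
    }

    private final ArrayDeque<Job> started = new ArrayDeque<>(); // guarded by "this"
    private boolean closed = false;                             // guarded by "this"

    // Thread A: start a worker for the item and remember it in arrival order.
    synchronized void submit(Job job) {
        job.start();
        started.addLast(job);
        notifyAll();
    }

    synchronized void close() { closed = true; notifyAll(); }

    // Thread B: next worker in arrival order, or null once closed and drained.
    synchronized Job nextStarted() throws InterruptedException {
        while (started.isEmpty() && !closed) wait();
        return started.pollFirst();
    }

    static List<Integer> run(List<Integer> inputs, IntUnaryOperator work) throws InterruptedException {
        JoinInOrder pipeline = new JoinInOrder();
        List<Integer> out = new ArrayList<>();
        Thread collector = new Thread(() -> {
            try {
                Job job;
                while ((job = pipeline.nextStarted()) != null) {
                    job.join();          // waits for this item even if later ones finished first
                    out.add(job.result); // so results are appended in arrival order
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        collector.start();
        for (int x : inputs) pipeline.submit(new Job(x, work));
        pipeline.close();
        collector.join(); // makes "out" safely visible to the calling thread
        return out;
    }
}
```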

9 Answers


This is how I solved your problem in a previous project (but with java.util.concurrent):

(1) WorkItem class does the actual work/processing:

public class WorkItem implements Callable<WorkItem> {
    Object content;
    public WorkItem(Object content) {
        super();
        this.content = content;
    }

    public WorkItem call() throws Exception {
        // getContent() + do your processing
        return this;
    }
}

(2) This class puts Work Items in a queue and initiates processing:

public class Producer {
    ...
    public Producer() {
        super();
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start();
    }

    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem);
        workerQueue.put(future);
    }
}

(3) Once processing is finished the Work Items are dequeued here:

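import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Worker implements Runnable {
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        super();
        this.workerQueue = workerQueue;
    }

    public void run() {
        try {
            while (true) {
                Future<WorkItem> fwi = workerQueue.take(); // dequeue in submission order
                WorkItem done = fwi.get(); // wait until this item has finished processing
                // "done" is now in its original position: pass it to the outgoing queue here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted: stop consuming
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause()); // processing of a work item failed
        }
    }
}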

(4) This is how you would use the stuff in your code and submit new work:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
            p.send(i);
    }
}
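The same idea can be collapsed into one self-contained, runnable sketch: a producer submits each item to a fixed pool and puts the returned Future into a bounded queue, and the consumer takes the Futures in submission order and calls get() on each. The class name `FutureQueueDemo`, the int items and the `work` function are assumptions for illustration, not part of the answer above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntUnaryOperator;

// Hypothetical class: a queue of Futures kept in submission order restores the output order.
class FutureQueueDemo {
    static List<Integer> run(List<Integer> inputs, int threads, IntUnaryOperator work)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Bounded, like the ArrayBlockingQueue in Producer: put() blocks when the producer runs ahead.
        BlockingQueue<Future<Integer>> inFlight = new ArrayBlockingQueue<>(threads);
        Thread producer = new Thread(() -> {
            try {
                for (int x : inputs) {
                    inFlight.put(pool.submit(() -> work.applyAsInt(x))); // may run on any pool thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> out = new ArrayList<>();
        try {
            for (int i = 0; i < inputs.size(); i++) {
                // take() yields Futures in submission order; get() waits for that particular item.
                out.add(inFlight.take().get());
            }
        } finally {
            producer.interrupt(); // unblocks the producer if we bailed out early
            producer.join();
            pool.shutdown();
        }
        return out;
    }
}
```

Because the queue holds at most `threads` Futures, the producer can never get far ahead of the consumer, which keeps memory use bounded for an endless stream of items.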
Rollergirl

If you allow BlockingQueue, why would you ignore the rest of the concurrency utilities in Java? You could use, e.g., a parallel Stream (if you have Java 8) for the above:

List<Type> data = ...;
List<Other> out = data.parallelStream()
    .map(t -> doSomeWork(t))
    .collect(Collectors.toList());

Because you started from an ordered Collection (List), and collect also to a List, you will have results in the same order as the input.
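A minimal runnable check of that claim, assuming a hypothetical `ParallelOrderDemo` class with `x * 2` standing in for `doSomeWork`: the mapping may run on several threads in any order, but `Collectors.toList()` on an ordered source keeps the encounter order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical class; "x * 2" stands in for doSomeWork.
class ParallelOrderDemo {
    static List<Integer> doubledInParallel(List<Integer> data) {
        return data.parallelStream()           // elements may be processed on several threads
                .map(x -> x * 2)               // in any order...
                .collect(Collectors.toList()); // ...but the collected list keeps encounter order
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        System.out.println(doubledInParallel(data)); // prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    }
}
```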

Krzysztof Krasoń
  • This wouldn't solve my problem, because I don't have a fixed list of input data - I have a stream (queue). I also don't like using Streams here, because it doesn't give me control (e.g. about the number of threads that are created). – Frizz Apr 06 '16 at 16:47
  • You have control over the number of threads - you just have to use ForkJoinPool for that: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html – Petro Semeniuk Apr 08 '16 at 11:24
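A sketch of the ForkJoinPool approach, assuming a hypothetical `BoundedParallelStream` helper: the parallel stream is started from a task submitted to a custom pool with a chosen parallelism, which in practice makes the stream's work run in that pool instead of the common pool. Note that this relies on how the current ForkJoin implementation behaves rather than on a documented guarantee of the Stream API.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

// Hypothetical helper; "x * x" stands in for the real work.
class BoundedParallelStream {
    static List<Integer> squares(List<Integer> data, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism); // caps the number of worker threads
        try {
            // A parallel stream started from inside a task of this pool generally uses this pool.
            return pool.submit(() -> data.parallelStream()
                    .map(x -> x * x)
                    .collect(Collectors.toList())) // encounter order is still preserved
                    .get();
        } finally {
            pool.shutdown();
        }
    }
}
```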

Just give each object an ID before processing, and create a proxy that accepts finished work and releases it only in sequential ID order. Sample code is below. Note how simple it is: it uses a non-thread-safe, auto-sorting collection (a PriorityQueue guarded by the proxy's synchronized methods) and an API of just 2 simple methods.

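import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class SequentialPushingProxy {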

    static class OrderedJob implements Comparable<OrderedJob>{
        static AtomicInteger idSource = new AtomicInteger();
        int id;

        public OrderedJob() {
            id = idSource.incrementAndGet();
        }

        public int getId() {
            return id;
        }

        @Override
        public int compareTo(OrderedJob o) {
            return Integer.compare(id, o.getId());
        }
    }

    int lastId = OrderedJob.idSource.get();

    private final Queue<OrderedJob> queue; // only accessed inside synchronized methods

    public SequentialPushingProxy() {
        queue = new PriorityQueue<OrderedJob>();
    }

    public synchronized void pushResult(OrderedJob job) {
        queue.add(job);
    }

    List<OrderedJob> jobsToReturn = new ArrayList<OrderedJob>();
    public synchronized List<OrderedJob> getFinishedJobs() {
        while (queue.peek() != null) {
            // only one consumer at a time, will be safe
            if (queue.peek().getId() == lastId+1) {
                jobsToReturn.add(queue.poll());
                lastId++;
            } else {
                break;
            }
        }
        if (jobsToReturn.size() != 0) {
            List<OrderedJob> toRet = jobsToReturn;
            jobsToReturn = new ArrayList<OrderedJob>();
            return toRet;
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        final SequentialPushingProxy proxy = new SequentialPushingProxy();

        int numProducerThreads = 5;

        for (int i=0; i<numProducerThreads; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while(true) {
                        proxy.pushResult(new OrderedJob());
                    }
                }
            }).start();
        }


        int numConsumerThreads = 1;

        for (int i=0; i<numConsumerThreads; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while(true) {
                        List<OrderedJob> ret = proxy.getFinishedJobs();
                        System.out.println("got "+ret.size()+" finished jobs");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }


        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.exit(0);
    }

}

This code could easily be improved to:

  • allow pushing more than one job result at once, to reduce the synchronization costs
  • introduce a limit to returned collection to get done jobs in smaller chunks
  • extract an interface for those 2 public methods and switch implementations to perform tests
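The first two improvements could look roughly like this. This is a sketch, not Dariusz's code: the names `BatchingReorderBuffer`, `Done`, `pushResults` and `getFinished` are hypothetical, and results are assumed to carry consecutive ids starting at 1 rather than using the static `OrderedJob` id source.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical reorder buffer; ids are assumed to be consecutive and to start at 1.
class BatchingReorderBuffer<T> {
    static final class Done<V> {
        final long id;
        final V value;

        Done(long id, V value) { this.id = id; this.value = value; }
    }

    private final PriorityQueue<Done<T>> pending =
            new PriorityQueue<>((a, b) -> Long.compare(a.id, b.id)); // smallest id first
    private long lastId = 0; // id of the last result handed out

    // Improvement 1: one lock acquisition for a whole batch of finished results.
    synchronized void pushResults(Collection<Done<T>> batch) {
        pending.addAll(batch);
    }

    // Improvement 2: hand out at most maxItems results, and only in unbroken id order.
    synchronized List<T> getFinished(int maxItems) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxItems && !pending.isEmpty() && pending.peek().id == lastId + 1) {
            out.add(pending.poll().value);
            lastId++;
        }
        return out;
    }
}
```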
Dariusz
  • There is only one consumer on the priority Queue, but multiple threads pushing values into it. From the docs: "Multiple threads should not access a PriorityQueue instance concurrently if any of the threads modifies the queue. Instead, use the thread-safe PriorityBlockingQueue class." – Bampfer Apr 08 '16 at 14:04
  • Note the `synchronized` on proxy's methods. It's safe. – Dariusz Apr 08 '16 at 14:23
  • Good point. Then you don't need the comment "only one consumer, should be safe". :-) – Bampfer Apr 08 '16 at 22:21

Pump all your Futures through a BlockingQueue. Here's all the code you need:

public class SequentialProcessor implements Consumer<Task> {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final BlockingDeque<Future<Result>> queue = new LinkedBlockingDeque<>();

    public SequentialProcessor(Consumer<Result> listener) {
        new Thread(() -> {
            while (true) {
                try {
                    listener.accept(queue.take().get());
                } catch (InterruptedException | ExecutionException e) {
                    // handle the exception however you want, perhaps just logging it
                }
            }
        }).start();
    }

    public void accept(Task task) {
        queue.add(executor.submit(callableFromTask(task)));
    }

    private Callable<Result> callableFromTask(Task task) {
        return <how to create a Result from a Task>; // implement this however
    }
}

Then to use, create a SequentialProcessor (once):

SequentialProcessor processor = new SequentialProcessor(whatToDoWithResults);

and pump tasks to it:

Stream<Task> tasks; // given this

tasks.forEach(processor); // simply this

I created the callableFromTask() method for illustration, but if getting a Result from a Task is simple, you can dispense with it by using a lambda or a method reference instead.

For example, if Task had a getResult() method, do this:

queue.add(executor.submit(task::getResult));

or if you need an expression (lambda):

queue.add(executor.submit(() -> task.getValue() + "foo")); // or whatever
Bohemian
  • This wouldn't solve my problem, because I don't have a fixed list of input data - I have a stream of Work Items. – Frizz Apr 08 '16 at 12:56
  • @Frizz I have changed my answer to handle a continuous stream of tasks. You haven't specified types for your situation, so I used `Task` and `Result` – Bohemian Apr 08 '16 at 19:47

You could have 3 input and 3 output queues: one of each for every worker thread.

When you want to insert an item, you put it into only one of the 3 input queues, cycling through the input queues in round-robin fashion. The same applies to the output: to take a result, you start with the first output queue, and once you get your element you switch to the next queue. Since each worker processes its own queue in FIFO order, reading the output queues in the same round-robin sequence restores the original order.

All the queues need to be blocking.
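A sketch of this round-robin scheme with one input and one output `BlockingQueue` per worker. The names `RoundRobinPipeline` and `run`, the queue capacity of 16 and the int items are assumptions for illustration. Item i always goes to worker i % workers, and the collector reads result i from output queue i % workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntUnaryOperator;

// Hypothetical class; "work" stands in for the real processing of one item.
class RoundRobinPipeline {
    static List<Integer> run(List<Integer> inputs, int workers, IntUnaryOperator work)
            throws InterruptedException {
        List<BlockingQueue<Integer>> ins = new ArrayList<>();
        List<BlockingQueue<Integer>> outs = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            BlockingQueue<Integer> in = new ArrayBlockingQueue<>(16);
            BlockingQueue<Integer> out = new ArrayBlockingQueue<>(16);
            ins.add(in);
            outs.add(out);
            Thread t = new Thread(() -> {
                try {
                    // Each worker handles its own input queue strictly in FIFO order.
                    while (true) out.put(work.applyAsInt(in.take()));
                } catch (InterruptedException e) {
                    // interrupted by run() once all results are collected: exit
                }
            });
            threads.add(t);
            t.start();
        }
        // Dispatcher: item i goes to input queue i % workers.
        Thread dispatcher = new Thread(() -> {
            try {
                for (int i = 0; i < inputs.size(); i++) ins.get(i % workers).put(inputs.get(i));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();
        // Collector: result i comes from output queue i % workers, which restores the order.
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) results.add(outs.get(i % workers).take());
        dispatcher.join();
        for (Thread t : threads) t.interrupt(); // workers are idle in take() by now
        for (Thread t : threads) t.join();
        return results;
    }
}
```

The dispatcher runs on its own thread so that a full input queue cannot stall the collector; with that split, the bounded queues add backpressure without deadlocking.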

ciamej

Reactive programming could help. During my brief experience with RxJava I found it more intuitive and easier to work with than core language features like Future. Your mileage may vary. Here is a helpful starting point: https://www.youtube.com/watch?v=_t06LRX0DV0

The example below also shows how this could be done. It processes Packets, which go through a simple transformation and are finally merged into one list. The output shown after the code demonstrates that the Packets are received and transformed at different points in time, but in the end they are output in the order in which they were received.

import static java.time.Instant.now;
import static rx.schedulers.Schedulers.io;

import java.time.Instant;
import java.util.List;
import java.util.Random;

import rx.Observable;
import rx.Subscriber;

public class RxApp {

  public static void main(String... args) throws InterruptedException {

    List<ProcessedPacket> processedPackets = Observable.range(0, 10) //
        .flatMap(i -> {
          return getPacket(i).subscribeOn(io());
        }) //
        .map(Packet::transform) //
        .toSortedList() //
        .toBlocking() //
        .single();

    System.out.println("===== RESULTS =====");
    processedPackets.stream().forEach(System.out::println);
  }

  static Observable<Packet> getPacket(Integer i) {
    return Observable.create((Subscriber<? super Packet> s) -> {
      // simulate latency
      try {
        Thread.sleep(new Random().nextInt(5000));
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println("packet requested for " + i);
      s.onNext(new Packet(i.toString(), now()));
      s.onCompleted();
    });
  }

}


class Packet {
  String aString;
  Instant createdOn;

  public Packet(String aString, Instant time) {
    this.aString = aString;
    this.createdOn = time;
  }

  public ProcessedPacket transform() {
    System.out.println("                          Packet being transformed " + aString);
    try {
      Thread.sleep(new Random().nextInt(5000));
    } catch (Exception e) {
      e.printStackTrace();
    }
    ProcessedPacket newPacket = new ProcessedPacket(this, now());
    return newPacket;
  }

  @Override
  public String toString() {
    return "Packet [aString=" + aString + ", createdOn=" + createdOn + "]";
  }
}


class ProcessedPacket implements Comparable<ProcessedPacket> {
  Packet p;
  Instant processedOn;

  public ProcessedPacket(Packet p, Instant now) {
    this.p = p;
    this.processedOn = now;
  }

  @Override
  public int compareTo(ProcessedPacket o) {
    return p.createdOn.compareTo(o.p.createdOn);
  }

  @Override
  public String toString() {
    return "ProcessedPacket [p=" + p + ", processedOn=" + processedOn + "]";
  }

}

Deconstruction

Observable.range(0, 10)
    .flatMap(i -> {
      return getPacket(i).subscribeOn(io());
    }) // source the input as observables on multiple threads
    .map(Packet::transform) // processing the input data
    .toSortedList() // sorting to sequence the processed inputs
    .toBlocking()
    .single();

On one particular run, Packets were received in the order 2,6,0,1,8,7,5,9,4,3 and processed in the order 2,6,0,1,3,4,5,7,8,9, on different threads:

packet requested for 2
                          Packet being transformed 2
packet requested for 6
                          Packet being transformed 6
packet requested for 0
packet requested for 1
                          Packet being transformed 0
packet requested for 8
packet requested for 7
packet requested for 5
packet requested for 9
                          Packet being transformed 1
packet requested for 4
packet requested for 3
                          Packet being transformed 3
                          Packet being transformed 4
                          Packet being transformed 5
                          Packet being transformed 7
                          Packet being transformed 8
                          Packet being transformed 9
===== RESULTS =====
ProcessedPacket [p=Packet [aString=2, createdOn=2016-04-14T13:48:52.060Z], processedOn=2016-04-14T13:48:53.247Z]
ProcessedPacket [p=Packet [aString=6, createdOn=2016-04-14T13:48:52.130Z], processedOn=2016-04-14T13:48:54.208Z]
ProcessedPacket [p=Packet [aString=0, createdOn=2016-04-14T13:48:53.989Z], processedOn=2016-04-14T13:48:55.786Z]
ProcessedPacket [p=Packet [aString=1, createdOn=2016-04-14T13:48:54.109Z], processedOn=2016-04-14T13:48:57.877Z]
ProcessedPacket [p=Packet [aString=8, createdOn=2016-04-14T13:48:54.418Z], processedOn=2016-04-14T13:49:14.108Z]
ProcessedPacket [p=Packet [aString=7, createdOn=2016-04-14T13:48:54.600Z], processedOn=2016-04-14T13:49:11.338Z]
ProcessedPacket [p=Packet [aString=5, createdOn=2016-04-14T13:48:54.705Z], processedOn=2016-04-14T13:49:06.711Z]
ProcessedPacket [p=Packet [aString=9, createdOn=2016-04-14T13:48:55.227Z], processedOn=2016-04-14T13:49:16.927Z]
ProcessedPacket [p=Packet [aString=4, createdOn=2016-04-14T13:48:56.381Z], processedOn=2016-04-14T13:49:02.161Z]
ProcessedPacket [p=Packet [aString=3, createdOn=2016-04-14T13:48:56.566Z], processedOn=2016-04-14T13:49:00.557Z]
diduknow

You could launch a DoTask thread for every WorkItem. This thread processes the work. When the work is done, the thread tries to post the item while synchronized on a controlling object: it checks whether the item has the next expected ID and waits if not.

The post implementation can be something like:

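synchronized (controllingObject) {
    try {
        // wait until every WorkItem with a lower id has been posted
        while (workItem.id != nextId) controllingObject.wait();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // do not post out of order if interrupted
    }
    // Post the workItem
    nextId++;
    controllingObject.notifyAll(); // must notify on the monitor we hold
}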
  • I would like to avoid repeated thread creation as much as possible and use a thread pool instead. – Frizz Apr 06 '16 at 16:49
  • If you're going to use a thread pool, you're already dipping into `java.util.concurrent`, unless you write your own or use some other 3rd party one (not sure there even are any maintained since Java 5). – sparc_spread Apr 08 '16 at 13:23
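The post() idea above can also be combined with a fixed set of threads instead of a thread per WorkItem, which avoids repeated thread creation while still using nothing beyond synchronized, wait() and notifyAll(). In this sketch (hypothetical names `OrderedPoster` and `run`, int items, and a `work` function standing in for the processing) each worker claims the next index, processes it outside the lock, and then waits until its index equals `nextId` before posting. Because items are claimed in order, the thread holding the lowest unposted index never has to wait, so it should not deadlock; the trade-off is that a worker finishing ahead of its turn sits idle until the earlier items are posted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical class; only synchronized/wait/notifyAll are used for coordination.
class OrderedPoster {
    private final Object controllingObject = new Object();
    private int nextToClaim = 0;                             // guarded by controllingObject
    private int nextId = 0;                                  // guarded: index of the next result to post
    private final List<Integer> posted = new ArrayList<>(); // guarded by controllingObject

    static List<Integer> run(List<Integer> inputs, int workers, IntUnaryOperator work)
            throws InterruptedException {
        OrderedPoster p = new OrderedPoster();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int id;
                        synchronized (p.controllingObject) {
                            if (p.nextToClaim == inputs.size()) return; // nothing left to claim
                            id = p.nextToClaim++;
                        }
                        int result = work.applyAsInt(inputs.get(id)); // processed outside the lock
                        synchronized (p.controllingObject) {
                            while (id != p.nextId) p.controllingObject.wait(); // wait for our turn
                            p.posted.add(result);
                            p.nextId++;
                            p.controllingObject.notifyAll(); // wake the thread holding the new nextId
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) t.join();
        synchronized (p.controllingObject) {
            return new ArrayList<>(p.posted);
        }
    }
}
```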

I think you need an extra queue to hold the incoming order (an IncomingOrderQueue).

When you process the objects, you put them into some storage, for example a HashMap keyed by ID. Then another thread consumes the IncomingOrderQueue: it takes the IDs (hashes) of the objects in order and collects the corresponding objects from this HashMap.

This solution can easily be implemented without execution service.
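A sketch of this two-structure design using only synchronized, wait() and notifyAll(). The names `OrderQueueCollector`, `arrived`, `completed` and `takeNext`, and the use of long ids, are assumptions for illustration: the producer calls arrived(id) in incoming order, workers call completed(id, result) in any order, and the collecting thread calls takeNext() to get the results back in the original order.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical class: an id queue records incoming order, a map holds finished results.
class OrderQueueCollector<R> {
    private final Queue<Long> incomingOrder = new ArrayDeque<>(); // guarded by "this"
    private final Map<Long, R> results = new HashMap<>();          // guarded by "this"

    // Producer: record the id in arrival order before the item is handed to a worker.
    synchronized void arrived(long id) {
        incomingOrder.add(id);
    }

    // Worker: store a finished result, in any order.
    synchronized void completed(long id, R result) {
        results.put(id, result);
        notifyAll(); // the collector may be waiting for exactly this id
    }

    // Collector: blocks until the oldest outstanding item is done; null if nothing is outstanding.
    synchronized R takeNext() throws InterruptedException {
        Long id = incomingOrder.peek();
        if (id == null) return null;
        while (!results.containsKey(id)) wait();
        incomingOrder.remove();
        return results.remove(id);
    }
}
```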

Alexander Petrov

Preprocess: add an order value to each item, and allocate the output array if it is not already allocated.

Input: a queue (threads take items concurrently; the items carry order values 1, 2, 3, 4, ..., but it doesn't matter which thread gets which item).

Output: an array (each thread writes to the element indexed by the item's order value; a synchronization point at the end waits for all threads; no collision checks are needed because every item writes a different position).

Postprocess: convert the array to a queue.

This needs an n-element array for n threads, or an array of some multiple of n elements to do the postprocessing only once.
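A minimal sketch of one batch of this scheme, assuming a hypothetical `ArrayBatchReorder` class, int items, and a `work` function for the processing. The item's position in the batch serves as its order value, threads claim positions through a shared counter so it doesn't matter which thread gets which item, each result goes into its own slot, and `Thread.join()` is the synchronization point before the array is turned into a queue. The counter here is an AtomicInteger; a synchronized counter would do the same job if java.util.concurrent is to be avoided entirely.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

// Hypothetical class; processes one batch and returns the results in their original order.
class ArrayBatchReorder {
    static Queue<Integer> processBatch(List<Integer> batch, int threads, IntUnaryOperator work)
            throws InterruptedException {
        int[] results = new int[batch.size()];  // preprocess: one slot per order value
        AtomicInteger next = new AtomicInteger(); // shared "input queue" position
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                int i;
                // Any thread may claim any item; each index is claimed exactly once.
                while ((i = next.getAndIncrement()) < results.length) {
                    results[i] = work.applyAsInt(batch.get(i)); // no collisions: distinct slots
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join(); // sync point: all slot writes are visible after join()
        Queue<Integer> out = new ArrayDeque<>(results.length); // postprocess: array to queue
        for (int r : results) out.add(r);
        return out;
    }
}
```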

huseyin tugrul buyukisik