I am setting up a simulator (for testing) of a RADIUS server that sends queries to another server (LDAP) using threads. The queries need to be executed at a rate of x per second. I am using a ScheduledThreadPoolExecutor with Callables for this, so that I can create Callables and submit them to the thread pool for execution. Each thread should open its own connection and use it to query. The catch is that I want each thread to reuse the same connection every time it runs a query.

To clarify:

If I have, let's say, a thread pool of 20 threads, I want exactly 20 connections to be created and used (so I can send, say, 10,000 queries that are processed in turn by the 20 threads/connections).

Currently the (LDAP) server information is passed to the constructor of the Callable, and the Callable sets up the connection for execution. Afterwards I retrieve the result through the Future returned for the Callable. The problem is that every time I create a Callable, a new connection is opened (and later closed, of course).

I am looking for the best practice to keep the connections alive and have each thread reuse its own connection.

I have thought of some ways to implement this, but they don't seem very efficient:

  • Use a connection pool inside my thread pool and retrieve a free connection when needed (creates deadlock and other thread-safety issues)
  • Use a static (or similar) array of connections and use the thread number to retrieve the thread's connection (not foolproof either, see link)

What is the most efficient way of implementing this? <- old question, see edit for new question.

EDIT: I was thinking that, since I cannot safely get a thread number but the thread ID is always unique, I could just use a

map<String/threadId, connection>

and pass a reference to the whole map to the Callable. That way I can use something like this (pseudocode):

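// map must be thread-safe (e.g. a ConcurrentHashMap), since all pool threads share it
long threadId = Thread.currentThread().getId();
Connection con = map.get(threadId);
if (con == null) {
    con = new Connection(...);
    map.put(threadId, con);
}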

It would also be possible to make the map static and just access it statically. That way I don't have to pass the map to the Callable. This would at least be safe, and it doesn't force me to restructure my code.
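For illustration, the static-map idea above could be sketched roughly as follows. This is only a sketch under assumptions: `Connection` is a hypothetical stand-in for a real LDAP connection, `QueryTask` and the pool size of 4 are made up for the demo, and a `ConcurrentHashMap` is used because a plain `HashMap` is not safe to share between pool threads. The sketch also never closes its connections, which real code would have to do when the pool shuts down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadConnections {
    // Hypothetical stand-in for an LDAP connection; counts how many were opened.
    static final class Connection {
        static final AtomicInteger OPENED = new AtomicInteger();
        Connection() { OPENED.incrementAndGet(); }
        String search(String query) { return "result for " + query; }
    }

    // Shared static map: thread ID -> the connection owned by that thread.
    static final Map<Long, Connection> CONNECTIONS = new ConcurrentHashMap<>();

    // Hypothetical task: looks up (or lazily opens) the calling thread's connection.
    static final class QueryTask implements Callable<String> {
        private final String query;
        QueryTask(String query) { this.query = query; }

        @Override
        public String call() {
            long id = Thread.currentThread().getId();
            // Opens a connection only the first time this thread runs a task.
            Connection con = CONNECTIONS.computeIfAbsent(id, k -> new Connection());
            return con.search(query);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            futures.add(pool.submit(new QueryTask("q" + i)));
        }
        for (Future<String> f : futures) {
            f.get(); // wait for every query to finish
        }
        pool.shutdown();
        System.out.println("connections opened: " + Connection.OPENED.get());
    }
}
```

With this shape the number of opened connections can never exceed the number of pool threads, however many Callables are submitted. A `ThreadLocal<Connection>` would give the same per-thread reuse without an explicit map.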

New question: Which is more closely aligned with best practices: the above solution or Zim-Zam's solution? And if the above is better, should the map be static or not?

J Bre

1 Answer

I would implement this using a BlockingQueue that is shared between the workers, with the ScheduledThreadPoolExecutor putting x queries into the BlockingQueue every second:

import java.util.concurrent.BlockingQueue;

public class Worker implements Runnable {
    private final BlockingQueue<Query> inbox;   // queries to execute
    private final BlockingQueue<Result> outbox; // results sent back

    public Worker(BlockingQueue<Query> inbox, BlockingQueue<Result> outbox) {
        // create LDAP connection (once per Worker, reused for every query)
        this.inbox = inbox;
        this.outbox = outbox;
    }

    public void run() {
        try {
            while (true) {
                // waits for a Query to be available
                Query query = inbox.take();
                // execute query
                outbox.add(new Result(/* result */));
            }
        } catch (InterruptedException e) {
            // log and restart? close LDAP connection and return?
        }
    }
}

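import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Master {
    private static final int WORKERS = 20;
    private final int x; // number of queries per second
    private final BlockingQueue<Query> outbox; // Master -> Workers
    private final BlockingQueue<Result> inbox; // Workers -> Master
    private final ScheduledThreadPoolExecutor executor;
    private final List<Future<?>> workers = new ArrayList<>(WORKERS);
    private final Future<?> receiver;

    public Master(int x) {
        this.x = x;
        outbox = new ArrayBlockingQueue<>(4 * x);
        inbox = new ArrayBlockingQueue<>(4 * x);
        // every worker and the receiver occupy a thread permanently;
        // one more thread is needed for the scheduled producer
        executor = new ScheduledThreadPoolExecutor(WORKERS + 2);

        for (int i = 0; i < WORKERS; i++) {
            // a Worker reads queries from the Master's outbox and writes results to its inbox
            Worker worker = new Worker(outbox, inbox);
            workers.add(executor.submit(worker));
        }

        receiver = executor.submit(new Runnable() {
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        Result result = inbox.take();
                        // process result
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // add x queries to the outbox
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
}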

Use BlockingQueue#add to add new queries to outbox. If it throws an exception (an IllegalStateException), the queue is full, and you'll need to reduce the rate of query creation and/or create more workers. To break out of a worker's infinite loop, call cancel(true) on its Future; this interrupts the worker's thread, so the blocked take() throws an InterruptedException inside the Worker.
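The two behaviours described above can be checked in isolation with a small sketch. The class and method names here (`QueueShutdownDemo`, `addFailsWhenFull`, `cancelInterruptsWorker`) are made up for the demonstration, and plain strings stand in for the Query objects:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class QueueShutdownDemo {
    // Returns true if add() rejected an element because the bounded queue was full.
    static boolean addFailsWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("q1");
        queue.add("q2");
        try {
            queue.add("q3"); // exceeds the capacity of 2
            return false;
        } catch (IllegalStateException e) {
            return true; // signal to slow down the producer or add workers
        }
    }

    // Returns true if cancel(true) interrupted a worker that was blocked in take().
    static boolean cancelInterruptsWorker() throws InterruptedException {
        BlockingQueue<String> inbox = new ArrayBlockingQueue<>(2);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Runnable task = () -> {
            started.countDown();
            try {
                while (true) {
                    inbox.take(); // blocks: nothing is ever queued in this demo
                }
            } catch (InterruptedException e) {
                interrupted.countDown(); // a real worker would close its connection here
            }
        };
        Future<?> worker = executor.submit(task);

        started.await();     // make sure the task is running before cancelling it
        worker.cancel(true); // interrupts the thread running the task
        boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
        executor.shutdown();
        return sawInterrupt;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("add fails when full: " + addFailsWhenFull());
        System.out.println("cancel interrupts worker: " + cancelInterruptsWorker());
    }
}
```

Waiting on the `started` latch matters here: if cancel(true) ran before the task had started, the task would simply never execute and the catch block would never be reached. If dropping queries is not acceptable, offer() with a timeout or put() are blocking alternatives to add().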

Zim-Zam O'Pootertoot
  • Thank you for your reply @Zim-Zam! It looks good, but I need to add two things: (1) I use a Callable so that the result of the search gets returned. (2) Because I use a Callable, I need the Callable (or Runnable/Worker) to finish execution so the result can be returned. In your example it will keep on running. – J Bre Dec 14 '15 at 15:55
  • @JBre I've edited my answer, there are now two queues: `Master` sends `Query` objects through a `BlockingQueue` and receives `Result` objects through a `BlockingQueue`, likewise `Worker` receives `Query` objects and sends `Result` objects – Zim-Zam O'Pootertoot Dec 14 '15 at 16:34
  • Thank you for the fast update Zim-Zam! It looks great and is perfectly usable! I am certainly going to remember this one for future use! One extra comment, though, about something I have not mentioned yet: because this is a test application that needs to execute just one test (executing the LDAP queries), I just collect the results later, when all queries have been executed. It would be a bit overkill to run an extra thread that processes the results. (It might even slightly influence the test results, since it takes up resources, which would delay the other threads.) – J Bre Dec 14 '15 at 17:59
  • [continuing:] So the question is now what would be best practice in this case? Thank you again for your perfectly acceptable answer. I am sorry I cannot give you credit yet by accepting your answer, but I am wondering what the best practice would be. I edited my question with a proposal. Could you please take a look and let me know what you think? Your opinion is highly valued :) – J Bre Dec 14 '15 at 18:01
  • @JBre I would favor your original approach if it didn't cost much to construct a `Worker`/`Callable`, for example if you were accessing a database using a pooled connection or something along those lines. The input/output queue is more appropriate if, as in this case, it was expensive to create a `Worker`/`Callable` such that it would be appropriate to reuse them. – Zim-Zam O'Pootertoot Dec 14 '15 at 19:20
  • Ok thank you for the explanation @Zim-Zam! I will implement my solution but I will still accept your answer since it is a complete answer with a good example. Thank you for your effort! – J Bre Dec 15 '15 at 09:26