
I need to build a pool of workers in Java where each worker has its own connected socket; when the worker thread runs, it uses the socket but keeps it open to reuse later. We chose this approach because creating, connecting, and destroying sockets on an ad-hoc basis carried too much overhead. So we need a pool of workers that are pre-initialized with their socket connections and ready to take on work, while keeping each socket safe from other threads (sockets are not thread-safe). We need something along these lines...

import java.net.Socket;

public class SocketTask implements Runnable {
  private Socket socket;

  public SocketTask(){
    // create + connect socket here (this per-task cost is what we want to avoid)
  }

  public void run(){
    // use socket here
  }
}

On application startup, we want to initialize the workers and, hopefully, the socket connections somehow too...

MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
   pool.addWorker( new WorkerThread());

As work is requested by the application, we send tasks to the worker pool for immediate execution...

pool.queueWork( new SocketTask(..));


Updated with Working Code
Based on helpful comments from Gray and jontejj, I've got the following code working...

SocketTask

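import java.net.Socket;

public class SocketTask implements Runnable {
    private final String workDetails;

    // One socket per pool thread, created the first time that thread calls get()
    private static final ThreadLocal<Socket> threadLocal = 
           new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue(){
            // new Socket() is unconnected: connect it here, wrapping the
            // checked IOException (e.g. in UncheckedIOException)
            return new Socket();
        }           
    };

    public SocketTask(String details){              
        this.workDetails = details;
    }

    public void run(){      
        Socket s = getSocket(); // gets this thread's socket from the thread-local
        // send data on socket based on workDetails, etc.
    }

    public static Socket getSocket(){
        return threadLocal.get();
    }
}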

ExecutorService

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());

int tasks = 15;
for (int i = 1; i <= tasks; i++) {
    threadPool.execute(new SocketTask("foobar-" + i));
}
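To see the reuse without opening real network connections, here is a minimal sketch that swaps `Socket` for a hypothetical `FakeConnection` class that only counts how many times it is constructed. With a fixed pool of 5 threads and 15 tasks, the thread-local initializer should run at most 5 times, once per worker thread, no matter how many tasks are queued.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalReuseDemo {
    // Counts how many connections were ever created.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for a connected Socket.
    static final class FakeConnection {
        FakeConnection() { CREATED.incrementAndGet(); }
        void send(String data) { /* a real socket would write here */ }
    }

    // One connection per pool thread, created lazily on that thread's first get().
    private static final ThreadLocal<FakeConnection> CONN =
            ThreadLocal.withInitial(FakeConnection::new);

    static final class SocketTask implements Runnable {
        private final String workDetails;
        SocketTask(String workDetails) { this.workDetails = workDetails; }
        @Override public void run() { CONN.get().send(workDetails); }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        for (int i = 1; i <= 15; i++) {
            threadPool.execute(new SocketTask("foobar-" + i));
        }
        threadPool.shutdown();
        threadPool.awaitTermination(10, TimeUnit.SECONDS);
        // At most 5: one per worker thread, regardless of the task count.
        System.out.println("connections created: " + CREATED.get());
    }
}
```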

I like this approach for two reasons:

  • Each socket is confined to a single thread (via ThreadLocal), so no two tasks ever use the same socket concurrently.
  • Sockets are created once per thread and kept open, then reused by every task that thread runs, eliminating per-task create/connect/destroy overhead.
raffian
  • Thanks for the working code snippet, helped me a lot! I was wondering: how do you close the db connections when the thread pool is shut down? – kafman Oct 05 '15 at 20:16
  • Search for [orderly shutdown](http://stackoverflow.com/questions/3332832/graceful-shutdown-of-threads-and-executor) of execution threads, hope it helps. – raffian Oct 05 '15 at 21:43
  • I checked it out, thanks! It did not help me entirely, though ... I'm looking for a way to call dbconnection.close() for all db connections stored in the ThreadLocal. I really don't know how to approach this ... I mean, the db connection isn't closed automatically once the thread pool is shut down, right? – kafman Oct 06 '15 at 12:19
  • I think I found another solution: I use a thread pool and a separate vector of the same size to store the db connections. Actually a ThreadLocal is not really necessary in my case, as I don't really care which Thread gets which connection, as long as it gets one. Thanks anyway for the inspiration! – kafman Oct 06 '15 at 12:58

2 Answers

One idea would be to put the Sockets in a BlockingQueue. Then whenever you need a Socket your threads can take() from the queue and when they are done with the Socket they put() it back on the queue.

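public void run() {
    Socket socket;
    try {
        socket = socketQueue.take();   // blocks until a socket is free
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
    }
    try {
       // use the socket ...
    } finally {
       socketQueue.offer(socket);      // never blocks: this socket came from the queue, so there is room
    }
}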

This has the added benefits:

  • You can go back to using the ExecutorService code.
  • You can separate the socket communication from the processing of the results.
  • You don't need a 1-to-1 correspondence between processing threads and sockets, although if socket communication is 98% of the work there may be little gain.
  • When you are done and your ExecutorService completes, you can shut down your sockets by just dequeueing them and closing them.

This does add the additional overhead of another BlockingQueue but if you are doing Socket communications, you won't notice it.
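A minimal sketch of the queue-based pool, including the shutdown step from the last bullet. It uses a hypothetical `FakeConnection` (implementing `Closeable`) in place of a connected `Socket`, and `QueuePoolDemo`/`POOL_SIZE` are illustrative names. Note that 10 worker threads share only 3 connections here, which is the decoupling mentioned above.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueuePoolDemo {
    // Number of connections currently open.
    static final AtomicInteger OPEN = new AtomicInteger();

    // Hypothetical stand-in for a connected Socket.
    static final class FakeConnection implements Closeable {
        FakeConnection() { OPEN.incrementAndGet(); }
        void send(String data) { /* a real socket would write here */ }
        @Override public void close() { OPEN.decrementAndGet(); }
    }

    static final int POOL_SIZE = 3;

    public static void main(String[] args) throws Exception {
        // Pre-open every connection and park it in the queue.
        BlockingQueue<FakeConnection> connQueue = new ArrayBlockingQueue<>(POOL_SIZE);
        for (int i = 0; i < POOL_SIZE; i++) {
            connQueue.add(new FakeConnection());
        }

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int i = 1; i <= 20; i++) {
            final String work = "job-" + i;
            threadPool.execute(() -> {
                FakeConnection conn;
                try {
                    conn = connQueue.take(); // blocks until a connection is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    conn.send(work);
                } finally {
                    connQueue.add(conn); // hand it back for the next task
                }
            });
        }
        threadPool.shutdown();
        threadPool.awaitTermination(10, TimeUnit.SECONDS);

        // All tasks are done, so every connection is back in the queue.
        List<FakeConnection> drained = new ArrayList<>();
        connQueue.drainTo(drained);
        for (FakeConnection c : drained) {
            c.close();
        }
        System.out.println("closed " + drained.size() + ", still open " + OPEN.get());
    }
}
```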

we don't believe ThreadFactory addresses our needs ...

I think you could make this work if you used thread-locals. Your thread factory would create a thread that first opens the socket and stores it in a thread-local, then calls the Runnable arg. That arg is the ExecutorService's worker loop: it dequeues jobs from the internal queue and runs them, and those jobs do all of the work with the socket. When the pool is shut down, the arg.run() method returns, and you can get the socket from the thread-local and close it.

Something like the following. It's a bit messy but you should get the idea.

ExecutorService threadPool =
    Executors.newFixedThreadPool(10,
      new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    openSocketAndStoreInThreadLocal();
                    try {
                        // r.run() is the pool's worker loop; our tasks get the socket from the thread-local
                        r.run();
                    } finally {
                        // runs when the pool shuts down and this worker exits
                        getSocketFromThreadLocalAndCloseIt();
                    }
                }
            });
            return thread;
        }
      });

So your tasks would implement Runnable and look like:

public class SocketWorker implements Runnable {
    private final ThreadLocal<Socket> threadLocal;
    public SocketWorker(ThreadLocal<Socket> threadLocal) {
       this.threadLocal = threadLocal;
    }
    public void run() {
        Socket socket = threadLocal.get();
        // use the socket ...
    }
}
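Here is a self-contained sketch of the same ThreadFactory idea, which also answers the later question of how to close thread-local connections on shutdown. It assumes a hypothetical `FakeConnection` in place of `Socket` and uses lambdas instead of anonymous classes. One subtlety: `awaitTermination` can return just before a wrapper thread's `finally` block runs, so the sketch records the threads it creates and joins them to be sure every `close()` has happened.

```java
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadFactoryDemo {
    static final AtomicInteger OPENED = new AtomicInteger();
    static final AtomicInteger CLOSED = new AtomicInteger();

    // Hypothetical stand-in for a connected Socket.
    static final class FakeConnection implements Closeable {
        FakeConnection() { OPENED.incrementAndGet(); }
        void send(String data) { /* a real socket would write here */ }
        @Override public void close() { CLOSED.incrementAndGet(); }
    }

    static final ThreadLocal<FakeConnection> CONN = new ThreadLocal<>();
    static final List<Thread> THREADS = new CopyOnWriteArrayList<>();

    // Opens a connection when a pool thread starts and closes it when the
    // pool's worker loop (r.run()) returns, i.e. after shutdown().
    static final ThreadFactory CONNECTING_FACTORY = r -> {
        Thread t = new Thread(() -> {
            CONN.set(new FakeConnection());
            try {
                r.run();
            } finally {
                CONN.get().close();
                CONN.remove();
            }
        });
        THREADS.add(t);
        return t;
    };

    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(4, CONNECTING_FACTORY);
        for (int i = 1; i <= 12; i++) {
            final String work = "job-" + i;
            threadPool.execute(() -> CONN.get().send(work));
        }
        threadPool.shutdown();
        threadPool.awaitTermination(10, TimeUnit.SECONDS);
        for (Thread t : THREADS) {
            t.join(); // make sure each wrapper's finally block has run
        }
        System.out.println("opened " + OPENED.get() + ", closed " + CLOSED.get());
    }
}
```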
Gray
    No. The `ThreadFactory` is only called once per thread. So if your thread-pool has 10 threads, the socket is only created when the pool is initialized and the threads are started. Not for every task @SAFX. – Gray May 21 '13 at 22:10
  • Remember @SAFX, that the run() method in the newThread method is not your task run. It is the run method that the `ExecutorService` uses to dequeue tasks and run them. – Gray May 21 '13 at 22:12
  • Yes, somewhat @SAFX. `r.run()` is a runnable inside of the concurrent code. It dequeues your tasks from the internal task blocking queue and calls your task's `run()` (catching exceptions and the like). I'll add a sample task class to my answer. – Gray May 21 '13 at 22:25
  • 1) so you would say `executor.execute(new SocketWorker(socketThreadLocal));`. That allows you to define the thread-local in the class which starts the tasks. Or you can use a `static` thread-local as in @jontejj's answer. 2) whoops, that was an error. Fixed @SAFX. – Gray May 21 '13 at 22:45
  • Got it working @Gray, without passing the threadLocal to the SocketWorker: I just defined a `private static final ThreadLocal` in `SocketWorker` and overrode `initialValue()` to create the socket the first time. Any tasks sent to the executor simply reuse the next available thread and its local socket. Worked like a charm, thank you. – raffian May 22 '13 at 01:34

I think you should use a ThreadLocal:

package com.stackoverflow.q16680096;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main
{
    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        int nrOfConcurrentUsers = 100;
        for(int i = 0; i < nrOfConcurrentUsers; i++)
        {
            pool.submit(new InitSocketTask());
        }

        // do stuff...

        pool.submit(new Task());
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class InitSocketTask implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do initial setup here
    }

}

package com.stackoverflow.q16680096;

import java.net.Socket;

public final class SocketPool
{
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue()
        {
            return new Socket(); // Pass in suitable arguments here...
        }
    };

    public static Socket get()
    {
        return SOCKETS.get();
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class Task implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do stuff with socket...
    }
}

Each thread then gets its own socket.
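One caveat with the warm-up idea: with `newCachedThreadPool`, short `InitSocketTask`s can be picked up by threads that are already idle, so 100 submissions do not guarantee 100 initialized threads. The sketch below swaps in a `CyclicBarrier` (an addition here, not part of the answer) to force every thread of a *fixed* pool to create its socket up front: no warm-up task can finish until all of them are running at once, so they must occupy distinct threads. `FakeConnection` is a hypothetical stand-in for `Socket`, and the pool size of 8 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WarmupDemo {
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for a connected Socket.
    static final class FakeConnection {
        FakeConnection() { CREATED.incrementAndGet(); }
    }

    static final ThreadLocal<FakeConnection> SOCKETS =
            ThreadLocal.withInitial(FakeConnection::new);

    public static void main(String[] args) throws Exception {
        int poolSize = 8;
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);

        // Each warm-up task initializes its thread's connection, then waits
        // until all poolSize tasks are running at once, which forces them
        // onto poolSize distinct threads.
        CyclicBarrier allStarted = new CyclicBarrier(poolSize);
        List<Future<?>> warmups = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            warmups.add(pool.submit(() -> {
                SOCKETS.get();
                allStarted.await();
                return null;
            }));
        }
        for (Future<?> f : warmups) {
            f.get(); // wait for the warm-up to finish
        }
        System.out.println("after warm-up: " + CREATED.get());

        // Real tasks now find a ready connection on every thread.
        for (int i = 0; i < 50; i++) {
            pool.execute(() -> SOCKETS.get());
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("total created: " + CREATED.get());
    }
}
```

Both lines should report 8: the warm-up creates one connection per thread, and the 50 later tasks reuse them.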

jontejj
  • Making the Socket ThreadLocal makes it belong to the thread and not a worker which gives you the possibility to get access to the socket directly inside your Task. Just be sure to warmup the ThreadPool with InitTasks that fires up the sockets. Then when new Tasks are executed all the Threads in the ThreadPool will have the sockets ready. – jontejj May 21 '13 at 22:04
  • I updated my original question with working code from your example. What you call `SocketPool` I named `SocketTask` in my code, but they both have a static `ThreadLocal` object for getting a local socket object, does that look valid to you? – raffian May 22 '13 at 16:39
    WorkerThreadFactory seems redundant, what's wrong with Executors#defaultThreadFactory()? – jontejj May 23 '13 at 07:21
  • I think you're right...I took that from Gray originally who suggested custom code in `run()`, but I removed that code, so now `WorkerThreadFactory` is generic and probably not needed, thx. – raffian May 23 '13 at 13:09
    +1 for being crazily organized, lol :) package com.stackoverflow.q16680096; – Anand Rockzz Nov 04 '15 at 20:00