I have a web server service where clients request a smartcard computation and get their result. The number of available smartcards can decrease or increase during server uptime; for example, I can physically add or remove a smartcard from the reader (and there are many other possible events, such as exceptions).

A smartcard computation can take a while, so I have to optimize these jobs to use all available smartcards if there are concurrent requests to the web server.

I thought of working with a smartcard thread pool. The unusual thing, at least for me, is that the pool should change its size depending not on the client requests but only on smartcard availability.

I studied many examples of:

  • BlockingQueue: It looks good for storing requests and parking threads while they wait for something to do.
  • FutureTask: I can use this class to let the client wait for its answer, but which kind of executor should run the task?
  • ThreadPoolExecutor: Seems to be what I need, but with it I cannot change the pool size; moreover, every thread should be linked to a single smartcard slot. It could be a solution if I could change the pool size (adding a thread when a smartcard is inserted and removing a thread when a smartcard is removed) and assign a specific smartcard to each thread.

This is the smartcard control class. I have one SmartcardWrapper per smartcard, and every smartcard has its own slot number.

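public class SmartcardWrapper{

    // Placeholder: the question omits the output length, so this value is an assumption
    private static final int OUTPUT_SIZE = 256;

    private int slot;

    public SmartcardWrapper(int slot) {
        this.slot=slot;
    }

    public byte[] compute(byte[] input) {
        byte[] out=new byte[OUTPUT_SIZE];
        SmartcardApi.computerInput(slot,input,out); //Native method
        return out;
    }
}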

I tried to create a thread pool with one thread per smartcard:

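private class SmartcardThread extends Thread{

    protected SmartcardWrapper sw;

    public SmartcardThread(SmartcardWrapper sw){
        this.sw=sw;
    }

    @Override
    public void run() {
        try {
            while(true){
                byte[] input=queue.take(); // blocks until a request arrives
                byte[] output=sw.compute(input);
                // I have to return back the output to the client
            }
        } catch (InterruptedException e) {
            // take() was interrupted: restore the flag and let the thread end
            Thread.currentThread().interrupt();
        }
    }
}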

Every thread waits for work on the same input queue:

// BlockingQueue is an interface, so a concrete implementation is needed
BlockingQueue<byte[]> queue=new LinkedBlockingQueue<byte[]>();

But how do I return the output from the smartcard thread to the web server client? This makes me think that BlockingQueue is not my solution.

How should I approach this problem? Which concurrency pattern should I follow? Is it correct to assign one thread per smartcard, or could I simply use semaphores?

Ravindra babu
Tobia
  • Are the smart cards on the server or on the client? Is the computation taking place on the server or on the client? If they are different places, can you clarify how the information is transmitted between the two? – Warren Dew Dec 07 '15 at 22:26
  • Why not put the smart cards (the resources) in a Queue? – Maarten Bodewes Dec 07 '15 at 23:49
  • @WarrenDew this is a server side computation. – Tobia Dec 08 '15 at 07:57
  • @MaartenBodewes do you mean a queue of threads? And when a thread finishes its job, it is queued again? – Tobia Dec 08 '15 at 07:59
  • @Tobia do you have some server API that invokes a method when the number of available smartcards increases or decreases? – Archer Dec 15 '15 at 06:51
  • No, I will use a timer task to check for new cards and to remove smartcards that are no longer present. (The smartcard API only has a boolean method isCardIn(), so I don't have a real add/remove event.) – Tobia Dec 15 '15 at 06:57

5 Answers

Your assumption:

ThreadPoolExecutor: Seems what I need, but with this I cannot change the pool size, moreover every thread should be linked to a single smartcard slot.

is not right.

You can set the thread pool size dynamically.

Have a look at the ThreadPoolExecutor APIs below:

public void setMaximumPoolSize(int maximumPoolSize)

Sets the maximum allowed number of threads. This overrides any value set in the constructor. If the new value is smaller than the current value, excess existing threads will be terminated when they next become idle.

public void setCorePoolSize(int corePoolSize)

Sets the core number of threads. This overrides any value set in the constructor. If the new value is smaller than the current value, excess existing threads will be terminated when they next become idle. If larger, new threads will, if needed, be started to execute any queued tasks.

Core and maximum pool sizes:

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.

When a new task is submitted in method execute(java.lang.Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. But I would not recommend having that many threads; set this value with caution.

Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
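As a rough sketch of the dynamic resizing described above, the helper below keeps both sizes equal to the number of available smartcards. The class and method names (`ResizablePoolDemo`, `newPool`, `resize`) are hypothetical, and the 30-second keep-alive is an arbitrary choice. One detail worth hedging: newer JDKs throw an IllegalArgumentException when the core size would exceed the maximum, so the sketch grows the maximum first and shrinks the core first.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: keeps a pool's size equal to the number of available smartcards. */
class ResizablePoolDemo {

    /** Creates a pool whose core and maximum sizes both equal the initial card count. */
    static ThreadPoolExecutor newPool(int cards) {
        return new ThreadPoolExecutor(cards, cards, 30L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Resizes the pool to the given number of threads (must be at least 1).
     * The setter order matters because the core size may never exceed the maximum:
     * grow the maximum first, shrink the core first.
     */
    static void resize(ThreadPoolExecutor pool, int cards) {
        if (cards > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(cards);
            pool.setCorePoolSize(cards);
        } else {
            pool.setCorePoolSize(cards);
            pool.setMaximumPoolSize(cards);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2);
        resize(pool, 4); // two more smartcards inserted
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 4/4
        resize(pool, 1); // three smartcards removed
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 1/1
        pool.shutdown();
    }
}
```

Note that shrinking only takes effect once excess threads become idle, and the pool still does not let you pick which thread stops, so this alone does not bind a thread to a particular smartcard slot.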

EDIT:

For better utilization of the thread pool, if you know the maximum number of cards is 6, you can use

 ExecutorService executor = Executors.newFixedThreadPool(6);

OR

Ravindra babu
  • I had the same idea! I extended a ThreadFactory to create wrapped threads with a smartcard slot reference, and then I created a new ThreadPoolExecutor with this custom ThreadFactory. This factory keeps the maximum and core pool sizes aligned with the number of available smartcards. I stopped this development because I could not understand how to manage stopping a thread: if I decrease the pool size due to a smartcard removal, how can I be sure that the thread of the removed smartcard will be stopped and not another one? ThreadPoolExecutor doesn't have a removeThread(Thread t) method. – Tobia Dec 16 '15 at 08:15
  • I think you should not bother about the exact thread as long as the system maintains the pool size. Your requirement is that a new card should get a new thread, and that requirement is met. Even if reducing the pool size does not happen immediately but at some later time, system health will be fine. – Ravindra babu Dec 16 '15 at 08:51
  • If I link a thread to a single smartcard, I need to remove this thread or flag it as unavailable if its smartcard is removed. This is a requirement (the smartcard number can *decrease* or increase). – Tobia Dec 16 '15 at 08:54
  • Personally I don't think a new thread for each new card is a good idea. You have a limited number of CPU cores in your server, like 8, 16, 32, etc. What benefit do you get from creating 1000 threads for 1000 new cards when you have 32 CPU cores? – Ravindra babu Dec 16 '15 at 09:00
  • I think there is some API to get the thread number of the task being executed. I will figure it out in some time. – Ravindra babu Dec 16 '15 at 09:03
  • But thread creation and deletion is a costly affair. You lose the core benefit of ThreadPoolExecutor. – Ravindra babu Dec 16 '15 at 09:06
  • In my case there can be up to 6 smartcards, but I agree with you, this can be a waste of threads. How to manage it without one thread per smartcard? With semaphores to check smartcard availability? I need to optimize the smartcard usage in case of many requests to the web server at the same time. – Tobia Dec 16 '15 at 09:06
  • Let's have a fixed thread pool of size 6 and treat the addition of a card as a task for the thread pool executor. Does it solve your problem? – Ravindra babu Dec 16 '15 at 09:32
  • But when I have a "real" task for a smartcard, I send it to the ThreadPoolExecutor, which chooses the first available thread. How can I be sure that this thread has its smartcard inserted and ready? This is why I thought of adding threads only when a smartcard is available. – Tobia Dec 16 '15 at 09:34
  • If I understand it correctly (correct me if I am wrong), you can have two thread pools: one until the card is ready and another once the card is ready. – Ravindra babu Dec 17 '15 at 06:34
  • I don't understand this, why two pools? – Tobia Dec 17 '15 at 07:11
  • You said that you will send the "real" task to the ThreadPoolExecutor, but you are not sure when the smartcard is inserted and ready. So I thought that the card would be ready some time after insertion and the job would be submitted to the pool later. – Ravindra babu Dec 17 '15 at 07:39
  • I spoke of a "real" task because you suggested creating a task for smartcard insertion. Actually I can detect that a smartcard is ready with a scheduled poll that checks each card. – Tobia Dec 17 '15 at 08:21
  • If you have a polling agent, one thread pool would suffice. You can submit the real task to the ThreadPoolExecutor once your smartcard is ready. – Ravindra babu Dec 17 '15 at 08:54
  • Have a look at ScheduledExecutorService: http://tutorials.jenkov.com/java-util-concurrent/scheduledexecutorservice.html – Ravindra babu Dec 17 '15 at 08:58

Have you considered using Apache Commons Pool at all?

You need to maintain a pool of SmartcardWrapper objects where each SmartcardWrapper will represent a physical SmartCard. Whenever you need to make a new computation, you borrow the object from the pool, do the calculation and return the object in the pool so it can be reused by the next thread.

The pool itself is thread-safe and blocks when there are no available objects. All you need to do is implement an API to add and remove SmartcardWrapper objects to and from the pool.
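To illustrate the borrow/compute/return cycle without pulling in a library, here is a minimal sketch that swaps Apache Commons Pool for a plain `LinkedBlockingQueue` from `java.util.concurrent`. It is not the Commons Pool API; `FakeCard` and `CardPool` are hypothetical names, and `FakeCard.compute` only fakes the native smartcard call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the question's SmartcardWrapper; compute() is faked. */
class FakeCard {
    final int slot;
    FakeCard(int slot) { this.slot = slot; }
    byte[] compute(byte[] input) {
        return new byte[] { (byte) slot, (byte) input.length };
    }
}

/** Minimal blocking pool: idle cards wait in a queue, callers borrow and return them. */
class CardPool {
    private final BlockingQueue<FakeCard> idle = new LinkedBlockingQueue<>();

    /** Called when a card is inserted. */
    void add(FakeCard card) { idle.add(card); }

    /** Called when a card is removed; returns false if the card is not idle right now. */
    boolean remove(FakeCard card) { return idle.remove(card); }

    /** Borrows a card, runs the computation on the caller's thread, then returns the card. */
    byte[] compute(byte[] input, long timeout, TimeUnit unit) {
        FakeCard card;
        try {
            card = idle.poll(timeout, unit); // blocks until a card is idle or the timeout expires
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a smartcard", e);
        }
        if (card == null) {
            throw new IllegalStateException("no smartcard available in time");
        }
        try {
            return card.compute(input);
        } finally {
            idle.add(card); // always give the card back, even if compute() throws
        }
    }

    public static void main(String[] args) {
        CardPool pool = new CardPool();
        pool.add(new FakeCard(1));
        byte[] out = pool.compute(new byte[3], 1, TimeUnit.SECONDS);
        System.out.println(out[0] + "," + out[1]); // prints 1,3
    }
}
```

With this shape no extra worker threads are needed: each web request thread blocks until a card is free, so at most as many computations run concurrently as there are cards. The sketch does not handle a card that is removed while it is borrowed; that case needs extra bookkeeping.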

Vladimir G.

I might have found a reasonably simple solution based on the following assumptions:

  • a separate process manages (system-event) notifications for smartcards that become available or are removed.
  • a client does not care which smartcard it gets to use, as long as it can use one without interference.

These two assumptions actually make it easier to create a pooling (shared resources) solution, since it is usually the pool itself that is responsible for creating and removing resources when appropriate. Without this functionality, a pooling solution becomes simpler. I do assume that the client that gets a smartcard from the pool to use, can execute the required smartcard functions within its own execution thread (similar to how a database connection is used from a database connection pool to query data from a database).

I have only done some minimal testing for the two classes shown below, and I'm afraid the bulk of the work is in writing (unit) tests that prove the pool works properly with concurrent client requests combined with adding and removing smartcard resources. If you do not want to do that, then the answer from user769771 is probably a better solution. But if you do, try it out, see if it fits. The idea is that only one resource-pool instance is created and used by all the clients and updated by the separate process that manages smartcard availability.

import java.util.*;
import java.util.concurrent.*;

/**
 * A resource pool that expects shared resources 
 * to be added and removed from the pool by an external process
 * (i.e. not done by the pool itself, see {@link #add(Object)} and {@link #remove(Object)}).
 * <br>A {@link ResourcePoolValidator} can optionally be used. 
 * @param <T> resource type handed out by the pool.
 */
public class ResourcePool<T> {

    private final Set<T> registered = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()); 
    /* Use a linked list as FIFO queue for resources to lease. */
    private final List<T> available = Collections.synchronizedList(new LinkedList<T>()); 
    private final Semaphore availableLock = new Semaphore(0, true); 

    private final ResourcePoolValidator<T> validator;

    public ResourcePool() {
        this(null);
    }

    public ResourcePool(ResourcePoolValidator<T> validator) {
        super();
        this.validator = validator;
    }

    /**
     * Add a resource to the pool.
     * @return true if resource is not already in the pool.
     */
    public synchronized boolean add(T resource) {

        boolean added = false;
        if (!registered.contains(resource)) {
            registered.add(resource);
            available.add(resource);
            availableLock.release();
            added = true;
        }
        return added;
    }

    /**
     * Removes a resource from the pool.
     * The resource might be in use (see {@link #isLeased(Object)})
     * in which case {@link ResourcePoolValidator#abandoned(Object)} will be called 
     * when the resource is no longer used (i.e. released). 
     * @return true if resource was part of the pool and removed from the pool.
     */
    public synchronized boolean remove(T resource) {

        // method is synchronized to prevent multiple threads calling add and remove at the same time 
        // which could in turn bring the pool in an invalid state.
        return registered.remove(resource);
    }

    /**
     * If the given resource is (or was, see also {@link #remove(Object)}) part of the pool,
     * a returned value true indicates the resource is in use / checked out.
     * <br>This is a relatively expensive method, do not call it frequently.
     */
    public boolean isLeased(T resource) {
        return !available.contains(resource);
    }

    /**
     * Try to get a shared resource for usage. 
     * If a resource is acquired, it must be {@link #release(Object)}d in a finally-block.
     * @return A resource that can be exclusively used by the caller.
     * @throws InterruptedException When acquiring a resource is interrupted.
     * @throws TimeoutException When a resource is not available within the given timeout period.
     */
    public T tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException, TimeoutException {

        T resource = null;
        long timeRemaining = tunit.toMillis(timeout);
        final long tend = System.currentTimeMillis() + timeRemaining;
        do {
            if (availableLock.tryAcquire(timeRemaining, TimeUnit.MILLISECONDS)) {
                resource = available.remove(0);
                if (registered.contains(resource)) {
                    boolean valid = false;
                    try {
                        valid = (validator == null ? true : validator.isValid(resource));
                    } catch (Exception e) {
                        // TODO: log exception
                        e.printStackTrace();
                    }
                    if (valid) {
                        break; // return the "checked out" resource
                    } else {
                        // remove invalid resource from pool
                        registered.remove(resource);
                        if (validator != null) {
                            validator.abandoned(resource);
                        }
                    }
                }
                // resource was removed from pool, try acquire again
                // note that this implicitly lowers the maximum available resources
                // (an acquired permit from availableLock goes unused).
                // TODO: retry puts us at the back of availableLock queue but should put us at the front of the queue
                resource = null;
            }
            timeRemaining = tend - System.currentTimeMillis();
        } while (timeRemaining > 0L);
        if (resource == null) {
            throw new TimeoutException("Unable to acquire a resource within " + tunit.toMillis(timeout) + " ms.");
        }
        return resource;
    }

    /**
     * This method must be called by the caller / client whenever {@link #tryAcquire(long, TimeUnit)}
     * has returned a resource. If the caller has determined the resource is no longer valid,
     * the caller should call {@link #remove(Object)} before calling this method.
     * @param resource no longer used.
     */
    public void release(T resource) {

        if (resource == null) {
            return;
        }
        if (registered.contains(resource)) {
            available.add(resource);
            availableLock.release();
        } else {
            if (validator != null) {
                validator.abandoned(resource);
            }
        }
    }

    /** An array (copy) of all resources registered in the pool. */
    @SuppressWarnings("unchecked")
    public T[] getRegisteredResources() {
        return (T[]) registered.toArray(new Object[registered.size()]);
    }

}

And a separate class with functions related to the separate process that manages smartcard availability.

import java.util.concurrent.TimeUnit;

/**
 * Used by a {@link ResourcePool} to validate a resource before handing it out for lease
 * (see {@link #isValid(Object)}) and signal a resource is no longer used (see {@link #abandoned(Object)}). 
 */
public class ResourcePoolValidator<T> {

    /**
     * Override this method (this method does nothing by default) 
     * to validate a resource before handing it out for lease.
     * If this method returns false or throws an exception (which it preferably should not do), 
     * the resource is removed from the pool.
     * @return true if the resource is valid for leasing
     */
    public boolean isValid(T resource) {
        return true;
    }

    /**
     * Called by the {@link ResourcePool#release(Object)} method when a resource is released by a caller 
     * but the resource was previously removed from the pool and in use.
     * <br>Called by {@link ResourcePool#tryAcquire(long, TimeUnit)} if a resource is not valid 
     * (see {@link #isValid(Object)}).
     * <br>Override this method (this method does nothing by default) to create a notification of an unused resource,
     * do NOT do any long period of processing as this method is called from a caller (client) thread.
     */
    public void abandoned(T resource) {
        // NO-OP
    }

}
vanOekel

Looking at the requirements, the best architecture would decouple the smartcard computation from your web services.

Relying on web services to wait on processor-intensive tasks will result in timeouts.

The best solution is to precompute the smartcard results using a periodic job and store the (slot, computation) pairs in a cache server like Redis.

The Smart Card Synchronizer Job is a separate, standalone J2SE application that periodically checks which smartcards are available and active (error free) and updates the Redis cache with slot and computation as a key/value pair. If a smartcard is unavailable, it is removed from the cache.

The web service just checks the Redis cache for a particular slot key; if it finds a value, it returns it, otherwise it returns "not found" for that slot (unavailable or error).

This design is scalable on both the smartcard side and the client request side.

shazin
  • I cannot cache smartcard jobs because they depend on the client's input. My question is how to manage many SmartcardWrapper instances (each smartcard has its own wrapper linked to its slot) to serve concurrent requests from clients. – Tobia Dec 15 '15 at 07:43

In answer to your question about how to return the result back to the caller:

Everyone waiting for something in the same input queue:

BlockingQueue<byte[]> queue=new LinkedBlockingQueue<byte[]>();

But how to return back output from smartcard-thread to the webserver-client? This let me think that BlockingQueue is not my solution.

Your submission queue idea is mostly fine, but you also need a queue per job to return the result to the job submitter...

Change your submission queue to:

BlockingQueue<JobSubmitRec> queue=new LinkedBlockingQueue<JobSubmitRec>();

and JobSubmitRec will have the byte[] and a single-use queue to return the result:

class JobSubmitRec
{
  byte[] data;
  BlockingQueue<JobSubmitResult> result=new LinkedBlockingQueue<JobSubmitResult>();

  // Constructor used by the submitting client
  JobSubmitRec(byte[] data) { this.data = data; }
}

and your worker Thread runnable will look something like:

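public void run() {
 try {
  while(true){
   JobSubmitRec submitrec = queue.take();
   byte[] input = submitrec.data;
   byte[] output = sw.compute(input);
   submitrec.result.put( new JobSubmitResult(output) );
  }
 } catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // stop the worker when interrupted
 }
}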

and the client that submits the job will look like:

JobSubmitRec jsr = new JobSubmitRec( data );
queue.put( jsr );
JobSubmitResult result = jsr.result.take();
// use result here
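For reference, here is a self-contained, runnable sketch of the same reply-queue-per-job pattern. It is only an illustration under assumptions: `ReplyQueueDemo`, `Job`, `startWorker` and `submitAndWait` are hypothetical names, the result is carried as a plain byte[] rather than a JobSubmitResult, and the smartcard call is replaced by a fake that echoes the slot number and input length.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Runnable sketch of the reply-queue-per-job pattern; the computation is faked. */
class ReplyQueueDemo {

    /** One submitted job: the input plus a single-use queue for its result. */
    static final class Job {
        final byte[] data;
        final BlockingQueue<byte[]> result = new LinkedBlockingQueue<>(1);
        Job(byte[] data) { this.data = data; }
    }

    private final BlockingQueue<Job> queue = new LinkedBlockingQueue<>();

    /** Starts a daemon worker for one card slot; it takes jobs from the shared queue. */
    void startWorker(final int slot) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Job job = queue.take();
                    // Stand-in for sw.compute(input): tag the reply with the slot number.
                    job.result.put(new byte[] { (byte) slot, (byte) job.data.length });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop the worker when interrupted
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Called on the request thread: enqueue the job, then block until a worker replies. */
    byte[] submitAndWait(byte[] data) {
        Job job = new Job(data);
        try {
            queue.put(job);
            return job.result.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a smartcard", e);
        }
    }

    public static void main(String[] args) {
        ReplyQueueDemo demo = new ReplyQueueDemo();
        demo.startWorker(1);
        byte[] out = demo.submitAndWait(new byte[4]);
        System.out.println(out[0] + "," + out[1]); // prints 1,4
    }
}
```

Adding a smartcard then amounts to starting another worker, and removing one amounts to interrupting its worker thread. In a real service the blocking take() on the reply queue would probably be replaced by a poll with a timeout so a request cannot wait forever.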
Trevor Harrison