2

Before we get trigger happy none of the suggested duplicates here on SO have answered my question.

Trying to do this: I have an Observer Pattern. When I want to dispatch Events I want to do it in a Java thread.

Problem

  1. I don't want to create a new thread each time. I think that would be more computationally expensive than keeping the reference to the thread.
  2. I need to pass an Object to the thread, which is the Event.

    public class EventBus implements Runnable
    {
        private Thread t1;
    
        public EventBus()
        {
            t1 = new Thread(this);
        } 
    
        public void notify(Event event)
        {
            t1.start();
        }
    
        @Override
        public void run()
        {
            for(Listener l : list)
               l.handle(event);
        }
    }
    

So how do I pass my Event object to my run() without using something like this each time?

Runnable r = new EventBus(param_value);
new Thread(r).start();

I am aware that the way to pass parameters to the Thread is to put them in the Thread constructor, I don't want that, I learned that here How can I pass a parameter to a Java Thread?.

I need to pass the new Event to the thread each time notify() is called. During the execution of the program notify() will be called many times and it will carry different Event instances.

Community
  • 1
  • 1
J_Strauton
  • 2,270
  • 3
  • 28
  • 70
  • 2
    Could you tell us more about what you're trying to do? This vaguely sounds like either an `ExecutorService` or a `CompletionService`. – David Ehrmann Sep 19 '14 at 19:42
  • I want to dispatch `Events` and I want to do it in a Java thread. `Events` are messages. I need to pass the new `Event` to the thread each time notify is called. – J_Strauton Sep 19 '14 at 19:43
  • Does `CompletionService` solve your case? Read more here: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletionService.html – Abhiroop Sarkar Sep 19 '14 at 19:46
  • @AbhiroopSarkar no, it does not. – J_Strauton Sep 19 '14 at 19:48
  • How about putting this event on a Queue and letting the parent thread consume it? – Abhiroop Sarkar Sep 19 '14 at 19:53
  • @AbhiroopSarkar I thought about the queue, but I can't picture how **synchronization** would be done **without having the thread in an infinite loop** waiting. If you post your vision of it, even if you use an infinite loop, I’d at least give it an up vote. – J_Strauton Sep 19 '14 at 20:02

2 Answers2

4

I put together a simple example that sorta shows how ive been doing async event publications as of late. Check it out, I think its pretty self explanatory:

The main method:

public static void main(String[] args) throws InterruptedException {
    Observer obs = new Observer();
    EventBus.subscribe(obs, SomeEventImp.class);

    SomeEventImp evt = new SomeEventImp(new Object(), "This is the value");

    EventBus.publishAsync(evt);

    Thread.sleep(Long.MAX_VALUE);
} 

The Observer interface:

public interface IObserver {

    public void update(AEvent event);
}

And Observer implementation:

public class Observer implements IObserver {
    @Override
    public void update(AEvent event) {
        System.out.println("I got and event from " + event.getSource() + " with a value of " + event.getValue());
    }
}

The AEvent class:

public abstract class AEvent<T> {

    protected final T value;
    protected final Object source;

    public AEvent(Object source, T value) {
        this.value = value;
        this.source = source;
    }

    public Object getSource() {
        return source;
    }

    public T getValue() {
        return value;
    }
}

The event bus:

public class EventBus {

    // our observers
    private static HashMap<IObserver, Class<?>> m_Observers = new HashMap<IObserver, Class<?>>();
    // our incoming events
    private static BlockingQueue<AEvent<?>> incoming = new LinkedBlockingQueue<AEvent<?>>();

    // start our internal thread
    static {
        new Thread(new DelegationThread()).start();
    }

    // subscribe an observer
    public static void subscribe(IObserver obs, Class<?> evtClass) {
        synchronized (m_Observers) {
            m_Observers.put(obs, evtClass);
        }
    }

    // publish and event
    public static void publishAsync(AEvent<?> event) {
        incoming.add(event);
    }

    private static class DelegationThread implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    AEvent<?> evnt = incoming.take();
                    synchronized (m_Observers) {
                        for (Entry<IObserver, Class<?>> entry : m_Observers.entrySet()) {
                            if (entry.getValue() == evnt.getClass()) {
                                entry.getKey().update(evnt);
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

and finally the event implementation:

public class SomeEventImp extends AEvent<String> {

    public SomeEventImp(Object source, String value) {
        super(source, value);
    }
}

And heres the output:

I got and event from java.lang.Object@5e1387c6 with a value of This is the value

Clearly you would want to clean this up a bit... I did just sorta slap this together in a few minutes, and didnt really check it all that well.

Mark W
  • 2,791
  • 1
  • 21
  • 44
  • Won't your loop keep running even when your `queue` is empty? – J_Strauton Sep 19 '14 at 20:51
  • @J_Strauton, no, its a blocking queue implementation. the inner thread waits for items to become available on .take() – Mark W Sep 19 '14 at 20:53
  • I could be mistaken, but won't you have to add this? `synchronized(m_Observers) { incoming.add(event); }` – J_Strauton Sep 19 '14 at 21:00
  • 2
    @J_Strauton, No LinkedBlockingQueue is a thread safe collection. See http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html Also http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html Also, the m_Observers lock is only used for the HashMap, but I understand what you were going for. – Mark W Sep 19 '14 at 21:02
  • Awesome! One last noob question: `if(entry.getValue() == evnt.getClass())` is so that your Observer listens only to the type(s) of Event they care about? That is, if they register `EventBus.subscribe(obs, Cookies.class);` they will only get `Cookies` Event? – J_Strauton Sep 19 '14 at 21:07
  • 1
    Exactly, they would subscribe themselves and and specify the class of the event they want to be notified about. The implementation above only allows them to subscribe to a single event type... but it would be easy enough to change the hashmap to have a collection of Class> objects, and make the subscribe method accept an IObserver and say... List> so an observer can be notified on multiple event types. I can update the example if you wish. – Mark W Sep 19 '14 at 21:10
  • You actually answered what was going to be my next SO question as well. I've been looking for `EventBus.subscribe(obs, Cookies.class);` for a while :D. You are awesome, thank you. – J_Strauton Sep 19 '14 at 21:13
  • @J_Strauton I wanted to point out one more thing, because it isnt immediately obvious, but with the EventBus above, any object that you subscribe events to cannot be GC'ed. You can solve this by making the key of the hashmap WeakReference. This will allow those objects to be garbage collected, but you would have to take care to remove that entity from the hashmap if the value of the weak reference object is null, that means its been GC'ed, and you cant publish to it for obvious reasons. – Mark W Sep 20 '14 at 03:35
0

Obviously you cannot use a constructor parameter, since you don't want to call a constructor each time you start a thread on the computation you want to do. So the starting of the thread is going to have to be distinct from it getting the work it is supposed to do. It is going to have to get started, then wait until that computation is ready to do, obtain an object that tells it what to work on, and then start work on it.

Since you seem to be worried about creating new threads, I would assume you would want a thread object that did one computation, then got itself ready for another one, waited for it, and began again. I don't see anything wrong with an infinite loop for that.

Other suggestions will likely be made about details of the above implementation -- queue, dispatcher object, etc. -- But this is the answer to your actual question. That thread can have an internal object that is set by a dispatcher, and the dispatcher manages a queue. Whatever. They're all going to be variations of an object obtaining some data after its creation, which of course objects do all the time.

arcy
  • 12,845
  • 12
  • 58
  • 103