I am using PostgreSQL and its LISTEN/NOTIFY functionality. My listener runs on my application server (AS), and I have triggers configured on the database so that whenever a CRUD operation is performed on a table, a NOTIFY is sent to the AS.

Listener class in Java:

        @Singleton
        @Startup
    NotificationListenerInterface.class)
        public class NotificationListener extends Thread implements NotificationListenerInterface {

            @Resource(mappedName="java:/RESOURCES") 
            private DataSource ds;

            @PersistenceContext(unitName = "one")
            EntityManager em;

            Logger logger = Logger.getLogger(NotificationListener.class);

            private Connection Conn;
            private PGConnection pgConnection = null;
            private NotifyRequest notifyRequest = null;

            @PostConstruct
            public void notificationListener() throws Throwable {

                System.out.println("Notification****************");
                try
                {


                    Class.forName("com.impossibl.postgres.jdbc.PGDriver");
                    String url = "jdbc:pgsql://192.xx.xx.126:5432/postgres";


                    Conn = DriverManager.getConnection(url,"postgres","password");
                    this.pgConnection = (PGConnection) Conn;

                    System.out.println("PG CONNECTON: "+ pgConnection);
                    Statement listenStatement = Conn.createStatement();
                    listenStatement.execute("LISTEN notify_channel");
                    listenStatement.close();

                    pgConnection.addNotificationListener(new PGNotificationListener() {

                        @Override
                        public void notification(int processId, String channelName, String payload){

                            System.out.println("*********INSIDE NOTIFICATION*************");

                            System.out.println("Payload: " + jsonPayload);

                        }

When my AS comes up, the listener class is instantiated at startup (via the @Startup annotation) and starts listening on the channel.

This works fine when, for testing, I edit my table in the DB manually: the notification is generated and the listener receives it.

However, when I send an UPDATE on the table programmatically, the UPDATE succeeds but the listener does not receive anything.

I suspect the listener's connection goes down when I send the request (which also opens a connection to edit entities), but I am not sure. I have read about permanent and pooled connections, but I cannot decide how to pursue that.

I am using the pgjdbc-ng (http://impossibl.github.io/pgjdbc-ng/) jar for asynchronous notifications, since the standard JDBC driver requires polling.

EDIT:

When I try the above listener with polling, using the standard JDBC jar (not pgjdbc-ng), I get the notifications.

With `PGNotification notif[] = con.getNotifications()` I receive notifications; however, when I register a listener asynchronously as below, I do not.

    pgConnection.addNotificationListener(new PGNotificationListener() {

         @Override
         public void notification(int processId, String channelName, String payload){

            System.out.println("*********INSIDE NOTIFICATION*************");
         }

SOLVED:

My listener was going out of scope once the function finished executing, because it only had function scope. I kept it in a member variable of my startup bean class, and then it worked.
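For illustration, the change might look roughly like the sketch below. It uses hypothetical stand-ins (`Listener`, `FakeConnection`, `StartupBean`) rather than the real pgjdbc-ng and EJB types, so it only shows the shape of the fix: the listener is stored in a field of a long-lived object instead of being created inline in the startup method.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

public class ListenerHolderSketch {

    /** Hypothetical stand-in for PGNotificationListener. */
    public interface Listener {
        void notification(int processId, String channelName, String payload);
    }

    /** Hypothetical stand-in for a connection that only keeps weak references to listeners. */
    public static class FakeConnection {
        private final List<WeakReference<Listener>> listeners = new ArrayList<>();

        public void addNotificationListener(Listener listener) {
            listeners.add(new WeakReference<>(listener));
        }

        /** Delivers a notification and returns how many listeners were still reachable. */
        public int fire(int processId, String channelName, String payload) {
            int delivered = 0;
            for (WeakReference<Listener> ref : listeners) {
                Listener listener = ref.get();
                if (listener != null) {
                    listener.notification(processId, channelName, payload);
                    delivered++;
                }
            }
            return delivered;
        }
    }

    /** Hypothetical stand-in for the startup bean. */
    public static class StartupBean {
        private final FakeConnection connection;
        private final List<String> payloads = new ArrayList<>();

        // A field, not a local variable: the listener stays strongly
        // reachable for as long as the bean itself is alive.
        private Listener listener;

        public StartupBean(FakeConnection connection) {
            this.connection = connection;
        }

        public void start() {
            listener = (processId, channelName, payload) -> payloads.add(payload);
            connection.addNotificationListener(listener);
        }

        public List<String> payloads() {
            return payloads;
        }
    }

    public static void main(String[] args) {
        FakeConnection connection = new FakeConnection();
        StartupBean bean = new StartupBean(connection);
        bean.start();
        connection.fire(1, "notify_channel", "hello");
        System.out.println(bean.payloads());
    }
}
```

Because the bean is a singleton that lives for the lifetime of the application, anything it references through a field is not eligible for garbage collection.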

Siddharth Trikha
  • Inside your listener, the variable 'jsonPayload' does not exist. Also, are you using the same connection to write your updates? It's feasible that your connection with the listener attached goes out of scope and is destroyed by the GC. – Luke A. Leber Jun 22 '16 at 12:23
  • I am not using the same connection. But I checked using `netstat` that the connections were in established state i.e the old connection was not lost. `netstat --numeric-ports|grep 5432|grep my.ip` gave two connections (one old and one new) and both in ESTABLISHED state: `tcp 0 0 192.168.5.126:5432 192.168.105.213:46802 ESTABLISHED tcp 0 0 192.168.5.126:5432 192.168.105.213:46805 ESTABLISHED` – Siddharth Trikha Jun 23 '16 at 05:36
  • @LukeA.Leber: Please check the edit to the question. – Siddharth Trikha Jun 23 '16 at 08:29
  • @LukeA.Leber: Since my connection is not closed, I feel my listener which I register `pgConnection.addNotificationListener(new PGNotificationListener() {})` goes out of session. Any comments? – Siddharth Trikha Jun 23 '16 at 08:52
  • The notification listeners are internally maintained by that library as weak references, meaning that you have to hold a hard reference externally so they won't be garbage collected. Check out the BasicContext class: `synchronized (notificationListeners) { notificationListeners.put(key, new WeakReference(listener)); }` If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire. – Luke A. Leber Jun 23 '16 at 11:22

1 Answer

The notification listeners are maintained internally by that library as weak references, meaning that you have to hold a hard reference externally so they won't be garbage collected. Check out the BasicContext class, lines 642 - 655:

    public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {

        name = nullToEmpty(name);
        channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";

        Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);

        NotificationKey key = new NotificationKey(name, channelNameFilterPattern);

        synchronized (notificationListeners) {
            notificationListeners.put(key, new WeakReference<NotificationListener>(listener));
        }

    }

If the GC picks up your listener, calls to `get` on the weak reference return null and the listener will not fire, as seen in lines 690 - 710:

    @Override
    public synchronized void reportNotification(int processId, String channelName, String payload) {

        Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();
        while (iter.hasNext()) {

            Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();

            NotificationListener listener = entry.getValue().get();
            if (listener == null) {

                iter.remove();
            }
            else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {

                listener.notification(processId, channelName, payload);
            }

        }

    }
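To see the effect in isolation, here is a minimal, self-contained sketch of the same pattern. `Listener` and `WeakRegistry` are hypothetical stand-ins, not pgjdbc-ng classes, and `ref.clear()` is used as a deterministic substitute for the garbage collector actually collecting the listener (real GC timing cannot be relied on in a demo).

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WeakListenerDemo {

    /** Hypothetical stand-in for the library's listener interface. */
    public interface Listener {
        void notification(String payload);
    }

    /** Hypothetical registry that stores listeners only through weak references. */
    public static class WeakRegistry {
        private final List<WeakReference<Listener>> listeners = new ArrayList<>();

        /** Registers a listener and returns the weak reference (exposed only for the demo). */
        public WeakReference<Listener> add(Listener listener) {
            WeakReference<Listener> ref = new WeakReference<>(listener);
            listeners.add(ref);
            return ref;
        }

        /** Delivers the payload and returns how many listeners actually received it. */
        public int report(String payload) {
            int delivered = 0;
            Iterator<WeakReference<Listener>> iter = listeners.iterator();
            while (iter.hasNext()) {
                Listener listener = iter.next().get();
                if (listener == null) {
                    iter.remove(); // referent is gone: the entry is dropped silently
                } else {
                    listener.notification(payload);
                    delivered++;
                }
            }
            return delivered;
        }
    }

    public static String runDemo() {
        WeakRegistry registry = new WeakRegistry();
        List<String> received = new ArrayList<>();

        Listener held = received::add;
        WeakReference<Listener> ref = registry.add(held);
        int whileHeld = registry.report("first");
        // Keep `held` strongly reachable up to this point (Java 9+).
        Reference.reachabilityFence(held);

        // Simulate the listener being garbage collected.
        ref.clear();
        int afterClear = registry.report("second");

        return whileHeld + " " + afterClear + " " + received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "1 0 [first]"
    }
}
```

The second notification is lost without any error or log message, which matches the symptom in the question: the connection stays established, but the callback never runs.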

To fix this, add your notification listeners as such:

    // Do not let this reference go out of scope!
    PGNotificationListener listener = new PGNotificationListener() {

        @Override
        public void notification(int processId, String channelName, String payload) {
            // interesting code
        }
    };
    pgConnection.addNotificationListener(listener);

Quite an odd use-case for weak references in my opinion...

Ajay Takur
Luke A. Leber