
As per this answer,

A Message Bus is a messaging infrastructure that allows different systems to communicate through a shared set of interfaces (the message bus).

https://i.stack.imgur.com/5PkJy.gif


Below are the createHub() function and the Run() method, launched from main(), which create a message hub that connects a publisher with multiple subscribers:

type PubHub struct {
    subscribers map[*subscriptionmediator.HandlerSubscription]struct{}
    Register    chan *subscriptionmediator.HandlerSubscription
    Unregister  chan *subscriptionmediator.HandlerSubscription
    Broadcast   chan *events.Envelope
}


func createHub() *PubHub {

    return &PubHub{
        subscribers: map[*subscriptionmediator.HandlerSubscription]struct{}{},
        Register:    make(chan *subscriptionmediator.HandlerSubscription),
        Unregister:  make(chan *subscriptionmediator.HandlerSubscription),
        Broadcast:   make(chan *events.Envelope),
    }
}

func (h *PubHub) Run() {

    for {
        select {
        case subscriber := <-h.Register:
            h.subscribers[subscriber] = struct{}{}

        case subscriber := <-h.Unregister:
            if _, ok := h.subscribers[subscriber]; ok {
                delete(h.subscribers, subscriber)
            }
        case message := <-h.Broadcast:
            for subscriber := range h.subscribers {
                subscriber.DataChannel <- message
            }
        }
    }
}

where each subscriber registers, as shown below:

    subscription := &subscriptionmediator.HandlerSubscription{
        ConnInstance: conn,
        DataChannel:  make(chan *events.Envelope),
    }
    hub.Register <- subscription

DataChannel is used for communication between the publisher and its subscribers:

type HandlerSubscription struct {
    ConnInstance *websocket.Conn
    DataChannel  chan *events.Envelope
}

1) Can the above code be considered an implementation of the message-bus-based pub-sub pattern?

2) How can I prevent one slow subscriber from blocking all the other subscribers when the hub sends on its channel (subscriber.DataChannel <- message)?

overexchange
  • Regardless of what you call it, `subscriber.DataChannel <- message` is a blocking send, which means one slow or (worse) stuck subscriber would block the world. – zerkms Jun 03 '20 at 04:32
  • @CeriseLimón So does `select` check whether the client is blocked by attempting the send (`client.send <- message`) and, if the send cannot proceed, run the `default` case and close the channel? – overexchange Jun 03 '20 at 14:30
  • @CeriseLimón Can `PubHub` be considered as an implementation of message hub mentioned [here](https://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html)? – overexchange Jun 03 '20 at 14:41
  • @CeriseLimón Related question: https://stackoverflow.com/questions/69641809/how-to-handle-connection-drop-from-source – overexchange Oct 20 '21 at 07:34
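
The non-blocking send raised in the comments above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the hub from the question: `Envelope`, `Subscription`, and `broadcast` are hypothetical stand-ins for `events.Envelope`, `subscriptionmediator.HandlerSubscription`, and the `Broadcast` case in `Run()`. A `select` with a `default` case attempts the send, and if the subscriber cannot accept the message immediately, the subscriber is dropped and its channel closed instead of stalling everyone else.

```go
package main

import "fmt"

// Envelope is a hypothetical stand-in for events.Envelope.
type Envelope struct{ Payload string }

// Subscription is a hypothetical stand-in for HandlerSubscription
// (the websocket connection is omitted to keep the sketch self-contained).
type Subscription struct {
	DataChannel chan *Envelope
}

// broadcast tries a non-blocking send to every subscriber. A subscriber
// that cannot accept the message right away is removed from the set and
// its channel is closed. It returns the number of dropped subscribers.
func broadcast(subscribers map[*Subscription]struct{}, msg *Envelope) int {
	dropped := 0
	for sub := range subscribers {
		select {
		case sub.DataChannel <- msg:
			// Delivered (or buffered) without blocking.
		default:
			// Receiver is not ready and the buffer (if any) is full.
			delete(subscribers, sub) // deleting during range is safe in Go
			close(sub.DataChannel)   // only the hub sends, so closing here is safe
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := &Subscription{DataChannel: make(chan *Envelope, 1)} // has buffer space
	slow := &Subscription{DataChannel: make(chan *Envelope)}    // unbuffered, nobody reading
	subs := map[*Subscription]struct{}{fast: {}, slow: {}}

	dropped := broadcast(subs, &Envelope{Payload: "hello"})
	fmt.Println("dropped:", dropped, "remaining:", len(subs))
}
```

Dropping a slow subscriber is one policy among several. Giving each `DataChannel` a buffer makes short stalls tolerable, and skipping the message without closing the channel is another option if losing individual messages is acceptable.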

1 Answer


Can the above code be considered an implementation of the message-bus-based pub-sub pattern?

Yes.

Volker
  • But a message bus should not block one subscriber because another subscriber is running slowly, as mentioned in the comment on the question. – overexchange Jun 03 '20 at 12:10
  • @overexchange, perhaps you should have included the definition of message bus you want to use in the question. And if you do that the answer may become obvious. – Peter Jun 03 '20 at 12:17
  • @overexchange "following [... a] pattern" is not the same as "is this a clever, suitable and production implementation of...". – Volker Jun 03 '20 at 12:38
  • @Peter Do you mean the `DataChannel`? I have added it to the question... – overexchange Jun 03 '20 at 12:53