1

I am trying to get all the available messages from a topic in google pub-sub. But in go I am not able to find a configuration which can cancel the receive callback once there are no more messages remaining in the Pub-Sub.

One approach I think is to get the total number of messages from Pub-Sub using the Google Cloud Monitoring Api's discribed in this answer Google PubSub - Counting messages in topic and then keeping a count of the number of messages read and calling cancel if the count is equal to the number, but I am not so sure if this is the right approach to move ahead.

    var mu sync.Mutex
    received := 0
    sub := client.Subscription(subID)
    cctx, cancel := context.WithCancel(ctx)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
            mu.Lock()
            defer mu.Unlock()
            fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
            msg.Ack()
            received++
            if received == TotalNumberOfMessages {
                    cancel()
            }
    })
    if err != nil {
            return fmt.Errorf("Receive: %v", err)
    }

I have tried using the context with timeout as well, i.e. fetch until this context deadline is not met, after that cancel.

ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}

But then again that won't give me certainty that all the messages have been processed.

Please suggest a solution that can make sure that subscription.Receive function stops when there are no more messages remaining in the Pub-Sub.

1 Answers1

2

I already implemented that in my previous company (sadly I no longer have the code, it is in my previous company git...). However it worked.

The principle was the following

msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
    msg <- m
    })
for {
  select {
    case res := <-msg:
      fmt.Fprintf(w, "Got message: %q\n", string(res.Data))
      res.Ack()
  
    case <-time.After(3 * time.Second):
        fmt.Println("timeout")
        cancel()
    }
}
guillaume blaquiere
  • 66,369
  • 2
  • 47
  • 76