
Stackdriver undelivered messages

I restarted my Windows VM, but this did not help. The next day I restarted my main.go, and right after that the old stuck messages began to arrive.

My Subscription Type is Pull, and my Acknowledgement Deadline is the maximum: 600 seconds.

Background: I want to use Pub/Sub as a load balancer for my managed group of Windows instances (I need the Windows API for this task). Message processing is CPU-intensive (with a few HTTP calls) and can take anywhere from a few seconds to a few minutes.

Some other metrics from Stackdriver (screenshots): the Pub/Sub overview, Oldest Unacknowledged Message, and Backlog Size graphs.

I have no idea what else to check. A day ago I did some high-load testing and everything looked fine (Undelivered Messages was zero, as the first screenshot above shows). Now my CPU consumption is zero and the managed group has shrunk to one instance (this is not a production environment). This is my first time using Pub/Sub. Here is the code of my main(), which synthesizes audio from text chunks, encodes it to two formats, and uploads it to S3:

func main() {
    fmt.Printf("Current time: %v\n",
        time.Now().Format(timeFormat),
    )

    // https://godoc.org/cloud.google.com/go/pubsub#Subscription.Receive
    err := subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        timeStart := time.Now()
        if timeStartFirstMessage == nil {
            timeStartFirstMessage = &timeStart
        }

        var data pubsubMessage
        if err := json.Unmarshal(m.Data, &data); err != nil {
            logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR unmarshaling message: %v", err)})
            m.Nack()
            return
        }
        fmt.Printf("Got message: %#v\n", data)

        var wg sync.WaitGroup
        wg.Add(len(data.TextChunks))

        wavs := make([][]byte, len(data.TextChunks))
        for i, chunk := range data.TextChunks {
            /** TODO: without 'go' (sequential) the chunks are processed
             * in order: the user can listen to the first and second chunks
             * sooner, but the m4a is ready later. However, under "high load"
             * on the VM, with ~2x more messages in parallel (each message
             * runs in its own goroutine), performance may even be better,
             * because hundreds of CPU-intensive goroutines are worse.
             *
             * Also, in sequential code I could append() to wavs directly
             * instead of doing it later in another func; maybe that would
             * also improve performance (profile it).
             **/
            go func(i int, chunk string) {
                fmt.Printf("Index of text chunk: %d\n", i)
                wav := tts(data.Voice, chunk)
                streamOfOggEncoder := encodeOggVorbis(wav)
                uploadToS3(
                    "intelligentspeaker",
                    data.Sub+"/"+data.HashArticle+"/"+fmt.Sprint(i),
                    "audio/ogg",
                    streamOfOggEncoder,
                )
                wavs[i] = wav

                wg.Done()
            }(i, chunk)
        }
        wg.Wait()
        wavConcated := concat(wavs)

        filename := buildPodcastEpisodeFilename(data.Title)
        err := encodePodcastEpisode(wavConcated, filename)
        if err != nil {
            m.Nack()
            return
        }

        key := data.Sub + "/" + data.HashArticle + "/" + random() + "/" + filename
        readCloser, size := getReadCloserAndSize(filename)
        if readCloser == nil {
            m.Nack()
            return
        }
        uploadToS3("intelligentspeaker--podcasts", key, "audio/x-m4a", readCloser)
        // The next message may have the same title (filename)
        if err = os.Remove(filename); err != nil {
            logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on m4a deleting: %v", err)})
        }

        fmt.Printf("Duration: %v\n", duration(wavConcated))
        updatePodcastXML(
            key,
            data.Title,
            data.URLArticle,
            data.FavIconURL,
            duration(wavConcated),
            data.Utc,
            size,
        )

        fmt.Printf("DONE pubsub message, current time: %v\n", time.Now().Format(timeFormat))
        fmt.Printf("Time of message processing: %v\n", time.Since(timeStart).Round(time.Second))
        fmt.Printf(
            "Time of all messages processing (counted from the first message, not from the start of this app): %v\n",
            time.Since(*timeStartFirstMessage).Round(time.Second),
        )

        m.Ack()

    })
    if err != nil {
        logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on registering receiver: %v", err)})
    }

}
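The TODO comment in the code above weighs fully sequential processing against one goroutine per chunk. A middle ground, sketched below with only the standard library, is a counting semaphore that caps concurrent CPU-bound work. The `work` callback is a placeholder standing in for the real tts/encode/upload steps; this is an illustration of the pattern, not the question's actual code:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// processChunks runs work() on every chunk concurrently, but allows at
// most `limit` goroutines to do work at once: parallelism without
// spawning hundreds of CPU-intensive goroutines.
func processChunks(chunks []string, limit int, work func(int, string) []byte) [][]byte {
	results := make([][]byte, len(chunks))
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for i, c := range chunks {
		wg.Add(1)
		go func(i int, c string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			results[i] = work(i, c)  // each index is written by one goroutine only
		}(i, c)
	}
	wg.Wait()
	return results
}

func main() {
	chunks := []string{"one", "two", "three", "four"}
	out := processChunks(chunks, runtime.NumCPU(), func(i int, c string) []byte {
		return []byte(c) // stand-in for tts + encode
	})
	fmt.Println(len(out), string(out[0]))
}
```

Results keep their original order because each goroutine writes only its own index, so the ogg chunks can still be uploaded as they finish while the final concatenation stays correct.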

Update: found similar question.

Vitaly Zdanevich

1 Answer


I'm assuming the metric of concern is pubsub.googleapis.com/subscription/num_undelivered_messages, which measures the "Number of unacknowledged messages (a.k.a. backlog messages) in a subscription." From the "Acknowledge Requests" graph, we can see that your subscribers stopped acknowledging messages at around 9 PM on the 28th. That is the same time the "Oldest Unacknowledged Message" age started growing (linearly). Meanwhile, the "Backlog Size" graph is fairly flat. This means there is a small number of "problematic" messages in the subscription's backlog that the subscribers are not acknowledging. Most likely, the subscribers are having trouble processing this small set of messages for some reason. Perhaps these messages are malformed in some way, or do not conform to some expectation of the subscriber code.

One thing you can do to try to debug this is to use the gcloud command-line tool to "peek" into the messages in the subscription's backlog: https://cloud.google.com/sdk/gcloud/reference/pubsub/subscriptions/pull. Note that the gcloud tool will not acknowledge any messages unless you set the --auto-ack flag.

  • Thank you, now I will try to "peek" into the messages with `gcloud`. Also note that at this stage, with the stuck messages, when I send any new messages (which I am sure are correct), they are not received; only the `num_undelivered_messages` graph increases. – Vitaly Zdanevich May 30 '19 at 15:46
  • `│ {"Voice":"David","TextChunks":["my text here"],"Sub":"f4cfd4a8-0e94-4287-8c5e-1b01538dd2a1","HashArticle":"b1bc248a7ff2b2e95569f56de68615dfDavid","Title":"Nazism - Wikipedia","URLArticle":"https://en.wikipedia.org/wiki/Nazism","FavIconURL":"https://en.wikipedia.org/static/favicon/wikipedia.ico","Utc":1559210260} │ 432924914515591 │ │ WBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRILUxNRXHYTTBBuM1x1B1ENGHQvYX1oCRQJVBRZflVbCTxofmNxBFMEG3Z0YHFvWxYEC0R7xtnzh_WWFkZJP8XElqVIQbbFm74zZhg9XBJLLD5-LS9FQV5AEkwmGkRJUytDCyo |` – Vitaly Zdanevich May 30 '19 at 15:59
  • This is one of the outputs from `gcloud pubsub subscriptions pull "my-subscription-id"`, looks ok for me. – Vitaly Zdanevich May 30 '19 at 16:00
  • I suggest trying to run your subscriber logic against the body of this message, to see if it handles the message correctly (i.e., doesn't crash or fail to acknowledge the message). You can also just acknowledge this message via `gcloud` (though note that that means that your actual subscriber would not process this message). – Alex Mordkovich May 31 '19 at 15:26
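Following the suggestion in the last comment, a minimal stand-alone harness for the unmarshaling step might look like the sketch below. The `pubsubMessage` fields are reconstructed from the payload pulled above, so they may not match the real struct exactly; the point is to surface the `json.Unmarshal` error that the Receive callback currently ignores:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// pubsubMessage mirrors the fields visible in the pulled payload
// (reconstructed from the question; adjust to match the real struct).
type pubsubMessage struct {
	Voice       string   `json:"Voice"`
	TextChunks  []string `json:"TextChunks"`
	Sub         string   `json:"Sub"`
	HashArticle string   `json:"HashArticle"`
	Title       string   `json:"Title"`
	URLArticle  string   `json:"URLArticle"`
	FavIconURL  string   `json:"FavIconURL"`
	Utc         int64    `json:"Utc"`
}

// parseMessage is the same step the Receive callback performs,
// but it returns the error instead of silently discarding it.
func parseMessage(raw []byte) (pubsubMessage, error) {
	var m pubsubMessage
	err := json.Unmarshal(raw, &m)
	return m, err
}

func main() {
	// Payload copied from the `gcloud pubsub subscriptions pull` output above.
	raw := []byte(`{"Voice":"David","TextChunks":["my text here"],"Sub":"f4cfd4a8-0e94-4287-8c5e-1b01538dd2a1","HashArticle":"b1bc248a7ff2b2e95569f56de68615dfDavid","Title":"Nazism - Wikipedia","URLArticle":"https://en.wikipedia.org/wiki/Nazism","FavIconURL":"https://en.wikipedia.org/static/favicon/wikipedia.ico","Utc":1559210260}`)
	m, err := parseMessage(raw)
	if err != nil {
		log.Fatalf("malformed message: %v", err)
	}
	fmt.Println(m.Title, len(m.TextChunks))
}
```

If this harness parses the stuck message cleanly, the problem is further along in the pipeline (tts, encoding, upload), which narrows the search considerably.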