I restarted my Windows VM but this did not help. On the next day I restarted my main.go
and right after that I saw that old stuck messages began to come.
My Subscription Type
is Pull, my Acknowledgement Deadline
is maximum: 600 seconds.
Background: I want to use Pubsub as a load balancer in my managed group of Windows instances (I need Windows API for that task). Message processing is CPU intensive (with a few HTTP calls) and can take from a few seconds to a few minutes.
Some other metrics from Stackdriver:
I have no idea what I can check. Day ago I did a highload testing and looks like everything was fine (Undelivered Messages
was zero as we see on the first screenshot above). Now my CPU consumption is zero, managed group decreased to one instance (this is not in a production environment). I try to use Pubsub for the first time. Code of my main()
that synthesize audio from text chunks, encode to two formats and upload to S3:
func main() {
fmt.Printf("Current time: %v\n",
time.Now().Format(timeFormat),
)
// https://godoc.org/cloud.google.com/go/pubsub#Subscription.Receive
err := subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
timeStart := time.Now()
if timeStartFirstMessage == nil {
timeStartFirstMessage = &timeStart
}
var data pubsubMessage
json.Unmarshal(m.Data, &data)
fmt.Printf("Got message: %#v\n", data)
var wg sync.WaitGroup
wg.Add(len(data.TextChunks))
wavs := make([][]byte, len(data.TextChunks))
for i, chunk := range data.TextChunks {
/** TODO without 'go' (sequential) will process in correct order:
* user can listen first and seconds chunks faster,
* but m4a will be later,
* but on "high load" of the VM with ~2x more messages in parallel
* (each message on it is own goroutine)
* performance may be even better (I think) because
* hundreds of CPU intensive goroutine is worse.
*
* Also in sequential code I can append() to wav[]
* instead of doind it later in another func,
* maybe this will also improve perfomance (profile).
**/
go func(i int, chunk string) {
fmt.Printf("Index of text chunk: %d\n", i)
wav := tts(data.Voice, chunk)
streamOfOggEncoder := encodeOggVorbis(wav)
uploadToS3(
"intelligentspeaker",
data.Sub+"/"+data.HashArticle+"/"+fmt.Sprint(i),
"audio/ogg",
streamOfOggEncoder,
)
wavs[i] = wav
wg.Done()
}(i, chunk)
}
wg.Wait()
wavConcated := concat(wavs)
filename := buildPodcastEpisodeFilename(data.Title)
err := encodePodcastEpisode(wavConcated, filename)
if err != nil {
m.Nack()
return
}
if err != nil {
logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on m4a deleting: %v", err)})
}
key := data.Sub + "/" + data.HashArticle + "/" + random() + "/" + filename
readCloser, size := getReadCloserAndSize(filename)
if readCloser == nil {
m.Nack()
return
}
uploadToS3("intelligentspeaker--podcasts", key, "audio/x-m4a", readCloser)
// Next message may be with the same title (filename)
err = os.Remove(filename)
fmt.Printf("Duration: %v\n", duration(wavConcated))
updatePodcastXML(
key,
data.Title,
data.URLArticle,
data.FavIconURL,
duration(wavConcated),
data.Utc,
size,
)
fmt.Printf("DONE pubsub message, current time: %v\n", time.Now().Format(timeFormat))
fmt.Printf("Time of message processing: %v\n", time.Since(timeStart).Round(time.Second))
fmt.Printf(
"Time of all messages processing (counted from the first message, not from the start of this app): %v\n",
time.Since(*timeStartFirstMessage).Round(time.Second),
)
m.Ack()
})
if err != nil {
logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on registering receiver: %v", err)})
}
}
Update: found similar question.