
I'm using Google's Pub/Sub queue to handle messages between services. Some of the subscribers connect to rate-limited APIs.

For example, I'm pushing street addresses onto a pub/sub topic. I have a Cloud function which subscribes (via push) to that topic, and calls out to an external rate-limited geocoding service. Ideally, my street addresses could be pushed onto the topic with no delay, and the topic would retain those messages - calling the subscriber in a rate-limited fashion.

Is there any way to configure such a delay, or a message distribution rate limit? Increasing the ack window doesn't really help: I've architected this system to avoid long-running functions.

emma ray
  • I've run into rate-limiting issues with Pub/Sub and Cloud Functions. The solution has been to create a record of jobs to be done in Datastore, then use a Cloud Function on a scheduler that pulls n objects from Datastore and executes a Cloud Function for each. You're effectively creating a queuing system. It's a pain to update the DS objects with each step in the process, but it also makes it easy to visualize the pipeline. – ethanenglish Jul 27 '18 at 13:30
  • One year, any update? – Lin Du Dec 07 '18 at 08:58
  • @slideshowp2 google does not seem interested in supporting this use case, no update – emma ray Feb 08 '19 at 14:19
  • You could try pulling from Pub/Sub periodically: using Cloud Scheduler you can set a cron to trigger a Cloud Function that pulls a limited number of messages from the Pub/Sub topic and calls your rate-limited API. – Jonathan Lin Nov 08 '19 at 08:24
  • three years later, the above seems to be the right approach (manually pulling from the queue on regular intervals via a cron job) – emma ray May 04 '21 at 18:32
  • Just chiming in to say we have this use case too. Pulling from a Pub/Sub subscription in a Dataflow streaming job, and when there's a backlog, the API we send the data to at the end of the pipeline returns 429s. It'd be nice to just throttle the rate at which we process the data. – Matt Welke Aug 27 '21 at 16:58
  • @chrisstamper take a look at [the answer from Akash](https://stackoverflow.com/a/73302173/680920). The queue works nicely; however, it looks like you can't load it in bulk. I'm researching that side of it more, maybe you can do bulk loading with App Engine, but aside from that limitation this seems the best answer. – quickshiftin Mar 01 '23 at 19:03

4 Answers


Because no answer so far states this directly, I'm going to answer by saying that there is currently no way to do this. There are workarounds (see the comments on the question, which explain how to create a queueing system using Cloud Scheduler), but there's no way to simply set an option on a pull subscription that creates a rate limit between it and its topic.

I opened a feature request for this though. Please speak up on the tracked issue if you'd like this feature.

https://issuetracker.google.com/issues/197906331
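In the meantime, the cron-pull workaround from the comments can be sketched in Node.js. This is only an illustration, not an official pattern: the project, subscription, and `geocode` function names are made up, and it assumes a Cloud Scheduler job invokes this function once a minute with `@google-cloud/pubsub` installed.

```javascript
// How many messages we may pull per cron tick without exceeding the
// downstream API's rate limit.
function batchSize(apiRequestsPerSecond, tickSeconds) {
  return Math.floor(apiRequestsPerSecond * tickSeconds);
}

// Hypothetical Cloud Function body, triggered once per minute by
// Cloud Scheduler. Names ('my-project', 'addresses-sub', geocode)
// are illustrative.
async function pullBatch() {
  const { v1 } = require('@google-cloud/pubsub'); // lazy require
  const subscriber = new v1.SubscriberClient();
  const subscription = subscriber.subscriptionPath('my-project', 'addresses-sub');

  // Pull at most one tick's worth of messages (e.g. geocoder allows ~5 req/s).
  const [response] = await subscriber.pull({
    subscription,
    maxMessages: batchSize(5, 60),
  });

  for (const { message, ackId } of response.receivedMessages || []) {
    await geocode(message.data.toString()); // the rate-limited external call
    await subscriber.acknowledge({ subscription, ackIds: [ackId] });
  }
}
```

Messages beyond the batch stay in the subscription until the next tick, so the topic effectively retains the backlog while the subscriber drains it at a bounded rate.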

Matt Welke
  • 4yrs later I don't think it's going to happen. It seems that pub/sub is a queue and nothing more -- consumption of that queue is out of scope for this abstraction. In fact AWS SQS is similar in requiring throttling to be done on the consumer side. – emma ray Aug 27 '21 at 19:05
  • Well, I do see GCP adding a lot of features lately. We got dead-letter topics and Protobuf schemas enforced at the topic level. This is good stuff. Maybe we'll get throttling on the consumer side managed by GCP. – Matt Welke Aug 27 '21 at 23:19
  • Update: The feature request has moved into the "Accepted" state. – Matt Welke Sep 29 '21 at 16:54

One approach to solving your problem is to use async.queue.

It exposes a concurrency attribute with which you can manage the rate limit.

const async = require('async');

// create a queue object with concurrency 2
const q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);

// assign a drain callback, fired when the last item finishes
q.drain(function() {
    console.log('all items have been processed');
});

// add some items to the queue
q.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});

// adapted from the async documentation (v3 drain syntax)
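Note that concurrency caps how many tasks run in parallel, not requests per second. A minimal sketch of adding an actual rate cap on top of async.queue, assuming async v3 (which accepts async worker functions); `callApi` and the interval value are illustrative:

```javascript
// Extra wait needed after a task so each worker slot handles at most
// one task per minIntervalMs.
function delayNeeded(elapsedMs, minIntervalMs) {
  return Math.max(0, minIntervalMs - elapsedMs);
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// With concurrency 1 and minIntervalMs 200 this caps throughput at
// roughly 5 tasks/second regardless of how fast callApi() returns.
function makeThrottledQueue(callApi, minIntervalMs) {
  const async = require('async'); // lazy require, only needed at runtime
  return async.queue(async (task) => {
    const started = Date.now();
    await callApi(task); // the rate-limited API call (hypothetical)
    await sleep(delayNeeded(Date.now() - started, minIntervalMs));
  }, 1);
}
```

The worker holds its slot until the minimum interval elapses, so fast responses don't let the queue burst past the downstream limit.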
Alexandru Olaru
  • My goal is to store tasks in a pub/sub queue (think SNS/SQS) so that I do not have to run a server constantly, and can use short-lived lambdas instead. This solution would require a long-running server. – emma ray Jul 26 '17 at 15:46
  • Besides, what about deploying the application on a `GKE` cluster? That means the application has multiple instances. – Lin Du Feb 09 '19 at 05:36

A GCP Cloud Tasks queue lets you limit the rate at which tasks are dispatched. Check this doc
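For illustration, a queue with dispatch limits can be created from the gcloud CLI; the queue name, URL, and limit values below are made up, and flag availability may vary by gcloud version:

```shell
# Create a queue that dispatches at most 5 tasks/second,
# with at most 10 running concurrently (values illustrative).
gcloud tasks queues create geocode-queue \
    --max-dispatches-per-second=5 \
    --max-concurrent-dispatches=10

# Enqueue an HTTP task targeting the rate-limited endpoint
# (hypothetical URL and payload).
gcloud tasks create-http-task --queue=geocode-queue \
    --url=https://example.com/geocode \
    --body-content='{"address":"1600 Amphitheatre Pkwy"}'
```

Cloud Tasks then drains the queue at the configured rate, which is exactly the retain-and-throttle behavior the question asks Pub/Sub for.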

Akash Babu
  • While this does have the rate limiting, it looks like it's part of App Engine, which is separate from many of the other offerings. – quickshiftin Feb 04 '23 at 21:38
  • Nope, it isn't specific to App Engine, you can configure other types of targets as well. It basically makes an HTTP API request to the specified endpoint. Check [this doc](https://cloud.google.com/tasks/docs/dual-overview#http) – Akash Babu Feb 06 '23 at 12:35
  • Thx for the clarification and the link; I'll give it a read! – quickshiftin Feb 06 '23 at 14:27
  • Any chance you know why [I can't authenticate](https://stackoverflow.com/questions/75378704/google-cloud-tasks-node-js-client-authentication-failure)? – quickshiftin Feb 07 '23 at 23:16
  • Thanks for your help @Akash, I hooked you up with a few points for your trouble :D – quickshiftin Mar 01 '23 at 18:50

If you need to use an HTTP function, then Cloud Tasks is the right solution. See Choosing Pub/Sub or Cloud Tasks.

If you don't need HTTP, you can use a Background function (1st gen). When using a Background function, the docs say:

Note: Requests sent to overloaded HTTP functions will fail with a response code of 429 Too Many Requests. Events destined for event-driven functions will automatically be saved until capacity is available.

More suggestions: https://stackoverflow.com/a/76269946/10720618

kym