I want to use gRPC to let clients subscribe to events generated by the server. I have an RPC declared like so:

rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);

where the returned stream is infinite. To "unsubscribe", clients cancel the RPC. (Is there a cleaner way, by the way?)

I have figured out how the client can cancel the call:

Context.CancellableContext cancellableContext =
         Context.current().withCancellation();
cancellableContext.run(() -> {
   stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());

However, the server does not seem to notice that a client has cancelled its call. I'm testing this with a dummy server implementation:

@Override
public void subscribe(SubscribeRequest request,
                      StreamObserver<SubscribeResponse> responseObserver) {
  // in real code, this will happen in a separate thread.
  while (!Thread.interrupted()) {
    responseObserver.onNext(SubscribeResponse.getDefaultInstance());
  }
}

The server will happily continue sending its messages into the ether. How can the server recognize that the call was cancelled by the client and thus stop sending responses?

mkobit
  • 43,979
  • 12
  • 156
  • 150
Balz Guenat
  • 1,552
  • 2
  • 15
  • 35

1 Answer

I found the answer myself. Cast the StreamObserver passed to subscribe to a ServerCallStreamObserver, which exposes the methods isCancelled and setOnCancelHandler.

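ServerCallStreamObserver<SubscribeResponse> scso =
    (ServerCallStreamObserver<SubscribeResponse>) responseObserver;

// handler is a Runnable that is invoked when the call is cancelled
scso.setOnCancelHandler(handler);
// or poll the flag before sending the next response
if (scso.isCancelled()) {
  // stop sending and release resources
}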

This, to me, raises the question of why subscribe isn't passed a ServerCallStreamObserver to begin with.
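As a rough sketch of how the two options fit into a streaming loop, the following uses a small stand-in interface in place of io.grpc.stub.ServerCallStreamObserver so that it runs without gRPC on the classpath. The names CancellableObserver, FakeObserver, SubscribeSketch and streamEvents are hypothetical and not part of gRPC; in a real service you would cast responseObserver as described above and call the same two methods on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the parts of io.grpc.stub.ServerCallStreamObserver
// used here, so the sketch compiles without gRPC.
interface CancellableObserver<T> {
    void onNext(T value);
    boolean isCancelled();
    void setOnCancelHandler(Runnable handler);
}

// Fake observer that plays the role of a client cancelling after N messages.
class FakeObserver<T> implements CancellableObserver<T> {
    final List<T> received = new ArrayList<>();
    private final int cancelAfter;
    private volatile boolean cancelled;
    private Runnable onCancel = () -> {};

    FakeObserver(int cancelAfter) {
        this.cancelAfter = cancelAfter;
    }

    @Override
    public void onNext(T value) {
        received.add(value);
        if (received.size() == cancelAfter) {
            cancelled = true;   // simulate the client cancelling the call
            onCancel.run();     // and the cancel handler being invoked
        }
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public void setOnCancelHandler(Runnable handler) {
        onCancel = handler;
    }
}

class SubscribeSketch {
    // Sends events until the observer reports cancellation or maxEvents is reached.
    // Returns the number of events sent.
    static int streamEvents(CancellableObserver<String> observer,
                            int maxEvents,
                            AtomicBoolean cleanedUp) {
        // Option 1: register cleanup work to run on cancellation.
        observer.setOnCancelHandler(() -> cleanedUp.set(true));
        int sent = 0;
        // Option 2: poll isCancelled() before each send.
        while (sent < maxEvents && !observer.isCancelled()) {
            observer.onNext("event-" + sent);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        FakeObserver<String> observer = new FakeObserver<>(3);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        int sent = streamEvents(observer, 100, cleanedUp);
        System.out.println("sent=" + sent + ", cleanedUp=" + cleanedUp.get());
    }
}
```

The loop condition replaces the `Thread.interrupted()` check from the question: the server stops as soon as the observer reports cancellation, and the handler is a natural place to release any per-subscription resources.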

  • We can't change the type because that would break pre-existing users :-( . We'd have to leave the old methods around, which brings its own confusions. Note that `Context.addListener()` is another option. With ServerCallStreamObserver the callback will be synchronized with other callbacks (like onNext/onCompleted). With Context it will be called separately and your implementation would need to worry about thread-safety. – Eric Anderson Feb 08 '19 at 16:00
  • @eric-anderson, I’m sorry, but whenever I see a “previous users” argument, I must stop whatever I’m doing and call bull on it. Yeah, maybe 10 people using gRPC outside Google in 2016 would notice if a type was renamed, but what about 10000 people using gRPC in 2020? Why would they continue to be confused? Then there’s versioning; an API doesn’t have to be backward compatible until the apocalypse arrives. – Abhijit Sarkar Aug 04 '20 at 08:31
  • @AbhijitSarkar several organizations with a complex web of micro-services facing significant traffic (Netflix, for example) have been using gRPC for some time now. I guess they would totally crucify Eric if he suddenly changed the API like that ;-] Nevertheless, I think that having 2 versions of the methods is a much better idea than forcing people to perform some bizarre casting at the beginning of each method ;-) – morgwai Jun 04 '21 at 23:21
  • @morgwai I did say “_Then there’s versioning_”; that’s the only argument a software maintainer needs to make before changing the API. – Abhijit Sarkar Jun 04 '21 at 23:33
  • @AbhijitSarkar increasing the API's version number does not change the fact that an organization like Netflix would have to adapt hundreds of micro-services to use the new API in order to pick up bug fixes and improvements from the new release. To avoid forcing users of your lib to do this adapting, you would need to support both versions of the API in the new release anyway (similar to what I advocated for, btw). So really, versioning does not solve anything in the case of a rolling-release model. – morgwai Jun 05 '21 at 08:09
  • @morgwai The previous major release would be put in maintenance mode with only bug fixes, first for one or two minor releases, and then not maintained anymore. This is a non-issue and isn’t a software problem at all; it’s a question of how much the library maintainer wants to bend over backwards to please large companies. – Abhijit Sarkar Jun 05 '21 at 18:45