I am new to Cap'n Proto.
I want to call a function on server2 from inside a callback invoked by server1,
but I get the exception below.
Please help me resolve it.
Many thanks!

* thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x3000000000020)
  * frame #0: 0x000000010035859c libcapnp-rpc-0.10.2.dylib`capnp::VatNetwork<capnp::rpc::twoparty::VatId, capnp::rpc::twoparty::ProvisionId, capnp::rpc::twoparty::RecipientId, capnp::rpc::twoparty::ThirdPartyCapId, capnp::rpc::twoparty::JoinResult>::baseConnect(capnp::AnyStruct::Reader) + 20
    frame #1: 0x0000000100361a2c libcapnp-rpc-0.10.2.dylib`kj::_::TransformPromiseNode<kj::_::Void, kj::Own<kj::AsyncIoStream>, capnp::EzRpcClient::Impl::Impl(kj::StringPtr, unsigned int, capnp::ReaderOptions)::'lambda'(kj::Own<kj::AsyncIoStream>&&), kj::_::PropagateException>::getImpl(kj::_::ExceptionOrValue&) + 512
    frame #2: 0x00000001004d2220 libkj-async-0.10.2.dylib`kj::_::RunnableImpl<kj::_::TransformPromiseNodeBase::get(kj::_::ExceptionOrValue&)::$_31>::run() + 32
    frame #3: 0x000000010028aaa4 libkj-0.10.2.dylib`kj::_::runCatchingExceptions(kj::_::Runnable&) + 40
    frame #4: 0x00000001004c7e48 libkj-async-0.10.2.dylib`kj::_::TransformPromiseNodeBase::get(kj::_::ExceptionOrValue&) + 64
    frame #5: 0x00000001004c8684 libkj-async-0.10.2.dylib`kj::_::ForkHubBase::fire() + 60
    frame #6: 0x00000001004c654c libkj-async-0.10.2.dylib`kj::_::waitImpl(kj::Own<kj::_::PromiseNode>&&, kj::_::ExceptionOrValue&, kj::WaitScope&, kj::SourceLocation) + 608
    frame #7: 0x0000000100005198 client`kj::Promise<capnp::Response<SampleServer1::CallbackRegisterResults> >::wait(kj::WaitScope&, kj::SourceLocation) + 120
    frame #8: 0x0000000100004b94 client`main + 344
    frame #9: 0x000000010003d08c dyld`start + 520

Sample source code is below.

SampleServer1.capnp:

interface SampleServer1 {

    callbackRegister @0 (callback :Callback) -> ();   # to register a callback

    interface Callback {
        calbackFunc @0 (in :Int32) -> ();
    }
}

SampleServer1::Server Impl:

class SampleServer1Impl : public SampleServer1::Server
{
::kj::Promise<void> callbackRegister(CallbackRegisterContext context){
    auto cb = context.getParams().getCallback();

    auto request = cb.calbackFuncRequest();   //Call callback function
    request.setIn(111);
    auto promise = request.send();

    return kj::READY_NOW;
  }
}

SampleServer1::Callback::Server Impl:

class CallbackImpl : public SampleServer1::Callback::Server
{
::kj::Promise<void> calbackFunc(CalbackFuncContext context){

    capnp::EzRpcClient ezClient2("unix:/tmp/capnp-server-2");
    SampleServer2::Client client2 = ezClient2.getMain<SampleServer2>();

    auto& waitScope = ezClient2.getWaitScope();
    {
      auto request = client2.functionSampleRequest();  //Request to SERVER2
      request.setIn(222);
      auto promise = request.send();
     
      promise.wait(waitScope);
      
    }

    return kj::READY_NOW;
  }
}

SampleServer2.capnp:

interface SampleServer2 {

    functionSample @0 (in :Int32) -> ();
}

SampleServer2::Server impl

class SampleServer2Impl : public SampleServer2::Server
{
::kj::Promise<void> functionSample(FunctionSampleContext context){
    //Do something
    return kj::READY_NOW;
  }
}

Client implementation:

  capnp::EzRpcClient ezClient("unix:/tmp/capnp-server-1");
  SampleServer1::Client client = ezClient.getMain<SampleServer1>();
  auto& waitScope = ezClient.getWaitScope();

  ::SampleServer1::Callback::Client callback = ::SampleServer1::Callback::Client(kj::heap<CallbackImpl>());

  auto request = client.callbackRegisterRequest();      //Register a callback to Server1
  request.setCallback(callback);
  auto promise = request.send();
Toby

2 Answers

The problem is in the implementation of calbackFunc. calbackFunc is itself invoked by the event loop, from within a wait() on that loop (for example NEVER_DONE.wait), so waiting on a promise inside it creates a nested wait. Cap'n Proto does not allow this.
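
The nested-wait rule can be illustrated with a toy single-threaded event loop in plain standard C++. This is only a sketch of the idea and not how KJ is implemented: ToyLoop, post, and demoNestedWait are hypothetical names invented for this illustration. A callback that tries to wait on the loop that is currently running it is rejected, while queuing a continuation and returning works:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Toy single-threaded event loop (hypothetical; NOT the KJ implementation).
class ToyLoop {
public:
  // Queue a callback to run on a later turn of the loop.
  void post(std::function<void()> fn) { queue_.push_back(std::move(fn)); }

  // Run queued callbacks until `done` becomes true.
  // Refuses to start if a callback of this loop is already executing.
  void wait(const bool& done) {
    if (running_) {
      throw std::logic_error("wait() called from inside an event callback");
    }
    while (!done) {
      assert(!queue_.empty() && "deadlock: nothing left to run");
      auto fn = std::move(queue_.front());
      queue_.pop_front();
      running_ = true;
      try { fn(); } catch (...) { running_ = false; throw; }
      running_ = false;
    }
  }

private:
  std::deque<std::function<void()>> queue_;
  bool running_ = false;
};

// Returns a log of what happened, in order.
std::vector<std::string> demoNestedWait() {
  ToyLoop loop;
  std::vector<std::string> log;
  bool replyArrived = false;  // stands in for "server2 has replied"
  bool allDone = false;

  // Stands in for calbackFunc being invoked by the event loop.
  loop.post([&] {
    // The reply can only arrive on a later turn of the same loop.
    loop.post([&] { replyArrived = true; log.push_back("reply arrived"); });
    try {
      loop.wait(replyArrived);  // blocking wait inside a callback
    } catch (const std::logic_error&) {
      log.push_back("nested wait rejected");
    }
    // Non-blocking alternative: queue a continuation and return.
    loop.post([&] { log.push_back("continuation ran"); allDone = true; });
  });

  loop.wait(allDone);  // top-level wait, allowed
  return log;
}
```

Calling demoNestedWait() produces the entries "nested wait rejected", "reply arrived", "continuation ran", in that order: the reply is only processed after the callback has returned control to the loop.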

You can avoid this by making the request on another thread. For example:

  ::kj::Promise<void> calbackFunc(CalbackFuncContext context){
    
    kj::Thread th([](){
        capnp::EzRpcClient ezClient2("unix:/tmp/capnp-server-2");
        SampleServer2::Client client2 = ezClient2.getMain<SampleServer2>();
        auto& waitScope = ezClient2.getWaitScope();
        {
          auto request = client2.functionSampleRequest();  //Request to SERVER2
          request.setIn(222);
          auto promise = request.send();
         
          promise.wait(waitScope);
          
        }
    });
    return kj::READY_NOW;
  }
  • If you create a thread, it crashes with an exception. Please see: https://stackoverflow.com/questions/53850808/no-event-loop-is-runing-on-this-thread – Toby Sep 12 '22 at 09:40
  • The use case in the link you mentioned is different from this one. There, the client is created in one thread and sends a request from another thread. Capnp uses a thread-local variable to attach the event loop to the thread when the client is initialized, so the event loop is available only in that thread, and an exception occurs. In the solution I mentioned, the client is created and sends the request in the same thread, so there is no problem. – Duy Nguyễn Sep 12 '22 at 11:53
  • @DuyNguyễn You have correctly identified the problem here, but using a Thread as the solution isn't the best approach. Instead, the right thing to do is replace `promise.wait()` with `promise.then()`, passing it a lambda to invoke when the promise completes. `.then()` will return another promise, which is what `callbackFunc()` should return overall (instead of `READY_NOW`). – Kenton Varda Sep 12 '22 at 13:46
  • @KentonVarda: Thank you for the valuable answer. I will try it. – Toby Sep 12 '22 at 14:42
  • @KentonVarda I agree, the thread is just an example to show the problem. However, in this situation, replacing `wait` with `then` may not work on its own, because the `EzRpcClient` goes out of scope afterwards. This can be avoided by allocating it with `kj::heap` and moving it into the `then` lambda. – Duy Nguyễn Sep 13 '22 at 01:55
  • Good point! You could also use `.attach()` to attach it to the final promise (after `.then()`). – Kenton Varda Sep 13 '22 at 14:23

Similar to Kenton's suggestion in the other answer's comments, the following works for me:

  ::kj::Promise<void> calbackFunc(CalbackFuncContext context){
    auto ezClient2 = kj::heap<capnp::EzRpcClient>("unix:/tmp/capnp-server-2");
    SampleServer2::Client client2 = ezClient2->getMain<SampleServer2>();

    auto request = client2.functionSampleRequest();  //Request to SERVER2
    request.setIn(222);
    return request.send().attach(kj::mv(ezClient2)).ignoreResult();
  }

Notes:

  • I return the promise generated by request.send() instead of creating a new promise like you do with kj::READY_NOW.
  • I .attach the ezClient2 so that it stays alive for as long as the request.send() promise does. Try it yourself: remove the .attach() and see that it crashes. Attachments are described in the KJ tour.
  • I .ignoreResult(), just to convert the request.send() promise to a kj::Promise<void>. It will still complete normally, but it just conveniently converts the type to something we can return from calbackFunc.
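
The lifetime point in the second note can be sketched without Cap'n Proto at all. The following is a toy model in standard C++, not kj::Promise: ToyClient, ToyTask, startRequest, and demoAttach are hypothetical names made up for this illustration. It assumes only that the deferred work must not outlive the client it depends on, and shows that attaching the client to the returned task delays its destruction until the task itself is destroyed:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for EzRpcClient: records when it is destroyed.
struct ToyClient {
  std::vector<std::string>& log;
  explicit ToyClient(std::vector<std::string>& l) : log(l) {}
  ~ToyClient() { log.push_back("client destroyed"); }
};

// Toy deferred task (hypothetical; NOT kj::Promise): work to run later,
// plus objects that are kept alive until the task is destroyed.
class ToyTask {
public:
  explicit ToyTask(std::function<void()> work) : work_(std::move(work)) {}

  // Take ownership of `keepAlive` and hand the task back to the caller.
  ToyTask attach(std::shared_ptr<void> keepAlive) && {
    attachments_.push_back(std::move(keepAlive));
    return std::move(*this);
  }

  void run() {
    assert(work_ && "task has no work");
    work_();
  }

private:
  std::function<void()> work_;
  std::vector<std::shared_ptr<void>> attachments_;
};

// Stand-in for calbackFunc: creates a client and returns pending work.
ToyTask startRequest(std::vector<std::string>& log, bool attachClient) {
  auto client = std::make_shared<ToyClient>(log);
  ToyTask task([&log] { log.push_back("request completed"); });
  if (attachClient) {
    return std::move(task).attach(std::move(client));
  }
  return task;  // `client` is destroyed here, before the work has run
}

// Returns the order of events for the chosen variant.
std::vector<std::string> demoAttach(bool attachClient) {
  std::vector<std::string> log;
  {
    ToyTask task = startRequest(log, attachClient);
    task.run();
  }  // task destroyed here, releasing any attachments
  return log;
}
```

With demoAttach(true) the log reads "request completed" then "client destroyed"; with demoAttach(false) the order is reversed, which in the real code corresponds to the request outliving the EzRpcClient it needs.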

Now, say you want to do something after the request to functionSample() completes. You can register a continuation to run upon completion using .then (described in the KJ documentation):

  ::kj::Promise<void> calbackFunc(CalbackFuncContext context){
    auto ezClient2 = kj::heap<capnp::EzRpcClient>("unix:/tmp/capnp-server-2");
    SampleServer2::Client client2 = ezClient2->getMain<SampleServer2>();

    auto request = client2.functionSampleRequest();  //Request to SERVER2
    request.setIn(222);

    return request.send().ignoreResult()
        .then([]() {
          // DO SOMETHING IN THE CONTINUATION HERE
        }).attach(kj::mv(ezClient2));
  }
JonasVautherin