I have a sample protobuf project in Java that sends a protobuf request message to a server and receives the response through a Netty channel handler, something along the lines of:
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
        this.msgFactory = new OA2ProtoMessageFactory();
        this.protoChannelMessageDecoder = new ProtoMessageToChannelMessageDecoder(msgFactory);
        this.protoChannelMessageEncoder = new ChannelMessageToProtoMessageEncoder(msgFactory);
        this.protoMessageReceiverHandler = new ProtoMessageReceiverHandler(msgFactory);
        connect();
    }
    public void connect() {
        workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    initPipelineForChannel(ch);
                }
            });
            // Start the client.
            channelFuture = b.connect(host, port).sync();
        } catch (Exception ex) {
            closeConnection();
        }
    }
    private void initPipelineForChannel(Channel ch) throws SSLException {
        ChannelPipeline pipeline = ch.pipeline();
        SslEngineFactory sslEngineFactory = new ClientSslEngineFactory();
        // TLS sits closest to the socket.
        pipeline.addLast("ssl", sslEngineFactory.newHandler(ch));
        pipeline.addLast("idleState", new IdleStateHandler(INACTIVITY_READ_MILLIS,
                PING_INTERVAL_MILLIS, 0, TimeUnit.MILLISECONDS));
        // Inbound: split the stream into length-prefixed frames, then decode each frame.
        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, LENGTH_FIELD_LENGTH, 0,
                LENGTH_FIELD_LENGTH));
        pipeline.addLast("protobufDecoder", protobufDecoder);
        pipeline.addLast("protoChannelMessageDecoder", protoChannelMessageDecoder);
        // Outbound: handlers run in reverse order of addition (encode first, prepend the length last).
        pipeline.addLast("lengthFieldPrepender", lengthFieldPrepender);
        pipeline.addLast("protobufEncoder", protobufEncoder);
        pipeline.addLast("protoChannelMessageEncoder", protoChannelMessageEncoder);
        // Application-level handlers.
        pipeline.addLast("heartbeatOnIdle", HeartbeatOnIdleHandler.DEFAULT);
        pipeline.addLast(ProtoMessageReceiverHandler.NAME, protoMessageReceiverHandler);
        pipeline.addLast("closeOnException", CloseOnExceptionHandler.DEFAULT);
    }
As far as I understand, this creates a Netty Bootstrap and, through the ChannelInitializer, installs the handler pipeline on each channel when it is created.
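To make clear what I think the "frameDecoder" / "lengthFieldPrepender" pair does on the wire, here is a minimal JDK-only sketch of that framing. It is not the project's code: the class name LengthFieldFraming, the 4-byte big-endian length field, and the 1 MiB frame limit are my assumptions (the real LENGTH_FIELD_LENGTH and MAX_FRAME_LENGTH values are not shown above).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthFieldFraming {
    // Assumed values; the real constants are not shown in the snippet above.
    static final int LENGTH_FIELD_LENGTH = 4;
    static final int MAX_FRAME_LENGTH = 1024 * 1024;

    // Outbound side: prepend the payload length as a big-endian int,
    // roughly what a length-field prepender does.
    public static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_FIELD_LENGTH + payload.length);
        buf.putInt(payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        return buf.array();
    }

    // Inbound side: split a byte stream into payloads and strip each length prefix.
    // An incomplete trailing frame is left unread in the buffer.
    public static List<byte[]> unframe(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_LENGTH) {
            in.mark();
            int length = in.getInt();
            if (length < 0 || length > MAX_FRAME_LENGTH) {
                throw new IllegalArgumentException("Bad frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // not enough bytes yet; rewind to the start of this frame
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        // Stand-ins for serialized protobuf messages.
        byte[] a = frame("first".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("second".getBytes(StandardCharsets.UTF_8));
        ByteBuffer stream = ByteBuffer.allocate(a.length + b.length);
        stream.put(a).put(b).flip();
        for (byte[] payload : unframe(stream)) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

If that reading is right, the server expects raw length-prefixed protobuf messages over TLS, which is not the same thing as gRPC's own framing over HTTP/2.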
How can I set up an equivalent pipeline (decoders, encoders, etc.) in Akka gRPC?