1

I have a sample protobuf project in Java that sends a protobuf request message to a server and receives the response through a Netty channel handler, along these lines:

 public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
        this.msgFactory = new OA2ProtoMessageFactory();
        this.protoChannelMessageDecoder = new ProtoMessageToChannelMessageDecoder(msgFactory);
        this.protoChannelMessageEncoder = new ChannelMessageToProtoMessageEncoder(msgFactory);
        this.protoMessageReceiverHandler = new ProtoMessageReceiverHandler(msgFactory);
        connect();
    }

    /**
     * Creates a new {@link NioEventLoopGroup}, bootstraps a {@link NioSocketChannel} client with
     * {@code SO_KEEPALIVE} enabled and blocks until the connect attempt to {@code host:port}
     * has completed. The resulting future is stored in {@code channelFuture}.
     *
     * <p>Failures are handled on a best-effort basis: any exception raised while bootstrapping or
     * connecting results in {@code closeConnection()} being called and is not propagated to the
     * caller. If the waiting thread is interrupted, its interrupt flag is restored before the
     * connection is closed so that callers further up the stack can still observe the interruption.
     */
    public void connect() {
        workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Every newly created channel gets the full handler pipeline.
                    initPipelineForChannel(ch);
                }
            });
            // Start the client and wait for the connect attempt to finish.
            channelFuture = b.connect(host, port).sync();

        } catch (InterruptedException ex) {
            // sync() was interrupted while waiting: catching the exception clears the flag,
            // so restore it instead of silently losing the interruption.
            Thread.currentThread().interrupt();
            closeConnection();
        } catch (Exception ex) {
            // Best-effort behaviour kept from the original: the failure is not rethrown.
            // NOTE(review): the cause is dropped here; consider logging it once a logger is available.
            closeConnection();
        }
    }

    /**
     * Populates the pipeline of {@code ch} with this client's handlers.
     *
     * <p>Handlers are appended with {@code addLast} in exactly the order written below; the
     * position of each handler in the pipeline determines when it sees a message, so the order
     * must not be changed.
     *
     * @param ch the channel whose pipeline is configured
     * @throws SSLException declared for the SSL handler creation (presumably raised by
     *         {@code SslEngineFactory.newHandler}; confirm against that class)
     */
    private void initPipelineForChannel(Channel ch) throws SSLException {
        SslEngineFactory engineFactory = new ClientSslEngineFactory();

        ch.pipeline()
                // The SSL handler is the first handler in the pipeline.
                .addLast("ssl", engineFactory.newHandler(ch))
                // Reader-idle and writer-idle timeouts; the all-idle timeout is 0.
                .addLast("idleState", new IdleStateHandler(
                        INACTIVITY_READ_MILLIS, PING_INTERVAL_MILLIS, 0, TimeUnit.MILLISECONDS))
                // Length-prefixed framing: length field at offset 0, no adjustment,
                // and LENGTH_FIELD_LENGTH leading bytes stripped from each frame.
                .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                        MAX_FRAME_LENGTH, 0, LENGTH_FIELD_LENGTH, 0, LENGTH_FIELD_LENGTH))
                .addLast("protobufDecoder", protobufDecoder)
                .addLast("protoChannelMessageDecoder", protoChannelMessageDecoder)
                .addLast("lengthFieldPrepender", lengthFieldPrepender)
                .addLast("protobufEncoder", protobufEncoder)
                .addLast("protoChannelMessageEncoder", protoChannelMessageEncoder)
                .addLast("heartbeatOnIdle", HeartbeatOnIdleHandler.DEFAULT)
                .addLast(ProtoMessageReceiverHandler.NAME, protoMessageReceiverHandler)
                .addLast("closeOnException", CloseOnExceptionHandler.DEFAULT);
    }

So, if I understand it correctly, this creates a Netty Bootstrap and sets up the handler pipeline whenever a channel is created.

How can I set up this pipeline (decoders, etc.) in Akka gRPC?

  • What do you mean by set a pipeline in akka gRPC? Do you want to send the data from Netty client through gRPC? – Felipe Jan 21 '21 at 15:46

0 Answers0