I want to call a gRPC server from a client using a MethodDescriptor whose request and response types are InputStream, but I have not been able to make it work. This is my code:
Environment:
JDK 1.8
gRPC 1.33.1
The grpc-all dependency is declared as:
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.33.1</version>
<scope>provided</scope>
</dependency>
On the server, my EchoServiceImpl extends EchoServiceGrpc.EchoServiceImplBase and overrides the echo RPC method. I export an InputStream version of the service before building the server.
Server
// 1. implement the service
/**
 * Echo service implementation registered as a Spring bean.
 *
 * <p>Overrides the generated {@code echo} RPC: the incoming message is written to standard
 * output and a fixed reply is sent back before the call is completed.
 */
@Service
public class EchoServiceImpl extends EchoServiceGrpc.EchoServiceImplBase {

    /**
     * Handles a single echo call.
     *
     * @param request          the incoming request; only its message is read
     * @param responseObserver receives exactly one response followed by completion
     */
    @Override
    public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
        final String received = "Received: " + request.getMessage();
        // The line is printed twice, exactly as in the original implementation.
        System.out.println(received);
        System.out.println(received);

        final EchoResponse reply = EchoResponse.newBuilder().setMessage("ReceivedHELLO").build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}
// 2. export the InputStream service
/**
 * Binds the given service bean and records two definitions for it in
 * {@code serviceDefinitions}: the bound definition itself and a variant produced by
 * {@code ServerInterceptors.useInputStreamMessages}.
 *
 * <p>Any exception raised while wrapping or recording is logged and swallowed, in which case
 * the definitions recorded before the failure (possibly none) are all that remain.
 *
 * @param bean the service bean; it is cast to {@code BindableService} without a type check
 */
private void exportService(final Object bean) {
    final ServerServiceDefinition plainDefinition = ((BindableService) bean).bindService();
    try {
        final ServerServiceDefinition streamDefinition =
            ServerInterceptors.useInputStreamMessages(plainDefinition);
        // NOTE(review): both definitions appear to share one service name; presumably the
        // later addService call replaces the earlier one on the server - confirm whether
        // registering both is intended.
        serviceDefinitions.add(plainDefinition);
        serviceDefinitions.add(streamDefinition);
    } catch (Exception e) {
        log.error("export service is fail", e);
    }
}
// 3. start Grpc Server
/**
 * Builds a gRPC server from the configured builder, registers every collected service
 * definition on it and starts it.
 *
 * <p>An {@code IOException} during start-up is logged rather than propagated.
 */
private void startGrpcServer() {
    final ServerBuilder<?> builder = grpcServerBuilder.buildServerBuilder();
    for (final ServerServiceDefinition definition : grpcClientBeanPostProcessor.getServiceDefinitions()) {
        builder.addService(definition);
    }

    try {
        // The started server instance is not retained, matching the original behavior.
        builder.build().start();
        log.info("Grpc server started successfully");
    } catch (IOException e) {
        log.error("Grpc server failed to start", e);
    }
}
In the client, I create an InputStream MethodDescriptor whose request marshaller passes the InputStream through unchanged, but I have not succeeded so far. Can anyone offer some advice? Thanks!
Clientpublic class ClientDemo {
private static Map<String,Object> methodDescriptorCache = Maps.newHashMap();
// InputStream marshaller
final static MethodDescriptor.Marshaller<InputStream> marshaller = new MethodDescriptor.Marshaller<InputStream>() {
@Override
public InputStream stream(final InputStream value) {
return value;
}
@Override
public InputStream parse(final InputStream stream) {
if (stream.markSupported()) {
return stream;
} else {
return new BufferedInputStream(stream);
}
}
};
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.build channel
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
//2. create InputStream MethodDescriptor
MethodDescriptor<InputStream, InputStream> echo = createInputStreamMethodDescriptor("echo.EchoService", "echo");
//3. channel.newCall()
ClientCall<InputStream, InputStream> call =
channel.newCall(echo, CallOptions.DEFAULT);
//4. input params
String req = "{\"message\":\"input stream\"}";
InputStream request = new ByteArrayInputStream(req.getBytes(StandardCharsets.UTF_8));
System.out.println(request);
//5. get result
ListenableFuture<InputStream> res = ClientCalls.futureUnaryCall(call, request);
System.out.println(res.get());
}
public static io.grpc.MethodDescriptor<InputStream, InputStream> createInputStreamMethodDescriptor(String clazzName, String methodName) {
io.grpc.MethodDescriptor<InputStream, InputStream> methodDescriptor = (MethodDescriptor<InputStream, InputStream>) methodDescriptorCache
.get(clazzName + methodName);
if (methodDescriptor != null)
return methodDescriptor;
else {
methodDescriptor = io.grpc.MethodDescriptor.<InputStream, InputStream> newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)//
.setFullMethodName(io.grpc.MethodDescriptor.generateFullMethodName(clazzName, methodName))//
.setRequestMarshaller(marshaller)//
.setResponseMarshaller(marshaller)//
.setSafe(false)//
.setIdempotent(false)//
.build();
methodDescriptorCache.put(clazzName + methodName, methodDescriptor);
return methodDescriptor;
}
}
}
Exception message:
[grpc-nio-worker-ELG-1-2] DEBUG io.grpc.netty.NettyClientHandler - [id: 0xa764352c, L:/127.0.0.1:55783 - R:localhost/127.0.0.1:8080] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 2] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
Exception in thread "main" java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:564)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:545)
at org.apache.shenyu.examples.grpc.test.ClientDemo2.main(ClientDemo2.java:56)
Caused by: io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 1