I can't seem to find any resources/tutorials on RSocket, other than just reading their code on GitHub, which I don't understand.

I have a file's path on my server: String serverFilePath;

I'd like to be able to download it from my client (using RSocket's Aeron implementation, preferably). Does anyone know how to do this using RSocket?

Thanks in advance.

gregwhitaker
James

2 Answers

I work on RSocket and wrote a large portion of the Java version, including the Aeron transport.

I wouldn't recommend using the Aeron implementation currently. There are a couple of ways you can send files:

  1. Use requestChannel to push the data to a remote server.
  2. Use requestChannel or requestStream to stream bytes to a client.

Here's an example using requestStream:

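// Imports assume rsocket-java 0.11.x with the Netty TCP transport
import io.netty.util.ReferenceCountUtil;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FileCopy {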

  public static void main(String... args) throws Exception {

    // Create a socket that receives incoming connections
    RSocketFactory.receive()
        .acceptor(
            new SocketAcceptor() {
              // Called once per incoming connection; returns the RSocket that
              // handles that connection's requests
              @Override
              public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                return Mono.just(
                    new AbstractRSocket() {
                      @Override
                      public Flux<Payload> requestStream(Payload payload) {
                        // Get the path of the file to copy, then release the request
                        // payload so it is not leaked if opening the file fails
                        String path = payload.getDataUtf8();
                        ReferenceCountUtil.safeRelease(payload);

                        SeekableByteChannel _channel = null;

                        try {
                          _channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
                        } catch (IOException e) {
                          return Flux.error(e);
                        }

                        SeekableByteChannel channel = _channel;
                        // Use Flux.generate to create a publisher that emits the file
                        // in chunks of up to 1024 bytes
                        return Flux.generate(
                            sink -> {
                              try {
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                int read = channel.read(buffer);

                                if (read == -1) {
                                  // End of file: close the channel and complete the
                                  // stream without emitting an empty payload
                                  channel.close();
                                  sink.complete();
                                  return;
                                }

                                buffer.flip();
                                sink.next(DefaultPayload.create(buffer));
                              } catch (Throwable t) {
                                sink.error(t);
                              }
                            });
                      }
                    });
              }
            })
        .transport(TcpServerTransport.create(9090))
        .start()
        .subscribe();

    String path = args[0];
    String dest = args[1];

    // Connect to a server
    RSocket client =
        RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();

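    File f = new File(dest);

    // Open a channel to the destination file, creating it if necessary and
    // truncating any existing content so stale bytes are not left at the end
    SeekableByteChannel channel =
        Files.newByteChannel(
            f.toPath(),
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING);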

    // Request a stream of bytes
    client
        .requestStream(DefaultPayload.create(path))
        .doOnNext(
            payload -> {
              try {
                // Write the bytes received to the new file; write() is not
                // guaranteed to drain the buffer in a single call
                ByteBuffer data = payload.getData();
                while (data.hasRemaining()) {
                  channel.write(data);
                }

                // Release the payload
                ReferenceCountUtil.safeRelease(payload);
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            })
        // Block until all the bytes are received
        .blockLast();

    // Close the file you're writing to
    channel.close();
  }
}
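
The chunking logic can be tried on its own, without any RSocket dependency. Below is a minimal sketch using only java.nio; the class name ChunkedCopy, the copy method, and its chunkSize parameter are hypothetical names chosen for illustration and are not part of RSocket. It reads the source file in fixed-size chunks, treats a read of -1 as end of file, and writes each chunk until the buffer is drained, which is roughly what the server and client above do on either side of the stream.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of RSocket: copies a file in fixed-size chunks.
class ChunkedCopy {

  // Copies src to dst and returns the number of non-empty chunks written.
  static long copy(Path src, Path dst, int chunkSize) throws IOException {
    long chunks = 0;
    try (SeekableByteChannel in = Files.newByteChannel(src, StandardOpenOption.READ);
        SeekableByteChannel out =
            Files.newByteChannel(
                dst,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
      ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
      // read() returns -1 at end of file, so no chunk is produced for EOF itself
      while (in.read(buffer) != -1) {
        buffer.flip();
        if (buffer.hasRemaining()) {
          chunks++;
        }
        // write() may not drain the buffer in one call, so loop until it is empty
        while (buffer.hasRemaining()) {
          out.write(buffer);
        }
        buffer.clear();
      }
    }
    return chunks;
  }

  public static void main(String... args) throws IOException {
    if (args.length != 2) {
      System.err.println("usage: ChunkedCopy <source> <destination>");
      return;
    }
    long chunks = copy(Paths.get(args[0]), Paths.get(args[1]), 1024);
    System.out.println("Copied " + chunks + " chunk(s)");
  }
}
```

Using try-with-resources here closes both channels even when a read or write fails, which the streaming example above does not attempt.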
gregwhitaker
Robert Roeser
There is now a resumable file transfer example here:

https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

Yuri Schimke