3

Lets say I defined following Thrift service

service FileResource {      
binary get_file(1:string file_name)
}

Here is the generated implementation which I cannot understand

public ByteBuffer recv_get_file() throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
  if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
    org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
    iprot_.readMessageEnd();
    throw x;
  }
  if (msg.seqid != seqid_) {
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "get_file failed: out of sequence response");
  }
  get_file_result result = new get_file_result();
  result.read(iprot_);
  iprot_.readMessageEnd();
  if (result.isSetSuccess()) {
    return result.success;
  }
  throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "get_file failed: unknown result");
}

How works the string

 result.read(iprot_);

? Is it synchronous or asynchronous? How it will work for large data (several megabytes and more)? And what I need to read those data? Unfortunately I'm not used to work with java.nio and ByteBuffer. Any examples or guides would be nice.

alehro
  • 2,198
  • 2
  • 25
  • 41
  • 1
    Probably the only solution is to send data through small chunks. – alehro Aug 10 '11 at 22:07
  • totally agree. See the equivalent approach in a similar question but in C#: http://stackoverflow.com/a/16170557/435605. Also, see the following discussion on why not support Input/Output stream, which would have given you what you want: https://issues.apache.org/jira/browse/THRIFT-1948 – AlikElzin-kilaka Jan 05 '15 at 06:53

2 Answers2

3

I think you misunderstood what Apache Thrift is for. If it was this complicated, Java NIO would be easier...

How it will work for large data (several megabytes and more)?

Thrift should care of transporting that data for you. How's the performance? This will highly depend on your hardware and the quality of the network. Thrift has a pretty good performance.

And what I need to read those data?

In your Java Thrift client, you can do

TTransport transport;
transport = new TSocket("yourServerHostNameOrIPAddress", serverPort);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
ChunkFileResourceThrift.Client client = new ChunkFileResourceThrift.Client(protocol);
ByteBuffer buffer = client.get_file(yourFileName);
// Do whatever you want with the byte buffer
transport.close();

Is it synchronous or asynchronous?

If you defined it as oneway in the .thrift file, it's asynchronous, otherwise it is synchronous. Thus in your case it is synchronous.

Having to implement network low-level details totally beats the purpose of using Thrift. Thrift is precisely used so you can forget about this details.

Community
  • 1
  • 1
m0skit0
  • 25,268
  • 11
  • 79
  • 127
  • I think @alehro asked about large data. The returned ByteBuffer isn't an InputStream and is finite. If the data is large, this will eventually cause a memory issue (OOME). – AlikElzin-kilaka Jan 05 '15 at 06:50
  • @AlikElzin-kilaka Yes, I see now, thanks. I too had this issue in Android devices (where total heap cannot exceed 60 MiB). In this case I solved this by limiting the size of transferred data and adding a flag on the Thrift interface to indicate this data is incomplete and you should download the rest. – m0skit0 Jan 05 '15 at 15:10
-4

Finally I succeeded in transferring file from server to client. I extended Client and Processor classes auto-generated by Thrift. It gave me access to TProtocol object. Which in turn allows to send/receive arbitrary data streams.
I'm sure my solution is very rough. It would be nice if someone pointed to me how to implement it in conformity with Thrift architecture. Could it be accomplished better by implementing custom Thrift protocol?

client:

package alehro.droid;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift;
import alehro.tcp.ServerSideError;

class ThriftClientExt extends ChunkFileResourceThrift.Client {
public ThriftClientExt(TProtocol prot) {
    super(prot);

}

public void recv_get_file_ext(String get_file_out_path) throws TException,
        IOException, ServerSideError {
    FileOutputStream fos = new FileOutputStream(get_file_out_path);
    FileChannel channel = fos.getChannel();
    int size = 0;
    // -1 - end of file, -2 exception.
    while ((size = iprot_.readI32()) > 0) {
        Logger.me.v("receiving buffer size=" + size);
        ByteBuffer out = iprot_.readBinary();
        // out.flip();
        channel.write(out);
    }
    if (size == -2) {
        String msg = iprot_.readString();
        Logger.me.e("Server error: " + msg);
        // TODO: report error to user
    }
    channel.close();
    recv_get_file();
}

}

server:

package alehro.tcp;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift.Iface;
import alehro.tcp.ChunkFileResourceThrift.get_file_args;
import alehro.tcp.ChunkFileResourceThrift.get_file_result;

public class ChunkedFileResourceProcessor extends
    ChunkFileResourceThrift.Processor {

public interface IfaceExt extends Iface {
    void get_file_raw(String key, String file_name, TProtocol out)
            throws TException, ServerSideError;
}

final private IfaceExt iface_1;


public ChunkedFileResourceProcessor(IfaceExt iface) {
    super(iface);
    iface_1 = iface;
    // replace generated implementation by my custom one.
    processMap_.put("get_file", new get_file_raw());
}

private class get_file_raw implements ProcessFunction {

    @Override
    public void process(int seqid, TProtocol iprot, TProtocol oprot)
            throws TException {
        get_file_args args = new get_file_args();
        try {
            args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
            iprot.readMessageEnd();
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.PROTOCOL_ERROR,
                    e.getMessage());
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        iprot.readMessageEnd();
        get_file_result result = new get_file_result();
        try {
            iface_1.get_file_raw(args.key, args.file_name, oprot);
        } catch (ServerSideError ouch) {
            result.ouch = ouch;
        } catch (Throwable th) {
            Logger.me.e("Internal error processing get_file_raw");
            Logger.me.e(th.getMessage());
            Logger.me.e(th);
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.INTERNAL_ERROR,
                    "Internal error processing get_file");
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                "get_file", org.apache.thrift.protocol.TMessageType.REPLY,
                seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
    }

}

}

server handler:

public class ChunkedFileResourceHandler implements
    ChunkedFileResourceProcessor.IfaceExt {
....
@Override
public void get_file(String key, String file_name) throws TException {
    // stub
    throw new TException("Wrong call. Use get_file_raw instead.");
}

@Override
public void get_file_raw(String key, String file_name, final TProtocol out)
        throws ServerSideError, TException {
    // catch all here. mimic original get_file throw politics.
    try {
        Logger.me.v("Begin get_file_raw");
        UserSession se = accessUserSession(key, "get", 0, 0);
        vali(se != null);
        synchronized (se) {
            String fullPath = "";
            Logger.me.i("get file start: " + file_name);
            String userDir = AppConfig.getUserDir(se.info.email);
            fullPath = userDir + file_name;

            final FileInputStream inputFile;
            ByteBuffer buffer = null;
            int bytesRead = -1;
            FileChannel fileChannel = null;

            inputFile = new FileInputStream(fullPath);
            fileChannel = inputFile.getChannel();
            buffer = ByteBuffer.allocate(2048);
            bytesRead = fileChannel.read(buffer);

            // Logger.me.v("start sending file");
            while (bytesRead != -1) {
                buffer.flip();
                int length = buffer.limit() - buffer.position()
                        - buffer.arrayOffset();
                Logger.me.v("sending buffer length=" + length);

                out.writeI32(length); // read it in client
                out.writeBinary(buffer); // read it in client
                buffer.clear();

                bytesRead = fileChannel.read(buffer);

            }
            out.writeI32(-1); // read it in client

            Logger.me.i("get file end.");
        }
    } catch (TException e) {
        throw e;
    } catch (Throwable e) {
        write_get_file_exception(file_name, e, out);
        return;
    }

}

void write_get_file_exception(String file, Throwable e, final TProtocol out)
        throws TException {
    out.writeI32(-2);
    out.writeString("Exception in get_file_raw: file=" + file
            + "description=" + e.getMessage());
    Logger.me.e(e);
    Logger.me.i("get file ended wtih errors: " + e.getMessage());
}
}
alehro
  • 2,198
  • 2
  • 25
  • 41
  • 1
    As I explained in my answer, this is **not** how you should use Thrift. You don't need to do extend those classes. You're just complicating it and making your code not extendable. What if you change your .thrift interface? You have to re-write the class extension? – m0skit0 Sep 20 '13 at 16:46