
I need a Java function that returns the result of a SQL SELECT query as an InputStream for another system, which sends the result over a network.

However, the InputStream must contain the result as a String with custom delimiters (often, but not always, CSV).

While I can easily write a function that retrieves the result, creates a delimited String, and finally converts that String to an InputStream, the SQL result will often be much too large to process in memory. Also, processing the entire result set before returning the result will incur an unwanted wait time.

How can I return an InputStream to iterate over the SQL result and send the processed (delimited) data as it is returned from the database?

Cameron S
  • Have you looked into using a JDBC CachedRowSet? That could be helpful for what you're trying to do. http://docs.oracle.com/javase/1.5.0/docs/api/javax/sql/rowset/CachedRowSet.html – ChadNC Jun 26 '12 at 14:43
  • No, but how would that help me? The issue isn't leaving the connection open, but having the results in memory. – Cameron S Jun 26 '12 at 16:58
  • That's what a cached rowset does. It provides an easier way to send the results of a query over a network to other devices, applications, etc. – ChadNC Jun 26 '12 at 19:39
  • The entire result needs to be sent in one `InputStream`. Can you show a short example/the methods that would make this work? – Cameron S Jun 26 '12 at 19:45

3 Answers


Posting an (untested) code snippet, which should give you the basic idea:

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Implementors of this interface should only convert the current row to a byte array and return it.
 *
 * @author yura
 */
public interface RowToByteArrayConverter {
    byte[] rowToByteArray(ResultSet resultSet) throws SQLException;
}

public class ResultSetAsInputStream extends InputStream {

    private final RowToByteArrayConverter converter;
    private final PreparedStatement statement;
    private final ResultSet resultSet;

    private byte[] buffer;
    private int position;

    public ResultSetAsInputStream(final RowToByteArrayConverter converter, final Connection connection, final String sql, final Object... parameters) throws SQLException {
        this.converter = converter;
        statement = createStatement(connection, sql, parameters);
        resultSet = statement.executeQuery();
    }

    private static PreparedStatement createStatement(final Connection connection, final String sql, final Object[] parameters) throws SQLException {
        final PreparedStatement statement = connection.prepareStatement(sql);
        for (int i = 0; i < parameters.length; i++) {
            statement.setObject(i + 1, parameters[i]); // JDBC parameter indices are 1-based
        }
        return statement;
    }

    @Override
    public int read() throws IOException {
        try {
            if(buffer == null) {
                // first call of read method
                if(!resultSet.next()) {
                    return -1; // no rows - empty input stream
                } else {
                    buffer = converter.rowToByteArray(resultSet);
                    position = 0;
                    return buffer[position++] & (0xff);
                }
            } else {
                // not first call of read method
                if(position < buffer.length) {
                    // buffer already has some data in, which hasn't been read yet - returning it
                    return buffer[position++] & (0xff);
                } else {
                    // all data from buffer was read - checking whether there is next row and re-filling buffer
                    if(!resultSet.next()) {
                        return -1; // the buffer was read to the end and there is no rows - end of input stream
                    } else {
                        // there is next row - converting it to byte array and re-filling buffer
                        buffer = converter.rowToByteArray(resultSet);
                        position = 0;
                        return buffer[position++] & (0xff);
                    }
                }
            }
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            statement.close();
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }
}

This is a very straightforward implementation, and it can be improved in the following ways:

  • The code duplication between the if and else branches in the read method can be removed - it was left in just for clarity.
  • Instead of re-creating the byte array buffer for each row (new byte[] is a costly operation), more sophisticated logic can be implemented that uses a byte array buffer initialised only once and then re-filled. One should then change the RowToByteArrayConverter.rowToByteArray method's signature to int fillByteArrayFromRow(ResultSet rs, byte[] array), which should fill the passed byte array and return the number of bytes filled.
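For illustration, a converter for the question's delimited-String case might look like the sketch below. The column count, delimiter, and class name are illustrative assumptions, and the interface from the answer is repeated so the sketch stands alone:

```java
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;

// Interface from the answer above, repeated so this sketch compiles on its own.
interface RowToByteArrayConverter {
    byte[] rowToByteArray(ResultSet resultSet);
}

// Example converter: joins the first columnCount columns of the current row
// with a custom delimiter and terminates each row with a newline.
class DelimitedRowConverter implements RowToByteArrayConverter {

    private final int columnCount;
    private final String delimiter;

    DelimitedRowConverter(final int columnCount, final String delimiter) {
        this.columnCount = columnCount;
        this.delimiter = delimiter;
    }

    @Override
    public byte[] rowToByteArray(final ResultSet resultSet) {
        try {
            final StringBuilder row = new StringBuilder();
            for (int i = 1; i <= columnCount; i++) { // JDBC columns are 1-based
                if (i > 1) {
                    row.append(delimiter);
                }
                row.append(resultSet.getString(i));
            }
            row.append('\n');
            return row.toString().getBytes(StandardCharsets.UTF_8);
        } catch (final SQLException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

With something like this in place, the stream could be constructed as new ResultSetAsInputStream(new DelimitedRowConverter(3, ";"), connection, sql) and handed straight to whatever sends it over the network.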

Because a byte array contains signed bytes, it can contain -1 (which is actually 255 as an unsigned byte) and thus incorrectly signal the end of the stream, so & (0xff) is used to convert the signed byte to an unsigned value as an int. For details refer to How does Java convert int into byte?.
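A quick demonstration of why the mask matters:

```java
// Shows the sign-extension problem that & 0xff guards against in read().
class ByteMaskDemo {
    public static void main(String[] args) {
        byte b = (byte) 255;   // Java bytes are signed, so this stores -1
        int asInt = b;         // sign-extended to -1, which read() would mistake for end of stream
        int masked = b & 0xff; // 255, the intended unsigned value
        System.out.println(asInt + " " + masked); // prints "-1 255"
    }
}
```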

Please also note that if network transfer speed is slow, this may keep open result sets for a long time, thus posing problems for the database.

Hope this helps ...

Yuriy Nakonechnyy

I would improve the answer suggested by @Yura by introducing the following:
Use a DataOutputStream that wraps a ByteArrayOutputStream in order to conveniently write data to the byte array, inside an implementation of RowToByteArrayConverter.
In fact, I would suggest having a hierarchy of converters, all of them extending the same abstract class (this is a code snippet of my idea - it might not compile on the first try):

public abstract class RowToByteArrayConverter {

    protected final ByteArrayOutputStream byteArrayOutputStream;
    protected final DataOutputStream dataOutputStream;

    public RowToByteArrayConverter() {
        byteArrayOutputStream = new ByteArrayOutputStream();
        dataOutputStream = new DataOutputStream(byteArrayOutputStream);
    }

    public byte[] rowToByteArray(ResultSet resultSet) throws SQLException, IOException {
        byteArrayOutputStream.reset(); // discard the previous row's bytes
        parseResultSet(dataOutputStream, resultSet);
        dataOutputStream.flush();
        return byteArrayOutputStream.toByteArray();
    }

    protected abstract void parseResultSet(DataOutputStream dataOutputStream, ResultSet rs) throws SQLException, IOException;
}

Now you can subclass this class by simply overriding the parseResultSet method -
for example, write code that gets the String value of the "name" column of the record and calls writeUTF on the DataOutputStream.
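As a sketch of such a subclass (the "name" column is the example from the text; the base class is repeated here under a different name so the snippet compiles on its own):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;

// Minimal repeat of the abstract base class so this sketch is self-contained.
abstract class AbstractRowConverter {
    protected final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    protected final DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);

    public byte[] rowToByteArray(ResultSet resultSet) throws SQLException, IOException {
        byteArrayOutputStream.reset(); // discard the previous row's bytes
        parseResultSet(dataOutputStream, resultSet);
        dataOutputStream.flush();
        return byteArrayOutputStream.toByteArray();
    }

    protected abstract void parseResultSet(DataOutputStream out, ResultSet rs) throws SQLException, IOException;
}

// Hypothetical subclass: writes the "name" column in DataOutputStream's
// modified-UTF-8 encoding (the method is writeUTF, not writeUTF8).
class NameColumnConverter extends AbstractRowConverter {
    @Override
    protected void parseResultSet(DataOutputStream out, ResultSet rs) throws SQLException, IOException {
        out.writeUTF(rs.getString("name"));
    }
}
```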

Yair Zaslavsky

The above answers are memory efficient and solve the problem of exceeding the maximum size of a StringBuilder. However, my testing suggests that they are slower than simply writing the data into a StringBuilder and calling

new ByteArrayInputStream(data.getBytes("UTF-8"))

to get an InputStream.

What I found to be far more performant is to slice the incoming data using a partition function and then use multiple threads to each:

  1. Query the source database for a subset of the data
  2. Write the data to the target

This also avoids the issue where the total data can exceed the maximum size of a StringBuilder.

For example, I have 6m records with a column called "RecordDate" in a SQL Server table. The values in RecordDate vary between 2013 and 2016, so I configure four threads to request the data for 2013, 2014, 2015, and 2016 respectively. Then each thread writes its transcoded data to a StringBuilder and bulk-loads it to the target by converting it to an InputStream using getBytes() as above.

This resulted in a 2x speed up.

Why? Because the source and target databases can handle multiple concurrent requests, so the overall workload is spread over multiple threads in all three processes: the source database, the transcoder, and the target database.
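The per-year fan-out described above could be sketched as follows. The class and method names are hypothetical, and fetchPartition is a stub standing in for a real JDBC query (a real version would run something like "SELECT ... WHERE YEAR(RecordDate) = ?" against the source database):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionedExport {

    // Stub: a real implementation would query the source database for one
    // partition of the data and transcode it.
    static String fetchPartition(int year) {
        return "rows-for-" + year + "\n";
    }

    // One thread per year-partition; each builds its own StringBuilder and
    // hands its slice to the target as an InputStream.
    static List<InputStream> exportByYear(int fromYear, int toYear) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(toYear - fromYear + 1);
        try {
            List<Future<InputStream>> futures = new ArrayList<>();
            for (int year = fromYear; year <= toYear; year++) {
                final int y = year;
                futures.add(pool.submit(() -> {
                    StringBuilder data = new StringBuilder(fetchPartition(y));
                    return new ByteArrayInputStream(data.toString().getBytes(StandardCharsets.UTF_8));
                }));
            }
            List<InputStream> streams = new ArrayList<>();
            for (Future<InputStream> f : futures) {
                streams.add(f.get()); // propagate any per-partition failure
            }
            return streams;
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real pipeline each thread would bulk-load its stream into the target directly rather than collecting the streams, but the structure is the same.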

ThatDataGuy