First, we must discuss how you are going to get the data from your database. If your intent is to go over a large number of records without loading them all into memory at once, you have two options:
- Paginate the results.
- Make your driver paginate the results.
If you already have an iterator based on a Cursor that retrieves paginated data as needed, then you can use the Spliterators and StreamSupport utility classes from the JDK API to turn it into a Stream:
Stream<Product> products = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(cursor.iterator(),
                Spliterator.NONNULL |
                Spliterator.ORDERED |
                Spliterator.IMMUTABLE), false);
Otherwise you will have to build something of your own.
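To see that Spliterators/StreamSupport pattern in a self-contained, runnable form, here is a sketch with an ordinary in-memory iterator standing in for the database cursor (the `P-1`/`P-2`/`P-3` values are just placeholder data):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {
    public static void main(String[] args) {
        //any iterator works here; a cursor-backed iterator plugs in the same way
        Iterator<String> it = List.of("P-1", "P-2", "P-3").iterator();

        Stream<String> products = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it,
                        Spliterator.NONNULL |
                        Spliterator.ORDERED |
                        Spliterator.IMMUTABLE), false);

        //the stream is lazy: elements are pulled from the iterator on demand
        System.out.println(products.collect(Collectors.joining(",")));
    }
}
```

Because the spliterator is of unknown size, the stream simply keeps pulling from the iterator until hasNext() returns false.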
Driver Pagination
If your JDBC driver supports the fetch size property, you can do something like this:
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement(
        "SELECT order_number FROM orders WHERE order_date >= '2018-08-12'",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
At this point, rs holds the first fetch of 1000 records, and the driver won't retrieve more from the database until you have read through the current batch.
The tricky part about all this is that you cannot close any of the resources (i.e. the connection, prepared statement, and result set) until you are done reading all the records. And since the stream we want to build is lazy by default, this implies we have to keep all these resources open until we are done with the stream.
Perhaps the easiest way is to build an Iterator around this logic that closes all the resources when it actually reaches the end of the data (i.e. when !rs.next()). Another alternative is to do all the cleanup work when the stream itself is closed (Stream.onClose()).
Once we have an iterator, it is very simple to build a stream out of it using the Spliterators and StreamSupport utility classes from the JDK API.
My rudimentary implementation would look somewhat like this. It is just for illustration purposes; you may want to give it some more love for your particular case.
public Stream<String> getUsers() {
    DataSource ds = jdbcTemplate.getDataSource();
    try {
        Connection conn = ds.getConnection();
        conn.setAutoCommit(false);
        PreparedStatement stm = conn.prepareStatement("SELECT id FROM users",
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        //the fetch size is what guarantees only 1000 records at a time
        stm.setFetchSize(1000);
        ResultSet rs = stm.executeQuery();
        Iterator<String> sqlIter = new Iterator<>() {
            @Override
            public boolean hasNext() {
                try {
                    return rs.next();
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }

            @Override
            public String next() {
                try {
                    return rs.getString("id");
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }
        };
        //turn the iterator into a stream
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(sqlIter,
                        Spliterator.NONNULL |
                        Spliterator.ORDERED |
                        Spliterator.IMMUTABLE), false
        ).onClose(() -> {
            //make sure to close the resources when done with the stream
            closeResources(conn, stm, rs);
        });
    } catch (SQLException e) {
        logger.error("Failed to process data", e);
        throw new RuntimeException(e);
    }
}
private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
    try (conn; ps; rs) {
        logger.info("Resources successfully closed");
    } catch (SQLException e) {
        logger.warn("Failed to properly close database resources", e);
    }
}
The key point here is to notice that the stream we return is supposed to run some onClose logic, so whoever uses the stream must make sure to call stream.close() when done with it, to release the resources kept alive up to that point (i.e. conn, stm and rs).
Perhaps the best way is to use a try-with-resources block, so that the try takes care of closing the stream.
try (Stream<String> users = userRepo.getUsers()) {
    //print the users to the main output, retrieving 1000 records at a time
    users.forEach(System.out::println);
}
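To see the onClose mechanics in isolation, here is a small runnable sketch confirming that closing the stream via try-with-resources triggers the close handler (the data and the flag are just stand-ins for the JDBC resources):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class OnCloseDemo {
    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        //the handler registered with onClose runs when the stream is closed,
        //which is exactly where our JDBC resources would be released
        try (Stream<String> users = Stream.of("alice", "bob")
                .onClose(() -> closed.set(true))) {
            users.forEach(System.out::println);
        }
        System.out.println("close handler ran: " + closed.get()); //prints "close handler ran: true"
    }
}
```

Note that terminal operations like forEach do not close a stream on their own; without the try-with-resources (or an explicit close()) the handler would never run.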
Manual Pagination
An alternative is to paginate the results yourself. The exact syntax depends on the database, but using clauses like LIMIT and OFFSET you can request a specific page of records, process it, and then retrieve some more:
SELECT id FROM users LIMIT 1000 OFFSET 5
In this case, your iterator would consume the whole page and, when done, request the next one, until no more records are found in the final page.
The advantage of this other approach is that resources can be opened and closed immediately within the iterator itself, instead of being kept alive for the lifetime of a stream.
I won't develop a full, polished example of this and will leave it for you to try.
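As a starting point, though, here is a rough sketch of the paging logic. The page fetcher is modeled as a plain function so that this runs without a database; in practice it would open a connection, run a query like the LIMIT/OFFSET one above, and close everything before returning. All names here are illustrative, not part of any real API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

//rough sketch: an iterator that fetches one page at a time from a fetchPage
//function taking the current offset (e.g. backed by LIMIT/OFFSET queries)
public class PagedIterator<T> implements Iterator<T> {

    private final int pageSize;
    private final IntFunction<List<T>> fetchPage;
    private final Deque<T> page = new ArrayDeque<>();
    private int offset = 0;
    private boolean lastPageSeen = false;

    public PagedIterator(int pageSize, IntFunction<List<T>> fetchPage) {
        this.pageSize = pageSize;
        this.fetchPage = fetchPage;
    }

    @Override
    public boolean hasNext() {
        if (page.isEmpty() && !lastPageSeen) {
            List<T> next = fetchPage.apply(offset);
            page.addAll(next);
            offset += next.size();
            //a short or empty page means no more records
            lastPageSeen = next.size() < pageSize;
        }
        return !page.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return page.poll();
    }

    public static void main(String[] args) {
        List<String> rows = List.of("u1", "u2", "u3", "u4", "u5");
        //fake "database": each call returns one page of at most 2 rows
        PagedIterator<String> it = new PagedIterator<>(2,
                off -> rows.subList(off, Math.min(off + 2, rows.size())));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Since the fetcher opens and closes its resources per page, nothing needs to survive between calls, which is precisely the advantage over the driver-pagination approach.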