
I would like to consume a database cursor using a Java Stream. I would prefer the stream to fetch and process the rows as they are needed, rather than loading all 5 million rows into memory first and then processing them.

Is it possible to consume it without loading the whole table in RAM?

So far my code looks like:

Cursor<Product> products = DAO.selectCursor(...);

// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
  Product p = it.next();
  // 2. Processing each row
  ...
}
// 3. Concluding (processing totals, stats, etc.)
double avg = total / count;
...

It does work well, but it's a bit cumbersome and I would like to take advantage of the Stream API.

Joe DiNottra
  • Just use `products.stream()`. What happens next depends on what operation(s) you want to perform. – MC Emperor Oct 16 '20 at 13:30
  • Just use `for(Product p: DAO.selectCursor(...)) { … }` Since Java 5… – Holger Oct 16 '20 at 13:38
  • @MCEmperor "What happens next depends on..." -- ah... that's what I'm afraid of. I wanted to make sure it does not bring the whole server down. Is there maybe a way of enforcing the stream to actually "stream the data"? – Joe DiNottra Oct 16 '20 at 13:41
  • Does this help? https://stackoverflow.com/questions/32209248/java-util-stream-with-resultset – implosivesilence Oct 16 '20 at 13:45
  • @implosivesilence Thanks, unfortunately that question resolves the retrieval of the cursor; this works well for me. It's the processing of the cursor where I'm confused. – Joe DiNottra Oct 16 '20 at 13:50
  • @JoeDiNottra There is, but you need to call a *terminal operation* on the stream. That *could* be `.collect(Collectors.toList())` – and that would mess up your system, of course. But your actual requirement is not specified. – MC Emperor Oct 16 '20 at 14:03

1 Answer


First, we must discuss how you are going to get the data from your database. If your intent is to go over a large number of records, and you don't want to load them all at once in memory, you have two options:

  1. Paginate the results.
  2. Make your driver paginate the results.

If you already have an iterator based on a Cursor that retrieves paginated data as needed, then you can use the Spliterators and StreamSupport utility classes from the JDK to turn it into a Stream:

Stream<Product> products = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(cursor.iterator(),
                Spliterator.NONNULL |
                        Spliterator.ORDERED |
                        Spliterator.IMMUTABLE), false);
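
For example, with the stream above, the count/average computation from the question stays lazy and constant-memory, as long as the terminal operation does not accumulate the elements. A minimal sketch: `getPrice` is a hypothetical accessor on `Product`, and the MyBatis `Cursor` (which is `Closeable`) still has to be closed when you are done with it:

// the terminal operation pulls one row at a time; nothing is buffered
// unless you collect the elements into a data structure yourself
try (cursor) {
    DoubleSummaryStatistics stats = products
            .mapToDouble(Product::getPrice)
            .summaryStatistics();
    System.out.printf("count=%d avg=%.2f%n", stats.getCount(), stats.getAverage());
}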

Otherwise you will have to build something of your own.

Driver Pagination

If your JDBC driver supports the fetch size property, you can do something like this:

Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement(
        "SELECT order_number FROM orders WHERE order_date >= '2018-08-12'",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();

At this point, rs holds the first batch of 1000 records, and the driver won't retrieve more rows from the database until you have consumed the current batch.

The tricky part in all this is that you cannot close any of the resources (i.e. the connection, prepared statement, and result set) until you are done reading all the records. Since the stream we want to build is lazy by default, this implies we must keep all these resources open until we are done with the stream.

Perhaps the easiest way is to build an Iterator around this logic and close all the resources once the iterator actually reaches the end of the data (i.e. when rs.next() returns false). An alternative is to do that cleanup when the stream itself is closed (Stream.onClose()), which is the approach taken below.

Once we have an iterator, it is very simple to build a stream out of it using the Spliterators and StreamSupport utility classes from the JDK.

My rudimentary implementation would look somewhat like this. It is just for illustration purposes; you may want to adapt it to your particular case.

public Stream<String> getUsers() {
    DataSource ds = jdbcTemplate.getDataSource();
    try {
        Connection conn = ds.getConnection();
        conn.setAutoCommit(false);
        PreparedStatement stm = conn.prepareStatement(
                "SELECT id FROM users",
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // the fetch size is what guarantees only 1000 records at a time
        stm.setFetchSize(1000);
        ResultSet rs = stm.executeQuery();

        Iterator<String> sqlIter = new Iterator<>() {
            @Override
            public boolean hasNext() {
                try {
                    return rs.next();
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }

            @Override
            public String next() {
                try {
                    return rs.getString("id");
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }
        };

        //turn iterator into a stream
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(sqlIter,
                        Spliterator.NONNULL |
                                Spliterator.ORDERED |
                                Spliterator.IMMUTABLE), false
        ).onClose(() -> {
            //make sure to close resources when done with the stream
            closeResources(conn, stm, rs);
        });


    } catch (SQLException e) {
        logger.error("Failed to process data", e);
        throw new RuntimeException(e);
    }
}

private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
    try (conn; ps; rs) {
        logger.info("Resources successfully closed");
    } catch (SQLException e) {
        logger.warn("Failed to properly close database sources", e);
    }
}

The key point here is to notice that the stream we return is supposed to run some onClose logic, so whoever uses the stream must make sure to call stream.close() when done with it, to ensure we close all the resources kept alive up to that point (i.e. conn, stm and rs).

Perhaps the best way is to use a try-with-resources, such that the try block takes care of closing the stream:

try(Stream<String> users = userRepo.getUsers()){
    // print users to standard output, retrieving 1000 rows at a time
    users.forEach(System.out::println);
}

Manual Pagination

An alternative is to paginate the results yourself. How you do this depends on the database, but using clauses like LIMIT and OFFSET you can request a specific page of records, process it, and then retrieve some more:

SELECT id FROM users ORDER BY id LIMIT 1000 OFFSET 5000 -- the sixth page of 1000 rows; OFFSET counts rows, not pages

In this case, your iterator would consume the whole page and, when done, request the next one, until no more records are found in the final page.

The advantage of this other approach is that the resources can be controlled directly within the iterator itself: each page is fetched, and its resources released, before the next one is requested.

I won't develop a full, production-ready example and will leave that for you to try, but a rough sketch is shown below.
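
A minimal sketch, assuming the same jdbcTemplate field (Spring's JdbcTemplate) and users table as the earlier example, and a database that understands LIMIT/OFFSET; the method name and pageSize parameter are illustrative, and the ORDER BY matters, because without a stable order the pages may overlap or skip rows:

public Stream<String> getUsersPaged(int pageSize) {
    Iterator<String> pagedIter = new Iterator<>() {
        private List<String> page = List.of();
        private int index = 0;      // position within the current page
        private int offset = 0;     // rows consumed so far
        private boolean lastPage = false;

        @Override
        public boolean hasNext() {
            if (index < page.size()) {
                return true;
            }
            if (lastPage) {
                return false;
            }
            // fetch the next page; the connection is opened and closed
            // per page, so nothing stays open between pages
            page = jdbcTemplate.queryForList(
                    "SELECT id FROM users ORDER BY id LIMIT ? OFFSET ?",
                    String.class, pageSize, offset);
            offset += page.size();
            index = 0;
            lastPage = page.size() < pageSize; // a short page means we reached the end
            return !page.isEmpty();
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return page.get(index++);
        }
    };

    // same trick as before: turn the iterator into a lazy stream
    return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(pagedIter,
                    Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.IMMUTABLE),
            false);
}

Since nothing is kept open between pages, this stream needs no onClose handler. The trade-off is that large OFFSET values become increasingly expensive in most databases, so for very deep pagination a keyset approach (WHERE id > lastSeenId) tends to scale better.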

Edwin Dalorzo
  • Thank you very much for the response. In this case the application is using Oracle with MyBatis cursors, and they work well with the buffering; this part of the logic is resolved without Streams and it barely consumes RAM. The problem I have is about Java Streams: will they materialize the whole Cursor in RAM? – Joe DiNottra Oct 16 '20 at 14:44
  • That depends on how you use the stream. If you use a collector that puts everything in RAM, of course it will; but if, when you consume the stream, you process each record, do whatever you want to do with it, and then just move on to the next one, the stream won't keep anything in RAM. As you go, objects become garbage-collectible. – Edwin Dalorzo Oct 16 '20 at 14:47
  • If you already have an iterator based on a `Cursor` that retrieves paginated data as needed, then you can still use my example of `Spliterator` and `StreamSupport` to turn it into a `Stream` – Edwin Dalorzo Oct 16 '20 at 14:48