75

I have a few tables with a large amount of data (about 100 million records). I can't hold this data in memory, but I would like to stream the result set using the java.util.stream API and pass that stream to another class. I read about Stream.of and Stream.Builder, but they buffer their elements in memory. Is there any way to do this?

UPDATE #1

Okay, I googled and found the jOOQ library. I'm not sure, but it looks like it could be applicable to my case. To summarize: I have a few tables with a large amount of data. I would like to stream my result set and pass this stream to another method. Something like this:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }
    
    return record;
}

Could someone please advise whether I'm on the right track? Thanks.

UPDATE #2

I experimented with jOOQ and can now say that the approach above is not suitable for me. The line record = DSL.using(connection).fetch(resultSet).stream(); takes too much time.

starball
Iurii
  • 1
    Beware that not all streams are the same kind of stream. `java.util.stream.Stream` may not actually be suited for what you have in mind. – Louis Wasserman Aug 25 '15 at 16:32
  • 1
    `ResultSet` is like a stream. You can only process one row of the result at once. Or do you want to process the `ResultSet` with the streaming api? – Flown Aug 25 '15 at 16:43
  • I would like to wrap `ResultSet` to java 8 `stream` and pass this `stream` object to another class. In another class I would like to iterate over this `stream` and write the results to `File`. – Iurii Aug 25 '15 at 16:56
  • 2
    This is quite close to some of my work which I described [here](https://www.airpair.com/java/posts/spring-streams-memory-efficiency), maybe you'll find it useful. – Marko Topolnik Aug 28 '15 at 08:42
  • please use JOOQ with caution. I used JOOQ for lazy streaming which worked well until I encountered Clob types which is not supported in JOOQ and it converted to String internally which really degraded performance. There may be other issues or bugs in JOOQ – TriCore Oct 18 '21 at 05:00
  • @TriCore: Well, you folks seem to write `SELECT *` queries and are now blaming the framework for doing exactly what you told it to... Why not just write smarter queries instead? Also, the OP writes `fetch(resultSet)` which eagerly fetches everything into memory (as documented), rather than writing `fetchLazy(resultSet)`, which keeps an open cursor. It works as designed and documented... – Lukas Eder Oct 20 '21 at 18:05
  • @LukasEder not sure why would you assume `SELECT *` or if I am loading clobs inadvertently. I needed to lazily load clob in a lazy stream which would give flexibility to decide when to load clobs or not load them at all `based on some conditions`. I understand its all documented but I found it very surprising that JOOQ loaded the whole clob eagerly in a lazy stream. Anyways, I found a solution by moving the clob values to a blob store in cloud. – TriCore Oct 20 '21 at 19:33
  • @TriCore: It doesn't matter if you `SELECT *` or `SELECT a, b, clob`. The point is, why include the `clob` column in the `SELECT` clause when you could use "some conditions" to decide whether you actually need to include it in your `SELECT` clause? I'd still be very happy to offer answering a specific new question if this isn't clear... It doesn't seem to be...? – Lukas Eder Oct 20 '21 at 21:19
  • @LukasEder actually I am not clear. Are you asking me to execute two queries one with clob and one without clob? Note that I also need data from the records for which the clobs are ignored. – TriCore Oct 20 '21 at 22:27
  • @TriCore please ask a new question. It'll be much easier to explain – Lukas Eder Oct 21 '21 at 05:35
  • How to handle setFetchSize(xxx) method , if i am not sure about the size of db rows object ??? – Sudhir Gaurav Apr 06 '23 at 18:14

7 Answers

119

The first thing you have to understand is that code like

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

does not work as by the time you leave the try blocks, the resources are closed while the processing of the Stream hasn’t even started.

The resource management construct “try with resources” works for resources used within a block scope inside a method but you are creating a factory method returning a resource. Therefore you have to ensure that the closing of the returned stream will close the resources and the caller is responsible for closing the Stream.
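To see the failure mode in isolation, here is a minimal sketch that does not need a database. `FakeCursor` is a hypothetical stand-in for the JDBC resources (it is not part of any library); it only allows reading rows while it is open. The method `brokenStream` has the same shape as the code in the question.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class EarlyCloseDemo {

    // Hypothetical stand-in for Connection/Statement/ResultSet:
    // rows can only be read while the cursor is still open.
    static class FakeCursor implements AutoCloseable {
        private final Iterator<String> rows = Arrays.asList("a", "b", "c").iterator();
        private boolean open = true;

        boolean hasRow() {
            if (!open) throw new IllegalStateException("cursor already closed");
            return rows.hasNext();
        }

        String row() { return rows.next(); }

        @Override
        public void close() { open = false; }
    }

    // Same shape as the problematic code: a lazy stream escapes the try block.
    static Stream<String> brokenStream() {
        try (FakeCursor cursor = new FakeCursor()) {
            Iterator<String> it = new Iterator<String>() {
                @Override public boolean hasNext() { return cursor.hasRow(); }
                @Override public String next() { return cursor.row(); }
            };
            return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
        } // cursor.close() runs here, before any row has been read
    }

    // Consuming the returned stream fails because the cursor is already closed.
    static String tryToConsume() {
        try {
            return brokenStream().collect(Collectors.joining(","));
        } catch (IllegalStateException ex) {
            return "failed: " + ex.getMessage();
        }
    }
}
```

Calling `EarlyCloseDemo.tryToConsume()` yields the failure message rather than the rows, because nothing is read from the cursor until the terminal operation runs. With a real JDBC driver the symptom would typically be an SQLException about a closed result set or connection.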


Further, you need a function which produces an item out of a single row of the ResultSet. Suppose you have a method like

Record createRecord(ResultSet rs) {
    …
}

you may create a Stream<Record> basically like

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

But to do it correctly you have to incorporate the exception handling and closing of resources. You can use Stream.onClose to register an action that will be performed when the Stream gets closed, but it has to be a Runnable which cannot throw checked exceptions. Similarly the tryAdvance method is not allowed to throw checked exceptions. And since we can’t simply nest try(…) blocks here, the program logic of suppressing exceptions thrown in close, when there is already a pending exception, doesn’t come for free.

To help us here, we introduce a new type which can wrap closing operations which may throw checked exceptions and deliver them wrapped in an unchecked exception. By implementing AutoCloseable itself, it can utilize the try(…) construct to chain close operations safely:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}
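As a hedged illustration of how the chaining behaves, the sketch below repeats the interface so that it compiles on its own and nests three hypothetical resources (plain lambdas standing in for Connection, Statement and ResultSet). `NestDemo`, `closeOrder` and `failingClose` are names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch (Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return () -> { try (UncheckedCloseable c1 = this) { c.close(); } };
    }
}

class NestDemo {

    // Records the order in which the nested resources get closed.
    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        UncheckedCloseable close = UncheckedCloseable.wrap(() -> log.add("connection"));
        close = close.nest(() -> log.add("statement"));
        close = close.nest(() -> log.add("resultSet"));
        close.run(); // the most recently nested resource is closed first
        return log;
    }

    // Both close operations fail: the first failure is kept as the cause,
    // the later one is attached to it as a suppressed exception.
    static RuntimeException failingClose() {
        UncheckedCloseable close = UncheckedCloseable.wrap(
            () -> { throw new IllegalStateException("connection"); });
        close = close.nest(
            () -> { throw new IllegalStateException("statement"); });
        try {
            close.run();
            return null; // not reached: both close operations throw
        } catch (RuntimeException ex) {
            return ex;
        }
    }
}
```

Here `closeOrder()` closes the resources in reverse order of registration, which mirrors what nested try(…) blocks would do, and `failingClose()` shows that the exception thrown by the outer resource ends up in `getSuppressed()` of the first failure instead of replacing it.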

With this, the entire operation becomes:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

This method wraps the necessary close operation for all resources, Connection, Statement and ResultSet within one instance of the utility class described above. If an exception happens during the initialization, the close operation is performed immediately and the exception is delivered to the caller. If the stream construction succeeds, the close operation is registered via onClose.

Therefore the caller has to ensure proper closing like

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}
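The reason the try(…) block on the caller's side matters is that a terminal operation alone does not close a stream. A small sketch with an in-memory stream (no JDBC involved; `OnCloseDemo` and its methods are names invented for this example):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class OnCloseDemo {

    // A terminal operation on its own does not run the close handler.
    static boolean closedAfterTerminalOperation() {
        AtomicBoolean closed = new AtomicBoolean();
        Stream<String> s = Stream.of("a", "b").onClose(() -> closed.set(true));
        s.forEach(x -> { });
        return closed.get();
    }

    // try-with-resources calls Stream.close(), which runs the handler.
    static boolean closedAfterTryWithResources() {
        AtomicBoolean closed = new AtomicBoolean();
        try (Stream<String> s = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            s.forEach(x -> { });
        }
        return closed.get();
    }
}
```

The first method returns false and the second returns true. For a stream backed by a ResultSet, the first case would mean a leaked connection.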

Note that also the delivery of an SQLException via RuntimeException has been added to the tryAdvance method. Therefore you may now add throws SQLException to the createRecord method without problems.
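If the caller prefers to see the original SQLException again, the wrapper can be unwrapped at the call site. This is only a sketch under the assumption that the RuntimeException was created as `new RuntimeException(sqlException)`, as in the code above; `SqlUnwrap.unwrapping` is a hypothetical helper, not part of the JDK.

```java
import java.sql.SQLException;
import java.util.function.Supplier;

class SqlUnwrap {

    // Runs a stream operation and rethrows a wrapped SQLException as the checked original.
    static <T> T unwrapping(Supplier<T> streamOperation) throws SQLException {
        try {
            return streamOperation.get();
        } catch (RuntimeException ex) {
            if (ex.getCause() instanceof SQLException) {
                throw (SQLException) ex.getCause();
            }
            throw ex; // unrelated runtime failures pass through unchanged
        }
    }
}
```

A caller could then write something like `long n = SqlUnwrap.unwrapping(() -> s.count());` inside the try(…) block that owns the stream `s`. A dedicated unchecked exception type would make the check more precise than testing the cause of any RuntimeException.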

Holger
  • up-vote first. I think `stream.onClose(UncheckedCloseable.wrap(resource)::close)` is more expressiveness and `RuntimeException` can be replaced with `UncheckedIOException`. what do you think? – holi-java Apr 26 '17 at 06:55
  • 1
    @holi-java: `::close` would be again referring to the method which can throw checked exceptions, hence, what you suggest would be rejected by the compiler. Besides that, I don’t think that it is necessary to make it explicit that passing a resource to `onClose` will eventually invoke its `close` method. `UncheckedIOException` is suitable for wrapping an `IOException`, but not for a general solution, especially not in this answer, where we have to deal with `SQLException`. – Holger Apr 27 '17 at 07:17
  • Hi, I agree on the point of `UncheckedIOException`, the `::close` would be again throw cehcked exceptions can be solved as `run` method and I can remove the `run` method. am I right? – holi-java Apr 27 '17 at 08:34
  • @holi-java could you may re-phrase that? I did not get your point (and I wanted to respond to your comment)... – Eugene Apr 28 '17 at 13:11
  • @Eugene Hi, what I have said to @Holger is to declare a `close` method doesn't throws `Exception`, and then delegate `close` to the original `AutoCloseable.close` like as `run` method. then I can remove the `run` method and make `UncheckedCloseable` not is a `Runnable` and then I can use the expression `stream.onClose(UncheckedCloseable.wrap(resource)::close)` . `close` method in `UncheckedAutoCloseable` are conflict with its name, because the `UncheckedAutoCloseable` still throws `Exception`. am I right? – holi-java Apr 28 '17 at 13:58
  • @holi-java first, your method would have a different name, other then `close`; because it would override the `AutoCloseable::close`, let's say `closeMine`. Something like : `default void closeMine() {... // code just like in Runnable`. Second, that would still work, but I find the Runnable approach much cleaner. It's actually a very nice piece of code, which I already took into my personal `Utils`... there are quite a few of them from Holger at this point – Eugene Apr 28 '17 at 14:22
  • @Eugene I said the most one sentence: “I'm sorry about my bad english.” and I say it again, I'm very sorry. I needn't a default method `closeMine` just decorate the original `AutoCloseable`. maybe it is not a cleaner way, but it elegant and `close` method don't conflict with it's name. – holi-java Apr 28 '17 at 14:29
  • @Eugene the implementation like this: `interface UncheckedCloseable extends AutoCloseable { void close(); static AutoCloseable wrap(AutoCloseable it) { return (UncheckedCloseable) () -> { try { it.close(); } catch (Exception e) { throw new RuntimeException(e); } }; } }` – holi-java Apr 28 '17 at 14:29
  • @Eugene Hi, there is another reason when using `stream.onClose(UncheckedCloseable.wrap(resource)::close)` in code snippet, I think it is clearer than the @Holger's, because I can see the code that is close a resource after close the stream, but like as @Holger's I need jump into the `UncheckedCloseable` and seeing what the `run` method does. can you give me some advice? – holi-java Apr 28 '17 at 14:44
  • 1
    @holi-java: the reason, why this class implements `AutoCloseable` is the `nest` method which allows to combine it with another resource that needs closing. If `UncheckedCloseable` had a `close` method that doesn’t allow checked exceptions, the implementation of `nest` would be more complicated. On the other hand, if you don’t use that method, you don’t need it to implement `AutoCloseable` at all. In fact, you wouldn’t need that type; a sole `static` wrapping method would be sufficient. – Holger Apr 28 '17 at 15:37
  • 1
    By the way, all that `UncheckedCloseable.wrap(resource)::close` tells you, is that the function will call `close` on whatever `wrap` returns. It doesn’t tell you what happens to `resource`. On the other hand, once you understood that `resource` also has a `close` method and that, of course, that `close` method will eventually be called, well, then you don’t need `::close` on the result of `wrap` to hint at that fact. As said, that `close` method is entirely unrelated to the `close` method of `resource`. It wouldn’t be different had you written `UncheckedCloseable.wrap(resource)::run` – Holger Apr 28 '17 at 15:39
  • @Holger Hi, A few days gone, How are you? "you don’t need it to implement AutoCloseable at all" - I still need an `UncheckedCloseable` due to `AutoCloseable.close` throws an `Exception` but `Runnable` not does. and the `nest` method I can done like this: `try {wrap(c).close();} finally {close();}`. Am I right, sir? – holi-java Apr 28 '17 at 15:59
  • @Holger by the `UncheckedCloseable.wrap(resource)::run` way I still need to see what the `run` methods does, but in the `UncheckedCloseable.wrap(resource)::close` way I can see it call the `close` method due to the `wrap` method has been became a pattern/convention in developers mind. What do you think? – holi-java Apr 28 '17 at 16:07
  • 1
    @holi-java: if you let `UncheckedCloseable` declare a `close` method that doesn’t throw checked exceptions and pass a method reference to it to `onClose`, there is no relevance in implementing `AutoCloseable` anymore. No one is expecting an `AutoCloseable` anywhere. Regarding `finally`, just read why “try with resource” was introduced, simply said, no `finally` is not an option. Oh, and I don’t get why you have to look up what `run` does but assume to know what `close` does due to a “pattern/convention”. All other developers already understand what `UncheckedCloseable.wrap(resource)` does… – Holger Apr 28 '17 at 16:49
  • @Holger yes, you are right. `try-with-resource` block has different behavior between `try-finally` block when fails both on closes a resource & fails in `try` block. when I see your code at the first time of `UncheckedCloseable` I really need to look up the `run` method, but I needn't to look up it repeatly at the next time. great and you finally convinced me, thank you very much. – holi-java Apr 28 '17 at 17:14
  • 1
    I have made a simple library to do just this. It's designed to be thread-safe (streams are parallel) and you can even let the resources be cleaned up automatically. Requires Java 10. https://github.com/claudemartin/streamed-sql – Claude Martin Jul 29 '18 at 17:48
  • 2
    @ClaudeMartin you don’t need to insert a `synchronized` in the `tryAdvance` method. The code is already usable with parallel streams as is. It’s the contract of the `Spliterator` interface that a single instance is never accessed concurrently, just like with an `Iterator`. The key point for parallel processing is that a new `Spliterator` instance is created via `trySplit`, to be processed by other threads. Since a single `ResultSet` can’t be split, you’re fine with the inherited `trySplit` implementation which will buffer some element into an array and return an array spliterator for them. – Holger Jul 30 '18 at 12:03
  • But what about visibility? I don't think jdbc is thread safe. I want every thread to see whatever the last `rs.next()` did. I could override trySplit so that only one synch-block is needed for a batch. – Claude Martin Aug 03 '18 at 13:12
  • 1
    @ClaudeMartin the `ResultSet` is never seen by different threads. The other threads will see what the mapping function has returned for a particular row. These objects are already published in a thread safe way by the Stream implementation. This assumes that objects created for different rows do not share mutable state, but that’s always required, i.e. `synchronized` wouldn’t help if that’s violated. – Holger Aug 03 '18 at 13:37
  • Yes, you are right. It's all local. I can just remove it. I'll add to the documentation of ResultSetMapper that the ResultSet must not be shared with other code. – Claude Martin Aug 03 '18 at 21:15
  • Could this be made simpler by having the resultset being autoclosed by a try-with-resources and using lambdas for the consumer action? – Thorbjørn Ravn Andersen Dec 25 '18 at 22:57
  • 2
    @ThorbjørnRavnAndersen yes, but this would be a different code structure than a `Stream` returning method. Both variants exist, e.g. this answer is like, e.g. `Files.lines​(…)`, works, whereas your suggestion is like JDK 9+ `StackWalker.walk(…)` works. – Holger Jan 07 '19 at 08:12
  • 1
    @john16384 or you replace the initial `UncheckedCloseable close=null;` with `UncheckedCloseable close=() -> {};`; there are plenty of possibilities. This code is more a sketch than the ultimate production-ready code. AFAIK, similar solutions found their way into libraries, so using these libraries is the preferred way then. – Holger Apr 18 '19 at 11:33
  • @Holger Thanks, still experimenting with this; I don't like the reassignment of `close` every time – john16384 Apr 18 '19 at 11:35
  • For who could have the same problem on the future: using MySQL 5.7 the fetch size was not working for me until set with `Integer.MIN_VALUE` – Dherik Sep 03 '20 at 13:02
  • You could put a stub try-with-resources in the onClose block and dispense with `UncheckedCloseable`. For example, `.onClose(() -> { try (Connection c = connection; PreparedStatement ps = pSt; ResultSet rs = resultSet) { /*do nothing*/ } catch (SQLException e) { throw new RuntimeException(e); } });` – Clement Cherlin Aug 28 '23 at 16:40
  • 1
    @ClementCherlin that’s possible. Since Java 9 you can even write `.onClose(() -> { try(connection; pSt; resultSet) { /*do nothing*/ } catch (SQLException e) { throw new RuntimeException(e); } });` However, the `UncheckedCloseable` is used for two things. If the Stream construction fails, it is executed immediately. Tracking which resources have been opened for this purpose would also be possible without `UncheckedCloseable`, but would complicate the code structure. The way it is written now, it’s also easy to add more resources without having to adapt multiple code locations. – Holger Aug 29 '23 at 07:29
17

jOOQ

I'm going to answer the jOOQ part of your question. As of jOOQ 3.8, there have now been quite a few additional features related to combining jOOQ with Stream. Other usages are also documented on this jOOQ page.

Your suggested usage:

You tried this:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

Indeed, this doesn't work well for large result sets because fetch(ResultSet) fetches the entire result set into memory and then calls Collection.stream() on it.
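The difference between collecting first and streaming lazily can be shown with a plain-Java analogy. This is not jOOQ code and says nothing about jOOQ internals; `countingRows` is a hypothetical row source that simply counts how many rows were pulled from it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class EagerVsLazyDemo {

    // Hypothetical row source that counts how many rows were pulled from it.
    static Iterator<Integer> countingRows(int total, AtomicInteger pulled) {
        return new Iterator<Integer>() {
            private int index = 0;
            @Override public boolean hasNext() { return index < total; }
            @Override public Integer next() { pulled.incrementAndGet(); return index++; }
        };
    }

    // Eager: every row is copied into a list before the stream is created.
    static int rowsPulledEagerly(int total) {
        AtomicInteger pulled = new AtomicInteger();
        List<Integer> all = new ArrayList<>();
        countingRows(total, pulled).forEachRemaining(all::add);
        all.stream().findFirst();
        return pulled.get();
    }

    // Lazy: the stream pulls rows from the source only as the pipeline asks for them.
    static int rowsPulledLazily(int total) {
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> lazy = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(countingRows(total, pulled), Spliterator.ORDERED),
            false);
        lazy.findFirst();
        return pulled.get();
    }
}
```

With 1000 rows, the eager variant pulls all 1000 before the first element is looked at, while the lazy variant stops as soon as `findFirst` has its answer. With 100 million records the eager variant also has to hold all of them in memory at once.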

Better (lazy) usage:

Instead, you could write this:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
    ...
}

... which is essentially convenience for this:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
    Stream<Record> stream = cursor.stream();
    ...
}

See also DSLContext.fetchStream(ResultSet)

Of course, you could also let jOOQ execute your SQL string, rather than wrestling with JDBC:

try (Stream<Record> stream = 
     DSL.using(dataSource)
        .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
        .fetchSize(5000)
        .fetchStream()) {
    ...
}

The dreaded SELECT *

As was criticised in the comments, their jOOQ usage seemed slow because of how jOOQ eagerly fetches LOB data into memory despite using fetchLazy(). The word "lazy" corresponds to fetching records lazily (one by one), not fetching column data lazily. A record is completely fetched in one go, assuming you actually want to project the entire row.

If you don't need some heavy columns, don't project them! SELECT * is almost always a bad idea in SQL. Drawbacks:

  • It causes a lot more I/O and memory overhead in the database server, the network, and the client.
  • It prevents covering index usage
  • It prevents join elimination transformations

More info in this blog post here.

On try-with-resources usage

Do note that a Stream produced by jOOQ is "resourceful", i.e. it contains a reference to an open ResultSet (and PreparedStatement). So, if you really want to return that stream outside of your method, make sure it is closed properly!

Lukas Eder
  • How does `.fetchStream()` differ from `.fetchMaps()`? Besides the return types being `Stream` and `List<Map<String, Object>>` respectively, is it safe to say that `.fetchStream()` does not persist any information that *was* streamed? I am seeing if I can return records in a JSON payload with a Stream rather than eager fetching, but I am only able to stream results once and the variable I am using to define the stream does not produce anything when referenced by another class. – Torc Jan 28 '20 at 17:47
  • 2
    @JoeH: Very few methods in jOOQ are really "lazy". `fetchStream()` and `fetchLazy()` are such lazy methods, which keep the underlying JDBC `ResultSet` open until the resulting resource is closed. `fetchMaps()` will eagerly fetch the entire JDBC `ResultSet` into the target data structure and close it immediately, so no resources are generated in your client code which need explicit closing. I'll also answer your separate question here: https://stackoverflow.com/questions/59956246/stream-fetched-from-postgres-with-jooq-not-returning-results-from-class – Lukas Eder Jan 29 '20 at 10:51
  • @LukasEder please use JOOQ with caution. I used JOOQ for lazy streaming which worked well until I encountered Clob types which is not supported in JOOQ and it converted to String internally which really degraded performance. There may be other issues or bugs in JOOQs – TriCore Oct 18 '21 at 04:57
  • 1
    @TriCore you're funny :) well, it's all documented – Lukas Eder Oct 18 '21 at 05:41
  • @LukasEder documenting a bug doesn't make it less of a bug ;) – TriCore Oct 19 '21 at 03:45
  • @TriCore How is strategically eager fetching everything by default a bug? It's by design. If Clob semantics is important to you (it hardly ever is, it's an edge case), you can attach a Clob binding to relevant columns, or avoid projecting them – Lukas Eder Oct 19 '21 at 06:36
  • 1
    @LukasEder so sorry didn't realize that you are the CEO of the company, didn't mean to be rude. Big fan of JOOQ btw but when I used JOOQ lazy stream with Clob values, the query took about a min which typically takes few seconds with plain JDBC. – TriCore Oct 19 '21 at 21:51
  • @TriCore: No offense taken, just thought you were funny trying to recommend to me to use jOOQ with caution ;-) Well, if you're going to make a fair comparison, the JDBC version will take just as long when you fully consume the `Clob` value instead of ignoring it. In case you do ignore it, why did you project it in the first place? Did you run a `SELECT *` query? But you should never fetch more data than you actually need. Eager-fetching all `Clob` contents is a great convenience feature in jOOQ (or Hibernate, for that matter), because the cases where you *want* to stream are very rare. – Lukas Eder Oct 20 '21 at 13:55
  • @LukasEder it may need to read some Clob values based on some condition, so eagerly fetching Clob doesn't work for me at least in this situation. In my opinion, eagerly fetching Clob values in a lazy stream is a decision made by JOOQ library which it shouldn't and I don't think it's a good strategy. Please don't compare with Hibernate :) JOOQ is far better than Hibernate that's why I chose it over hibernate – TriCore Oct 20 '21 at 17:46
  • @TriCore: Of course it should read those, because you told it to, and that's what 99% of users would expect. No one enjoys writing 30 lines of `Clob` resource management manually *every time* they read a `Clob`. In Oracle, most `Clob` values are simple strings, not 10GB of data. It's such a PITA to read `Clob` through JDBC, so jOOQ adds a ton of value to most users. "Based on some condition" well, jOOQ *is* a [dynamic SQL](https://www.jooq.org/doc/latest/manual/sql-building/dynamic-sql/) library, so do write dynamic SQL, and avoid reading the `Clob` when you're not interested in the `Clob` – Lukas Eder Oct 20 '21 at 17:57
  • @TriCore: But maybe, instead of commenting here, why not ask a new question with more details of what you're actually doing. The result of that may be more useful to you and future readers. – Lukas Eder Oct 20 '21 at 18:02
  • @TriCore: Anyway, I updated my answer. Maybe you can see this as an opportunity where jOOQ exposed a flaw in your queries, as you can see in this blog post: https://blog.jooq.org/many-sql-performance-problems-stem-from-unnecessary-mandatory-work. By not projecting the clob when you don't need the clob, you can further improve performance *drastically*, even in your JDBC version. – Lukas Eder Oct 20 '21 at 18:13
9

I'm not aware of any well-known library that will do it for you.

That said, this article shows how to wrap the resultset with an Iterator (ResultSetIterator) and pass it as the first parameter to Spliterators.spliteratorUnknownSize() in order to create a Spliterator.

The Spliterator can then be used by StreamSupport in order to create a Stream on top of it.

Their suggested implementation of ResultSetIterator class:

public class ResultSetIterator implements Iterator {

    private ResultSet rs;
    private PreparedStatement ps;
    private Connection connection;
    private String sql;

    public ResultSetIterator(Connection connection, String sql) {
        assert connection != null;
        assert sql != null;
        this.connection = connection;
        this.sql = sql;
    }

    public void init() {
        try {
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();

        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (ps == null) {
            init();
        }
        try {
            boolean hasMore = rs.next();
            if (!hasMore) {
                close();
            }
            return hasMore;
        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }

    }

    private void close() {
        try {
            rs.close();
            try {
                ps.close();
            } catch (SQLException e) {
                //nothing we can do here
            }
        } catch (SQLException e) {
            //nothing we can do here
        }
    }

    @Override
    public Tuple next() {
        try {
            return SQL.rowAsTuple(sql, rs);
        } catch (DataAccessException e) {
            close();
            throw e;
        }
    }
}

and then:

public static Stream stream(final Connection connection, 
                                       final String sql, 
                                       final Object... parms) {
  return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(
                        new ResultSetIterator(connection, sql), 0), false);
}
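Note that `hasNext()` in the iterator above advances the ResultSet as a side effect, so calling it twice in a row skips a row. A hedged sketch of a variant whose `hasNext()` can be called repeatedly is shown below. `RowCursor` is a hypothetical stand-in for ResultSet (its `next()` has the same move-and-report semantics), used so that the example runs without a database.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for ResultSet: next() moves to the next row
// and reports whether such a row exists.
interface RowCursor {
    boolean next();
    String current();
}

class LookAheadIterator implements Iterator<String> {
    private final RowCursor cursor;
    private boolean fetched; // true if the cursor was already advanced for the upcoming row
    private boolean hasRow;  // result of that advance

    LookAheadIterator(RowCursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
        if (!fetched) {          // advance at most once per row
            hasRow = cursor.next();
            fetched = true;
        }
        return hasRow;
    }

    @Override
    public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        fetched = false;         // the next hasNext() call may advance again
        return cursor.current();
    }

    // In-memory cursor over fixed rows, for demonstration only.
    static RowCursor cursorOf(String... rows) {
        return new RowCursor() {
            private int index = -1;
            @Override public boolean next() { return ++index < rows.length; }
            @Override public String current() { return rows[index]; }
        };
    }
}
```

Resource closing is left out here on purpose; as with any stream backed by a ResultSet, it would be registered through `onClose` and triggered by the caller.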
Nir Alfasi
  • 3
    Note that for short-circuit stream operation the iterator can be abandoned in any moment leaving the non-closed `ResultSet`. It's better to create a closeable Stream and require to close it explicitly after the operation. Also why raw-types in Java-8? – Tagir Valeev Aug 26 '15 at 01:57
  • @TagirValeev how would you create a closeable stream and require to close it ? – Nir Alfasi Aug 26 '15 at 02:15
  • 4
    Every stream can be closed (as `AutoCloseable`), but by default this does nothing. You can add a close handler like `StreamSupport.stream(...).onClose(myIterator::close)` (store the `ResultSetIterator` into `myIterator` variable). You can require to close it writing the proper JavaDoc like it's done for [`Files.lines`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#lines-java.nio.file.Path-java.nio.charset.Charset-) method (If timely disposal of file system resources is required, the try-with-resources construct should be used blah blah). – Tagir Valeev Aug 26 '15 at 04:31
  • @TagirValeev thanks!!! I was afraid that this day will pass without me learning anything, but now you came... :))) thanks again! – Nir Alfasi Aug 26 '15 at 04:35
  • 5
    First of all, you should not use raw types. Second, the `Iterator` is broken as `hasNext()` has an unexpected side-effect as it will advance to the next line. [This is not a theoretical issue.](http://stackoverflow.com/q/29037100/2711488). Note that you can fix it *and* half the code size by implementing a `Spliterator`. Finally that unused varargs parameter `parms` is asking for trouble. – Holger Aug 26 '15 at 15:17
  • @Holger `params` implies that the query can include parameters (PreparedStatement), this part was not implemented because this is an example that meant for a different purpose but it could easily be expanded. Same goes for the `hasNext()`, this implementation has the contract of calling `hasNext()` before calling `next()` (which is the way it's usually done) - it could be changed, but again - that's not the point (and the same applies for raw-types). – Nir Alfasi Aug 26 '15 at 15:28
  • 2
    There is no contract that `hasNext()` and `next()` are paired and I already linked to a question showing the `Stream`s —and you are creating a stream out of the iterator— **do** call `hasNext` more than once occasionally. You can’t make up your own contract and declare that the stream API has to adhere to it. As *proven*, that doesn’t work. – Holger Aug 26 '15 at 15:37
  • @Holger this issue can be fixed, but it will make the code more complex (keeping the rs in a class member + a counter to the number of calls to `next()`) but it will create an extra boilerplate that beats the purpose of the example. Point taken though, thanks. – Nir Alfasi Aug 26 '15 at 15:42
  • 2
    As already said, you can fix it by making the code *simpler* by implementing a `Spliterator` instead of an `Iterator`. – Holger Aug 26 '15 at 15:47
  • This is the most usable answer. A variation of this code worked great for me. It deserves more votes. – Holger Ludvigsen Oct 12 '21 at 08:17
4

Here is a simple example using abacus-jdbc.

final DataSource ds = JdbcUtil.createDataSource(url, user, password);
final SQLExecutor sqlExecutor = new SQLExecutor(ds);
sqlExecutor.stream(sql, parameters).filter(...).map(...).collect(...) // lazy execution&loading and auto-close Statement/Connection

Or:

JdbcUtil.prepareQuery(ds, sql)
            .stream(ResultRecord.class) // or RowMapper.MAP/...
            .filter(...).map(...).collect(...)  // lazy execution&loading and auto-close Statement/Connection

This is fully lazy loading with automatic closing. The records are loaded from the database in batches of the fetch size (the default if not specified), and the Statement and Connection are closed automatically after the results have been collected.

Disclosure: I'm the developer of AbacusUtil.

user_3380739
  • After a quick peek at AbacusUtil it looks to me that this is a gigantic library which I would be very reluctant to include in a solution. You may want to split it up in smaller modules where I could pick only what I actually needed? – Thorbjørn Ravn Andersen Dec 25 '18 at 19:21
  • 1
    Splitted into four projects: abacus-util, abacus-jdbc, abacus-da, abacus-android since 1.8.2. – msangel Nov 12 '19 at 17:10
0

Using my library it would be done like this:

attach maven dependency:

<dependency>
    <groupId>com.github.buckelieg</groupId>
    <artifactId>db-fn</artifactId>
    <version>0.3.4</version>
</dependency>

use library in code:

Function<Stream<I>, O> processor = stream -> //process input stream
try (DB db = new DB("jdbc:postgresql://host:port/database?user=user&password=pass")) {
    processor.apply(
        db.select("SELECT * FROM my_table t1 JOIN my_table t2 ON t1.id = t2.id")
          .fetchSize(5000)
          .execute(rs -> /*ResultSet mapper*/)
    );
}

See more here

Anatoly
0

The Tools module of the Ujorm framework offers a simple solution using the RowIterator class. Example of use:

    PreparedStatement ps = dbConnection.prepareStatement("SELECT * FROM myTable");
    new RowIterator(ps).toStream().forEach((RsConsumer)(resultSet) -> {
        int value = resultSet.getInt(1);
    });

Maven dependency on the Tools library (50KB):

    <dependency>
        <groupId>org.ujorm</groupId>
        <artifactId>ujo-tools</artifactId>
        <version>1.93</version>
    </dependency>

See jUnit test for more information.

pop
-2

I wrote a summary with a real example of how to stream a ResultSet and run a simple SQL query without using 3rd-party libraries; click here for details.

Java 8 provides the Stream family and easy operations on it. The pipeline style makes code clear and smart. However, ResultSet is still processed in a very legacy way. Given how ResultSet is actually used, it is really helpful to convert it to a Stream.

.... StreamUtils.uncheckedConsumer is required to convert the SQLException to a RuntimeException to keep the lambda clean.

user2276550