
I want to make an asynchronous call to a Cassandra database with executeAsync. In the manual I found this code, but I couldn't understand how to collect all rows into a list. It is a really basic query like SELECT * FROM table, and I want to store all the results.

https://docs.datastax.com/en/developer/java-driver/4.4/manual/core/async/

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();

// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
    sessionStage.thenCompose(
        session -> session.executeAsync("SELECT release_version FROM system.local"));

// Apply a synchronous computation:
CompletionStage<String> resultStage =
    responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));

// Perform an action once a stage is complete:
resultStage.whenComplete(
    (version, error) -> {
      if (error != null) {
        System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
      } else {
        System.out.printf("Server version: %s%n", version);
      }
      sessionStage.thenAccept(CqlSession::closeAsync);
    });
Alex Ott
jamesawer

2 Answers

1

You need to refer to the section about asynchronous paging: you provide a callback that collects the data into a list supplied as an external object. The documentation has the following example:

CompletionStage<AsyncResultSet> futureRs =
    session.executeAsync("SELECT * FROM myTable WHERE id = 1");
futureRs.whenComplete(this::processRows);

void processRows(AsyncResultSet rs, Throwable error) {
  if (error != null) {
    // The query failed, process the error
  } else {
    for (Row row : rs.currentPage()) {
      // Process the row...
    }
    if (rs.hasMorePages()) {
      rs.fetchNextPage().whenComplete(this::processRows);
    }
  }
}

In this case processRows can store the data in a list that is part of the current object, something like this:

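class Abc {
  // Accumulates the rows from every page as the callbacks fire
  List<Row> rows = new ArrayList<>();

  // call to executeAsync

  void processRows(AsyncResultSet rs, Throwable error) {
    // ... (error handling, as in the example above)
    for (Row row : rs.currentPage()) {
      rows.add(row);
    }
    // ... (fetching the next page, as in the example above)
  }
}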

But you'll need to be very careful with SELECT * FROM table, as it may return a lot of results, and it may time out if you have too much data. In that case it's better to perform a token range scan (I have an example for driver 3.x, but not for 4.x yet).
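
The callback approach above fills the list as a side effect, so the caller has no direct signal for when the last page has arrived. One possible alternative is to chain the pages with thenCompose and return a single stage that completes with the full list. The sketch below is not from the driver documentation: the `Page` interface is a hypothetical stand-in for the three `AsyncResultSet` methods used above (`currentPage`, `hasMorePages`, `fetchNextPage`), and `pagesOf` is a made-up in-memory source, so the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class PageCollector {

  // Hypothetical stand-in mirroring the AsyncResultSet methods used above.
  interface Page<T> {
    Iterable<T> currentPage();
    boolean hasMorePages();
    CompletionStage<Page<T>> fetchNextPage();
  }

  // Returns a stage that completes with all items once the last page is read.
  static <T> CompletionStage<List<T>> collectAll(CompletionStage<Page<T>> first) {
    return first.thenCompose(page -> collectInto(page, new ArrayList<T>()));
  }

  private static <T> CompletionStage<List<T>> collectInto(Page<T> page, List<T> acc) {
    for (T item : page.currentPage()) {
      acc.add(item);
    }
    if (page.hasMorePages()) {
      // Chain the next page; the returned stage completes only after the last one.
      return page.fetchNextPage().thenCompose(next -> collectInto(next, acc));
    }
    return CompletableFuture.completedFuture(acc);
  }

  // Made-up in-memory page source for the demo (assumption, not a driver API).
  static <T> CompletionStage<Page<T>> pagesOf(List<List<T>> chunks, int index) {
    Page<T> page = new Page<T>() {
      public Iterable<T> currentPage() { return chunks.get(index); }
      public boolean hasMorePages() { return index + 1 < chunks.size(); }
      public CompletionStage<Page<T>> fetchNextPage() { return pagesOf(chunks, index + 1); }
    };
    return CompletableFuture.supplyAsync(() -> page);
  }

  public static void main(String[] args) {
    List<List<String>> chunks =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"));
    List<String> all = collectAll(pagesOf(chunks, 0)).toCompletableFuture().join();
    System.out.println(all); // prints [a, b, c, d, e]
  }
}
```

With the real driver, the same recursion should apply with `AsyncResultSet` in place of `Page<Row>` and the result of `session.executeAsync(...)` as the first stage. The caveat about large tables still holds, since every row ends up in memory.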

Alex Ott
  • How can we collect results in order of clustering key with paging? Is it guaranteed to be in order? – user1228785 Aug 27 '20 at 04:10
  • Cassandra guarantees that data will be sorted by clustering key, but only inside the partition... – Alex Ott Aug 27 '20 at 06:20
  • I see. Meanwhile, any thoughts on this: https://stackoverflow.com/questions/63598069/datastax-object-mapper-converting-to-pojo I am looking for a simple Row to POJO mapper so I don't need to type in the code. Is it available in the DataStax driver somewhere? – user1228785 Aug 27 '20 at 13:53
0

Here is a sample for 4.x (you can also find a sample for reactive code, which is available from 4.4, by the way):

https://github.com/datastax/cassandra-reactive-demo/blob/master/2_async/src/main/java/com/datastax/demo/async/repository/AsyncStockRepository.java

clunven