I tried implementing async queries following java-driver-async-queries. I am modifying a List inside the FutureCallback, but it does not seem to work:

List<Product> products = new ArrayList<Product>();

for (/* iterating over a Map */) {
    key = entry.getKey();
    String query = "SELECT id,desc,category FROM products where id=?";
    ResultSetFuture future = session.executeAsync(query, key);
    Futures.addCallback(future,
        new FutureCallback<ResultSet>() {
            @Override public void onSuccess(ResultSet result) {
                Row row = result.one();
                if (row != null) {
                    Product product = new Product();
                    product.setId(row.getString("id"));
                    product.setDesc(row.getString("desc"));
                    product.setCategory(row.getString("category"));

                    products.add(product);
                }
            }

            @Override public void onFailure(Throwable t) {
                // log error
            }
        },
        MoreExecutors.sameThreadExecutor()
    );
}

System.out.println("Product List : " + products); // Does not print the expected values; sometimes prints an empty list

Is there any other way?

Based on Mikhail Baksheev's answer I implemented this and now get the proper result. There is one twist: I need to implement some extra logic. I am wondering if I can use List<MyClass> instead of List<ResultSetFuture>, with MyClass defined as:

public class MyClass {

    private Integer         productCount;
    private Integer         stockCount;
    private ResultSetFuture result;
}

Then, while iterating, populate futuresList as follows:

ResultSetFuture result = session.executeAsync(query, key.get());
MyClass allResult = new MyClass();
allResult.setInCount(inCount);
allResult.setResult(result);
allResult.setSohCount(values.size() - inCount);

futuresList.add(allResult);
Saurabh
  • What's the definition of "not working"? Post expected behavior, and actual behavior. – Abhijit Sarkar Aug 29 '17 at 22:27
  • You aren't waiting for your futures, are you? It looks like you create a bunch of futures and then immediately print the result structure. The result structure hasn't been populated because most of the futures are still in flight. – RussS Aug 29 '17 at 22:33
  • Thanks. What is the correction? Is there a code example I can refer to? – Saurabh Aug 30 '17 at 00:07

1 Answer

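As @RussS mentioned, the code does not wait for all futures to complete before printing the list.

There are many ways to synchronize async code. One example is a CountDownLatch, shown in the code that follows.

EDIT: Also, please use a separate thread for the callbacks and a concurrent collection for products, because the callbacks may run concurrently with each other and with the calling thread.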

ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>();
final Executor callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/);
for (/* iterating over a Map */) {
    key = entry.getKey();
    String query = "SELECT id,desc,category FROM products where id=?";
    ResultSetFuture future = session.executeAsync(query, key);
    Futures.addCallback(future,
        new FutureCallback<ResultSet>() {
            @Override public void onSuccess(ResultSet result) {
                Row row = result.one();
                if (row != null) {
                    Product product = new Product();
                    product.setId(row.getString("id"));
                    product.setDesc(row.getString("desc"));
                    product.setCategory(row.getString("category"));

                    products.add(product);
                }
                doneSignal.countDown();

            }

            @Override public void onFailure(Throwable t) {
                // log error
                doneSignal.countDown();
            }
        },
        callbackExecutor
    );
}

doneSignal.await();           // wait for all async requests to finish
System.out.println("Product List : " + products); 
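
The latch pattern above can also be tried without a Cassandra cluster. The following is a minimal sketch, assuming `CompletableFuture` as a stand-in for `ResultSetFuture`; `LatchSketch`, `fakeQuery`, and `fetchAll` are hypothetical names introduced only for illustration, and the 10-second timeout is an arbitrary choice. It counts the latch down on both success and failure, as the answer's code does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchSketch {

    // Hypothetical stand-in for session.executeAsync(query, key).
    static CompletableFuture<String> fakeQuery(int key, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> "product-" + key, pool);
    }

    static List<String> fetchAll(List<Integer> keys) {
        // Thread-safe collection, because callbacks may run on several threads.
        Queue<String> products = new ConcurrentLinkedQueue<>();
        CountDownLatch doneSignal = new CountDownLatch(keys.size());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int key : keys) {
                fakeQuery(key, pool).whenComplete((value, error) -> {
                    if (error == null) {
                        products.add(value);
                    }
                    // Count down on success AND failure, or await() never returns.
                    doneSignal.countDown();
                });
            }
            // Bounded wait so a lost callback cannot block the caller forever.
            if (!doneSignal.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for queries");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(products);
    }

    public static void main(String[] args) {
        // Completion order is not guaranteed, so the element order may vary.
        System.out.println("Product List : " + fetchAll(Arrays.asList(1, 2, 3)));
    }
}
```

The list is only read after `await` returns, so every callback has finished by the time it is printed.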

Another way is to collect all futures in a list and wait for all results as a single future with Guava's Futures.allAsList, e.g.:

List<ResultSetFuture> futuresList = new ArrayList<>( /*Map size*/);
for (/* iterating over a Map*/) {
    key = entry.getKey();
    String query = "SELECT id,desc,category FROM products where id=?";
    futuresList.add( session.executeAsync( query, key ) );
}

ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList( futuresList );
List<Product> products = new ArrayList<>();
try {
    final List<ResultSet> resultSets = allFuturesResult.get();
    for ( ResultSet rs : resultSets ) {
        if ( null != rs ) {
            Row row = rs.one();
            if (row != null) {
                Product product = new Product();
                product.setId(row.getString("id"));
                product.setDesc(row.getString("desc"));
                product.setCategory(row.getString("category"));

                products.add(product);
            }
        }
    }
} catch ( InterruptedException | ExecutionException e ) {
    System.out.println(e);
}
System.out.println("Product List : " + products);
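
For readers who want to try the "collect, then wait once" idea without Guava or the driver on the classpath, here is a rough JDK-only analogue. It swaps `Futures.allAsList` for `CompletableFuture.allOf`, which is a different API with similar intent; `AllOfSketch`, `fakeQuery`, and `fetchAll` are hypothetical names used only in this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfSketch {

    // Hypothetical stand-in for session.executeAsync(query, key).
    static CompletableFuture<String> fakeQuery(int key) {
        return CompletableFuture.supplyAsync(() -> "product-" + key);
    }

    static List<String> fetchAll(List<Integer> keys) {
        // Step 1: start every query and keep the futures, in key order.
        List<CompletableFuture<String>> futures = new ArrayList<>(keys.size());
        for (int key : keys) {
            futures.add(fakeQuery(key));
        }

        // Step 2: wait once for all of them (join rethrows the first failure).
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Step 3: read results on the calling thread; no concurrent collection
        // is needed because nothing is mutated from callbacks.
        List<String> products = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            products.add(future.join());
        }
        return products;
    }

    public static void main(String[] args) {
        System.out.println("Product List : " + fetchAll(Arrays.asList(1, 2, 3)));
    }
}
```

Because the results are read from the futures in the order they were created, the output order matches the key order regardless of which query finishes first.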

EDIT 2

I am wondering if I can use List<MyClass> instead of List<ResultSetFuture> and MyClass as

Technically yes, but in that case you can't pass List<MyClass> to Futures.allAsList unless MyClass implements the ListenableFuture interface.
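
One possible way around that restriction is to keep the wrapper objects in their own list and hand only the wrapped futures to the "wait for all" call. The sketch below illustrates this with JDK types only: `CompletableFuture` stands in for `ResultSetFuture`, `CompletableFuture.allOf` stands in for `Futures.allAsList`, and `HolderSketch`, `QueryHolder`, and `describeAll` are hypothetical names (the two count fields mirror the ones in the question's MyClass).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class HolderSketch {

    // Hypothetical holder: extra per-query data next to the pending result.
    static final class QueryHolder {
        final int productCount;
        final int stockCount;
        final CompletableFuture<String> result;

        QueryHolder(int productCount, int stockCount, CompletableFuture<String> result) {
            this.productCount = productCount;
            this.stockCount = stockCount;
            this.result = result;
        }
    }

    static List<String> describeAll(List<QueryHolder> holders) {
        // Extract just the futures so they can be awaited as a group.
        CompletableFuture<?>[] futures = holders.stream()
                .map(holder -> holder.result)
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();

        // Every future is complete now, so join() returns immediately and the
        // extra fields are still available on the holder.
        List<String> lines = new ArrayList<>();
        for (QueryHolder holder : holders) {
            lines.add(holder.result.join()
                    + " productCount=" + holder.productCount
                    + " stockCount=" + holder.stockCount);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<QueryHolder> holders = Arrays.asList(
                new QueryHolder(2, 5, CompletableFuture.supplyAsync(() -> "row-a")),
                new QueryHolder(1, 0, CompletableFuture.supplyAsync(() -> "row-b")));
        for (String line : describeAll(holders)) {
            System.out.println(line);
        }
    }
}
```

With Guava and the driver, the same shape would presumably apply: build a second List<ResultSetFuture> from the holders, pass that to Futures.allAsList, and iterate the holders once the combined future completes.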

Mikhail Baksheev
  • Thanks. I had a problem doing nested queries in Cassandra and followed your reply at https://stackoverflow.com/questions/45471519/improve-performance-in-cassandra-and-java-collections. Now I am getting an intermittent issue: sometimes my products list comes back empty, but when I print the list inside the onSuccess method it shows the result. If I use await(), the method takes a long time. Please suggest. – Saurabh Aug 31 '17 at 05:55
  • @Saurabh, I've updated my answer – Mikhail Baksheev Aug 31 '17 at 09:21
  • Accepted. And I got a workaround for the other query I asked about. Just one last point, if you can help: sometimes (very rarely) I get com.datastax.driver.core.exceptions.BusyPoolException, so I am wondering whether this solution is safe in a multi-request scenario. Are there any best practices we should follow? – Saurabh Aug 31 '17 at 19:44
  • BusyPoolException means you have too many simultaneous requests and the driver does not have the resources to process them all. You need to tune the driver options: http://docs.datastax.com/en/developer/java-driver/3.3/manual/pooling/ Or limit the number of async queries (https://stackoverflow.com/questions/30509095/cassandra-is-there-a-way-to-limit-number-of-async-queries) if the queries would overload the Cassandra cluster. – Mikhail Baksheev Sep 01 '17 at 10:41
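
One common way to limit the number of in-flight async queries, as suggested in the last comment, is a counting semaphore: acquire a permit before submitting and release it when the query completes. The sketch below is JDK-only and makes several assumptions: `CompletableFuture.runAsync` with a short sleep simulates a query, and `ThrottleSketch` and `runThrottled` are hypothetical names. It returns the highest number of simultaneously running tasks it observed, so the cap can be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottleSketch {

    static int runThrottled(int totalQueries, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < totalQueries; i++) {
                // Blocks the submitting thread while maxInFlight queries are pending.
                permits.acquireUninterruptibly();
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // simulated query latency
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    inFlight.decrementAndGet();
                }, pool).whenComplete((value, error) -> permits.release()); // release on success or failure
                futures.add(future);
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak in-flight queries: " + runThrottled(20, 3));
    }
}
```

With the driver, the permit would presumably be released inside both onSuccess and onFailure of the FutureCallback. Tuning the pooling options from the linked manual page remains a separate, complementary step.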