
I have 5,000,000 insert queries in a file. I want to read them from the file and write them to Cassandra with the Java driver and the executeAsync method, in a loop like the following code:

public static void main(String[] args) {
    FileReader fr = null;
    try {
        fr = new FileReader("the-file-name.txt");
        BufferedReader br = new BufferedReader(fr);
        String sCurrentLine;
        long time1 = System.currentTimeMillis();
        while ((sCurrentLine = br.readLine()) != null) {
            session.executeAsync(sCurrentLine);
        }

        System.out.println(System.currentTimeMillis() - time1);
        br.close();
        fr.close();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

my table definition is:

CREATE TABLE test.climate (
    city text,
    date text,
    time text,
    temprature int,
    PRIMARY KEY ((city, date), time)
) WITH CLUSTERING ORDER BY (time ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

But after running the program, the row count in the table is 2,569,725:

cqlsh:test> select count(*) from climate ;

 count
---------
 2569725

I tested more than 10 times, and each time the result of select count(*) was between 2,400,000 and 2,600,000.

Vickie Jack
  • Did you forget to close the session and cluster? – Ordous Apr 06 '18 at 13:33
  • Show us a sample insert statement? Many times people think they are inserting 5M rows, but the partition key stays the same across multiple rows and only the clustering column changes. For example, here you might have the same (city, date) for different time values, which is valid but counted as a single row. Also, the default consistency in cqlsh is ONE; change it to LOCAL_QUORUM. – dilsingi Apr 06 '18 at 14:09
  • Add logging to your code to see what is happening: `Futures.addCallback( session.executeAsync( sCurrentLine ), new FutureCallback() { @Override public void onSuccess( ResultSet result ) { //ignore } @Override public void onFailure( Throwable t ) { t.printStackTrace(); } } );` I guess you will see exceptions about timeouts or unavailable nodes – Mikhail Baksheev Apr 06 '18 at 14:43
  • The exception is: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256))) @MikhailBaksheev – Vickie Jack Apr 06 '18 at 15:06
  • Look at this topic: https://stackoverflow.com/questions/49685400/handle-correctly-a-big-number-of-asynchronous-queries – Mikhail Baksheev Apr 06 '18 at 15:11
  • Thanks @MikhailBaksheev. It works. – Vickie Jack Apr 06 '18 at 15:24
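The callback pattern Mikhail suggests can be sketched with only the JDK: a stand-in for the driver's `executeAsync` returns a `CompletableFuture`, and `whenComplete` plays the role of Guava's `Futures.addCallback`, logging failures instead of letting them vanish silently. The `CallbackDemo` class and the simulated failure below are illustrative, not the driver API:

```java
import java.util.concurrent.CompletableFuture;

public class CallbackDemo {
    // Stand-in for session.executeAsync(): completes exceptionally to
    // simulate a failed insert (e.g. a BusyPoolException or timeout).
    static CompletableFuture<String> executeAsync(String stmt) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (stmt.contains("bad")) {
            f.completeExceptionally(new RuntimeException("Pool is busy"));
        } else {
            f.complete("applied");
        }
        return f;
    }

    public static void main(String[] args) {
        for (String line : new String[]{"INSERT ok", "INSERT bad"}) {
            executeAsync(line).whenComplete((result, error) -> {
                if (error != null) {
                    // Surfaces errors that fire-and-forget loops swallow.
                    System.out.println("FAILED: " + error.getMessage());
                }
                // On success: ignore, as in the original callback.
            });
        }
    }
}
```

Running this prints the failure for the second statement; with the real driver the same wiring is what revealed the BusyPoolException above.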

1 Answer


You are issuing async inserts faster than they can execute, so they eventually exceed the queue size and fail. You can increase the queue size, which would work, but then you're just applying back pressure to memory instead of to your producer, and you may still hit a wall. Try limiting the number of in-flight queries, like:

// Requires java.util.concurrent.Semaphore and Guava's MoreExecutors.
public static void main2(String[] args) {
    FileReader fr = null;
    int permits = 256;
    Semaphore l = new Semaphore(permits);
    try {
        fr = new FileReader("the-file-name.txt");
        BufferedReader br = new BufferedReader(fr);
        String sCurrentLine;
        long time1 = System.currentTimeMillis();
        while ((sCurrentLine = br.readLine()) != null) {
            l.acquire(); // blocks once 256 queries are in flight
            session.executeAsync(sCurrentLine)
                .addListener(() -> l.release(), MoreExecutors.directExecutor());
        }
        l.acquire(permits); // wait for all in-flight queries to complete

        System.out.println(System.currentTimeMillis() - time1);
        br.close();
        fr.close();
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }
}

It will likely run just as fast; you just need to find the right size for the semaphore. Also note the blocking until all the permits have been returned (acquiring the max at the end); otherwise you can shut down the JVM before all the requests still in the queue have been sent.
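The throttling idea can be demonstrated with only the JDK, simulating the driver's async execution with `CompletableFuture` in place of `ResultSetFuture`. The `ThrottleDemo` class, the thread pool, and the fake workload are illustrative assumptions, not driver API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottleDemo {
    public static void main(String[] args) throws Exception {
        int permits = 4; // max in-flight "queries" (256 in the answer above)
        Semaphore limiter = new Semaphore(permits);
        ExecutorService pool = Executors.newFixedThreadPool(permits);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < 100; i++) {      // stand-in for the 5M insert lines
            limiter.acquire();               // producer blocks when the limit is hit
            CompletableFuture
                .runAsync(completed::incrementAndGet, pool)  // fake async insert
                .whenComplete((r, t) -> limiter.release());  // return the permit
        }
        limiter.acquire(permits);            // drain: wait for all in-flight work
        pool.shutdown();
        System.out.println(completed.get()); // prints 100
    }
}
```

The final `acquire(permits)` is the same "acquire the max at the end" trick: it can only succeed once every callback has released its permit, so nothing is lost to an early JVM exit.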

Disclaimer: I did not test the above code.

Chris Lohfink