2

I set up a Cassandra cluster on AWS. What I want to get is increased I/O throughput (number of reads/writes per second) as more nodes are added (as advertised). However, I got exactly the opposite. The performance is reduced as new nodes are added.

Do you know any typical issues that prevents it from scaling?

Here is some details:

I am adding a text file (15MB) to the column family. Each line is a record. There are 150000 records. When there is 1 node, it takes about 90 seconds to write. But when there are 2 nodes, it takes 120 seconds. I can see the data is spread to 2 nodes. However, there is no increase in throughput.

The source code is below:

public class WordGenCAS {
static final String KEYSPACE = "text_ks";
static final String COLUMN_FAMILY = "text_table";
static final String COLUMN_NAME = "text_col";

public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
        System.exit(-1);
    }

    String[] contactPts = args[1].split(",");

    Cluster cluster = Cluster.builder()
            .addContactPoints(contactPts)
            .build();
    Session session = cluster.connect(KEYSPACE);

    InputStream fis = new FileInputStream(args[0]);
    InputStreamReader in = new InputStreamReader(fis, "UTF-8");
    BufferedReader br = new BufferedReader(in);

    String line;
    int lineCount = 0;
    while ( (line = br.readLine()) != null) {
        line = line.replaceAll("'", " ");
        line = line.trim();
        if (line.isEmpty())
            continue;
        System.out.println("[" + line + "]");
        String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                COLUMN_FAMILY,
                COLUMN_NAME,
                lineCount,
                line);
        session.execute(cqlStatement2);
        lineCount++;
    }

    System.out.println("Total lines written: " + lineCount);
}

}

The DB schema is the following:

CREATE KEYSPACE text_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };

USE text_ks;

CREATE TABLE text_table (
    id int,
    text_col text,
    primary key (id)
) WITH COMPACT STORAGE;

Thanks!

Community
  • 1
  • 1
Danke Xie
  • 1,757
  • 17
  • 13
  • 1
    How exactly are you loading this file? 2000 inserts a second is very slow(even for m1.larges) unless your rows are huge. Are you sure when inserting you are using a driver which contacts all nodes? – RussS Jun 01 '14 at 16:33
  • 1
    Which consistency level are you using? – peter Jun 01 '14 at 19:21
  • @RussS This is useful. I was using DataStax driver and CQL queries, but did not add all contact points. After adding all contact points (which is 2), the speed is the same as 1 node. This is better than before. Is there a way to further improve the performance? – Danke Xie Jun 02 '14 at 00:46
  • @zahorak I am using the default consistency level ('ONE'). – Danke Xie Jun 02 '14 at 00:47
  • 1
    My next guess is that you are using serial "session.execute" commands rather than "session.execute_async". Switching to async will give you much greater throughput both on single and multiple nodes. – RussS Jun 02 '14 at 01:13
  • @RussS It's weird that when I used session.executeAsync for 'insert', the throughput was reduced by about 30%. The Cassandra version is 2.0.7. I got the warning `[main] WARN com.datastax.driver.core.ControlConnection - Found host with 0.0.0.0 as rpc_address, using listen_address (/172.31.16.247) to contact it instead. If this is incorrect you should avoid the use of 0.0.0.0 server side.` two times. Is it critical? Thanks. – Danke Xie Jun 02 '14 at 03:28
  • I set the number of replicas to 1. Is that a problem? – Danke Xie Jun 02 '14 at 03:37
  • That message is not critical and without seeing your code I can't tell for sure what is wrong. But I can't imagine a case where execute_async is slower than execute because execute just calls execute_async with a hold for response. https://github.com/datastax/java-driver/blob/2.0/driver-core/src/main/java/com/datastax/driver/core/AbstractSession.java#L51 – RussS Jun 02 '14 at 03:41
  • @RussS Thanks. I have pasted the source code. BTW, when I changed replication_factor of the key space to 2, it did not increase throughput (possibly expected). The cassandra cluster is using the murmur3 partitioner. – Danke Xie Jun 02 '14 at 03:48
  • Your main problem is that with the serial execution above the biggest bottleneck is your client. While your code is waiting for a response from the database it should be sending more requests. Like most databases C* is setup to handle many concurrent requests. Like I said before you need to modify your code to take advantage of execute_async or have multiple insertion threads (async HIGHLY recommened) Your replication factor is almost completely unrelated in this particular case due to the client being the main bottleneck. But for greatest write speed RF=1 will be faster. – RussS Jun 02 '14 at 03:52
  • @RussS I am pretty sure that if I simply change `session.execute(cqlStatement2);` to `session.executeAsync(cqlStatement2);`, the performance gets worse. I don't understand why, maybe async causes a lot of unprocessed ResultSetFutures or increases contention. I will try more and see if got any luck. – Danke Xie Jun 02 '14 at 06:01
  • How high is the CPU and IO utilization on the Cassandra nodes? – peter Jun 02 '14 at 08:22
  • @zahorak CPU load is close to 0.1 - 0.2. I will check IO. – Danke Xie Jun 02 '14 at 12:36
  • @zahorak Net send/receive rate is about 200KB/s for each node. – Danke Xie Jun 02 '14 at 12:59
  • Than I’d assume like RussS that the bottlenack is the client. You’ll need to check what takes so long and why, and why an async call takes longer (as you mentioned it) than a sync call. Also you could check the servers performance while using async. – peter Jun 02 '14 at 13:16
  • What kind of machine are you running this on? – RussS Jun 02 '14 at 14:21
  • I've used m3.large and m3.medium. They give similar results. The machines are not saturated. – Danke Xie Jun 02 '14 at 14:37
  • @zahorak Sounds good. I will check the client-side performance. First thing to do is to use PreparedStatement and then batched queries. I heard that in JDBC driver, PreparedStatements are 20x faster than query strings after the first execution. That might be the performance gap I am looking for. Maybe that's the case for Datastax drivers too. – Danke Xie Jun 02 '14 at 18:01
  • @RussS I've used m3.large and m3.medium. They give similar results. Could it be a client-side issue? Do you know the relative performance of query strings and preparedStatements? – Danke Xie Jun 02 '14 at 18:02
  • You need to work on asynchronous execution, everything else is just noise. – RussS Jun 02 '14 at 18:29
  • @RussS @zahorak Now I figured out the issue. Actually, the program is missing `System.exit(0);` at the end. So it took a long time to terminate after the I/O has been done. In Async case, it took even longer. That's the case. Another issue is the client side incurred GC a few times which causes a few seconds lag. Without JVM tuning, the performance is good enough. Thanks. – Danke Xie Jun 04 '14 at 06:59

1 Answers1

4

Even if this an old post, I think it's worth posting a solution for these (common) kind of problems.

As you've already discovered, loading data with a serial procedure is slow. What you've been suggested is the right thing to do.

However, issuing a lot of queries without applying some sort of back pressure is likely looking for troubles, and you'll gonna lose data due to excessive overload on the server (and on the driver to some extent).

This solution will load data with async calls, and will try to apply some back pressure on the client to avoid data loss.

public class WordGenCAS {
    static final String KEYSPACE = "text_ks";
    static final String COLUMN_FAMILY = "text_table";
    static final String COLUMN_NAME = "text_col";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
            System.exit(-1);
        }

        String[] contactPts = args[1].split(",");

        Cluster cluster = Cluster.builder()
                .addContactPoints(contactPts)
                .build();
        Session session = cluster.connect(KEYSPACE);

        InputStream fis = new FileInputStream(args[0]);
        InputStreamReader in = new InputStreamReader(fis, "UTF-8");
        BufferedReader br = new BufferedReader(in);

        String line;
        int lineCount = 0;

        // This is the futures list of our queries
        List<Future<ResultSet>> futures = new ArrayList<>();

        // Loop
        while ( (line = br.readLine()) != null) {
            line = line.replaceAll("'", " ");
            line = line.trim();
            if (line.isEmpty())
                continue;
            System.out.println("[" + line + "]");
            String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                    COLUMN_FAMILY,
                    COLUMN_NAME,
                    lineCount,
                    line);
            lineCount++;

            // Add the "future" returned by async method the to the list
            futures.add(session.executeAsync(cqlStatement2));

            // Apply some backpressure if we issued more than X query.
            // Change X to another value suitable for your cluster
            while (futures.size() > 1000) {
                Future<ResultSet> future = futures.remove(0);
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        System.out.println("Total lines written: " + lineCount);
        System.out.println("Waiting for writes to complete...");

        // Wait until all writes are done.
        while (futures.size() > 0) {
            Future<ResultSet> future = futures.remove(0);
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println("Done!");
    }
}
xmas79
  • 5,060
  • 2
  • 14
  • 35