2

I have built an importer for MongoDB and Cassandra. Basically all operations of the importer are the same, except for the last part where data gets formed to match the needed cassandra table schema and wanted mongodb document structure. The write performance of Cassandra is really bad compared to MongoDB and I think I'm doing something wrong.

Basically, my abstract importer class loads the data, reads out all data and passes it to the extending MongoDBImporter or CassandraImporter class to send data to the databases. One database is targeted at a time - no "dual" inserts to both C* and MongoDB at the same time. The importer is run on the same machine against the same number of nodes (6).


The Problem:

MongoDB import finished after 57 minutes. I ingested 10.000.000 documents and I expect about the same amount of rows for Cassandra. My Cassandra importer is now running since 2,5 hours and is only at 5.000.000 inserted rows. I will wait for the importer to finish and edit the actual finish time in here.


How I import with Cassandra:

I prepare two statements once before ingesting data. Both statements are UPDATE queries because sometimes I have to append data to an existing list. My table is cleared completely before starting the import. The prepared statements get used over and over again.

PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);

For every row, I create a BoundStatement and pass that statement to my "custom" batching method:

    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
    bs = bs.bind();

    //add data... with several bs.setXXX(..) calls

    cassandraConnection.executeBatch(bs);

With MongoDB, I can insert 1000 Documents (thats the maximum) at a time without problems. For Cassandra, the importer crashes with com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large for just 10 of my statements at some point. I'm using this code to build the batches. Btw, I began with 1000, 500, 300, 200, 100, 50, 20 batch size before but obviously they do not work too. I then set it down to 10 and it threw the exception again. Now I'm out of ideas why it's breaking.

private static final int MAX_BATCH_SIZE = 10;

private Session session;
private BatchStatement currentBatch;

...

@Override
public ResultSet executeBatch(Statement statement) {
    if (session == null) {
        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
    }

    if (currentBatch == null) {
        currentBatch = new BatchStatement(Type.UNLOGGED);
    }

    currentBatch.add(statement);
    if (currentBatch.size() == MAX_BATCH_SIZE) {
        ResultSet result = session.execute(currentBatch);
        currentBatch = new BatchStatement(Type.UNLOGGED);
        return result;
    }

    return null;
}

My C* schema looks like this

CREATE TYPE stream.event (
    data_dbl frozen<map<text, double>>,
    data_str frozen<map<text, text>>,
    data_bool frozen<map<text, boolean>>,
);

CREATE TABLE stream.data (
    log_creator text,
    date text, //date of the timestamp
    ts timestamp,
    log_id text, //some id
    hour int, //just the hour of the timestmap
    x double,
    y double,
    events list<frozen<event>>,
    PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

I sometimes need to add further new events to an existing row. That's why I need a List of UDTs. My UDT contains three maps because the event creators produce different data (key/value pairs of type string/double/boolean). I am aware of the fact that the UDTs are frozen and I can not touch the maps of already ingested events. That's fine for me, I just need to add new events that have the same timestamp sometimes. I partition on the creator of the logs (some sensor name) as well as the date of the record (ie. "22-09-2016") and the hour of the timestamp (to distribute data more while keeping related data close together in a partition).


I'm using Cassandra 3.0.8 with the Datastax Java Driver, version 3.1.0 in my pom. According to What is the batch limit in Cassandra?, I should not increase the batch size by adjusting batch_size_fail_threshold_in_kb in my cassandra.yaml. So... what do or what's wrong with my import?


UPDATE So I have adjusted my code to run async queries and store the currently running inserts in a list. Whenever an async insert finishes, it will be removed from the list. When the list size exceeds a threshold and an error occured in an insert before, the method will wait 500ms until the inserts are below the threshold. My code is now automatically increasing the threshold when no insert failed.

But after streaming 3.300.000 rows, there were 280.000 inserts being processed but no error happened. This seems number of currently processed inserts looks too high. The 6 cassandra nodes are running on commodity hardware, which is 2 years old.

Is this the high number (280.000 for 6 nodes) of concurrent inserts a problem? Should I add a variable like MAX_CONCURRENT_INSERT_LIMIT?

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    //Sleep while the currently processing number of inserts is too high
    while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        concurrentInsertLimit += 2000;
        LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
    }

    return;
}
Community
  • 1
  • 1
j9dy
  • 2,029
  • 3
  • 25
  • 39

2 Answers2

8

After using C* for a bit, I'm convinced you should really use batches only for keeping multiple tables in sync. If you don't need that feature, then don't use batches at all because you will incur in performance penalties.

The correct way to load data into C* is with async writes, with optional backpressure if your cluster can't keep up with the ingestion rate. You should replace your "custom" batching method with something that:

  • performs async writes
  • keep under control how many inflight writes you have
  • perform some retry when a write timeouts.

To perform async writes, use the .executeAsync method, that will return you a ResultSetFuture object.

To keep under control how many inflight queries just collect the ResultSetFuture object retrieved from the .executeAsync method in a list, and if the list gets (ballpark values here) say 1k elements then wait for all of them to finish before issuing more writes. Or you can wait for the first to finish before issuing one more write, just to keep the list full.

And finally, you can check for write failures when you're waiting on an operation to complete. In that case, you could:

  1. write again with the same timeout value
  2. write again with an increased timeout value
  3. wait some amount of time, and then write again with the same timeout value
  4. wait some amount of time, and then write again with an increased timeout value

From 1 to 4 you have an increased backpressure strength. Pick the one that best fit your case.


EDIT after question update

Your insert logic seems a bit broken to me:

  1. I don't see any retry logic
  2. You don't remove the item in the list if it fails
  3. Your while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) is wrong, because you will sleep only when the number of issued queries is > concurrentInsertLimit, and because of 2. your thread will just park there.
  4. You never set to false concurrentInsertErrorOccured

I usually keep a list of (failed) queries for the purpose of retrying them at later time. That gives me powerful control on the queries, and when the failed queries starts to accumulate I sleep for a few moments, and then keep on retrying them (up to X times, then hard fail...).

This list should be very dynamic, eg you add items there when queries fail, and remove items when you perform a retry. Now you can understand the limits of your cluster, and tune your concurrentInsertLimit based on eg the avg number of failed queries in the last second, or stick with the simpler approach "pause if we have an item in the retry list" etc...


EDIT 2 after comments

Since you don't want any retry logic, I would change your code this way:

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            runningInsertList.remove(future);
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    //Sleep while the currently processing number of inserts is too high
    while (runningInsertList.size() >= concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    if (!concurrentInsertErrorOccured) {
        // Increase your ingestion rate if no query failed so far
        concurrentInsertLimit += 10;
    } else {
        // Decrease your ingestion rate because at least one query failed
        concurrentInsertErrorOccured = false;
        concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
        while (runningInsertList.size() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }
    }

    return;
}

You could also optimize a bit the procedure by replacing your List<ResultSetFuture> with a counter.

Hope that helps.

xmas79
  • 5,060
  • 2
  • 14
  • 35
  • That's a nice idea, will definitely implement that feature. Is there anything you would recommend me to do when I know that a lot of rows will be inserted to the same partition (thus the same node)? Im building up for read performance so keeping related rows together might be key for me. Should I start splitting up my data and continue with a different part of the data when one certain part of my data will cause too much backpressure at that time. something like (pseudocode): "if resultSetFutureList.size > 1000 then skip next 50000 rows and continue there"? – j9dy Sep 22 '16 at 17:43
  • 1
    The Java driver is *token aware*, that means it will connect to the *right* coordinator (that is the node responsible for *that* partition) to perform a write (unless you use batches, because in this case the coordinator is chosen based on the token resulting from the first statement of the batch). So you don't have to worry about that thing. Just keep pushing your data. If you never have a write timeout with `if resultSetFutureList.size > 1000` then raise the limit to 2k. Do this until you find **your** magic number here.In my case it was... automatically tuned by the system itself...32k IIRC – xmas79 Sep 22 '16 at 20:24
  • so basically I set up a `List` variable and add each `ResultSetFuture` to that list in my insert method. I'm using Guavas `Futures.addCallback` and the `onSuccess` method removes the corresponding ResultSetFuture from the list. `onFailure` will set a boolean to true that says that there was an insertion error going on. In my insert method, there is a while loop `while (errorOccured && list.size() > insertLimit)` and the thread will sleep for 500 ms in it. After streaming 3.300.000 rows, there were 280.000 inserts being processed but no error happened. Should I worry? – j9dy Sep 23 '16 at 09:10
  • Hard to say... 280k is a bit high number... Do you have a NASA cluster? You should update your question with your updated code, then we'll check if something is wrong or not... – xmas79 Sep 23 '16 at 09:15
  • your updated answer is correct. I was building up the logic of the async inserts one-by-one, meaning there was no retry logic currently and this was intended. One thing that I don't understand is: Why does no query fail for me? In my code I added a call to the logger when the insert fails with an exception. This never happened, even at 300.000 concurrent inserts. You said the best value for your use case was 32k concurrent inserts. Did you already see inserts being dropped by that mark? – j9dy Sep 23 '16 at 10:26
  • What about using semaphores limit the current running executors semaphore = new Semaphore(500); call semaphore.acuire() before executeAsync and semaphore.release() onSuccess and onFailure – M.Almokadem Sep 17 '19 at 21:51
6

When you run a batch in Cassandra, it chooses a single node to act as the coordinator. This node then becomes responsible for seeing to it that the batched writes find their appropriate nodes. So (for example) by batching 10000 writes together, you have now tasked one node with the job of coordinating 10000 writes, most of which will be for different nodes. It's very easy to tip over a node, or kill latency for an entire cluster by doing this. Hence, the reason for the limit on batch sizes.

The problem is that Cassandra CQL BATCH is a misnomer, and it doesn't do what you or anyone else thinks that it does. It is not to be used for performance gains. Parallel, asynchronous writes will always be faster than running the same number of statements BATCHed together.

I know that I could easily batch 10.000 rows together because they will go to the same partition. ... Would you still use single row inserts (async) rather than batches?

That depends on whether or not write performance is your true goal. If so, then I'd still stick with parallel, async writes.

For some more good info on this, check out these two blog posts by DataStax's Ryan Svihla:

Cassandra: Batch loading without the Batch keyword

Cassandra: Batch Loading Without the Batch — The Nuanced Edition

Aaron
  • 55,518
  • 11
  • 116
  • 132
  • I know that I could easily batch 10.000 rows together because they will go to the same partition. I'm writing my data once and will only read them out afterwards. Updates to existing rows will be done once in a few weeks, but they might happen. I'm not worrying about insertion hot-spots because read performance is key for me and data will only be written once - and also only written when nobody is querying on the existing data. Would you still use single row inserts (async) rather than batches? – j9dy Sep 22 '16 at 15:23