14

First: I know it isn't a good idea to do a full scan in Cassandra; however, at the moment, it's exactly what I need.

When I started looking into how to do something like this, I read people saying it wasn't possible to do a full scan in Cassandra and that it wasn't made for this type of thing.

Not satisfied, I kept looking until I found this article: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

It looked pretty reasonable, so I gave it a try. Since I will do this full scan only once and time and performance aren't an issue, I wrote the query and put it in a simple job to look up all the records that I want. Out of 2 billion rows, I expected something like 1000 records as output; however, I got only 100.

My job:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(leadTime)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            // Last row of this page: remember its token and resume from the next one
            if (!rows.hasNext()) {
                Long token = row.getLong("token(key)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

Basically, what I do is start from the minimum allowed token and incrementally scan until the last one.

I don't know why, but it's as if the job didn't really do the full scan, or my query only accessed one node or something. I don't know if I'm doing something wrong, or if it's really not possible to do a full scan.

Today I have almost 2 TB of data in a single table on a cluster of seven nodes.

Has anyone been in this situation before, or does anyone have a recommendation?

bcfurtado
  • 181
  • 2
  • 12
  • What is the keyspace schema for 'mytable'? Is the query running multiple times (because of the while loop)? The last query might be returning 100 instead of 1000. – turbo Apr 24 '15 at 15:51
  • Schema: http://pastebin.com/DyWAc1wa . And yes, the query runs multiple times and returns all the rows allowed by the LIMIT clause. – bcfurtado Apr 24 '15 at 16:52

5 Answers

8

It's definitely possible to do a full table scan in Cassandra - indeed, it's quite common for things like Spark. However, it's not typically "fast", so it's discouraged unless you know why you're doing it. For your actual questions:

1) If you're using CQL, you're almost certainly using the Murmur3 partitioner, so your minimum token is -9223372036854775808 (and the maximum token is 9223372036854775807).

2) You're using session.execute(), which will use a default consistency of ONE, which may not return all of the results in your cluster, especially if you're also writing at ONE, which I suspect you may be. Raise that to ALL, and use prepared statements to speed up the CQL parsing:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");
    LOGGER.info("Starting ...");
    actualToken = -9223372036854775808L; // Murmur3 minimum token (Long.MIN_VALUE)
    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        // Read this token range at ALL so no replica's data is missed
        SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
        stmt.setConsistencyLevel(ConsistencyLevel.ALL);
        ResultSet resultSet = session.execute(stmt);

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(leadTime)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            // Last row of this page: remember its token and resume from the next one
            if (!rows.hasNext()) {
                Long token = row.getLong("token(key)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }
        }
    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token) {
    return token + 1;
}
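
As a side note, the snippet above uses a SimpleStatement; a minimal sketch of the prepared-statement variant mentioned in the text, reusing the question's table and column names, might look like this:

// Prepare the range query once; bind the current token on every page.
PreparedStatement ps = session.prepare(
        "SELECT token(key), key, my_column FROM mytable WHERE token(key) >= ? LIMIT 10000");
ps.setConsistencyLevel(ConsistencyLevel.ALL);

// Inside the while loop, instead of building a new statement each time:
ResultSet resultSet = session.execute(ps.bind(actualToken));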
Jeff Jirsa
  • 4,391
  • 11
  • 24
  • Hey Jeff, first, thanks for your help! I'm using CQL with Murmur3 and I'm aware of the max and min token values. The job today receives a range of tokens within which it will search the rows. This way I can split those ranges across threads to speed things up. – bcfurtado Apr 29 '15 at 22:40
  • Second, I implemented your suggestion, but it didn't make much difference compared to what I had before; actually, the job returned fewer rows than the first time. One thing I did notice was that the machine load was lower than before and was more evenly distributed across the cluster the whole time the job was running. Before, the load got high only on specific machines and at different times. – bcfurtado Apr 29 '15 at 22:42
  • Raising the consistency should cause more load, because it's querying more replicas to ensure it's not missing any data. To be clear: how many rows did it return, and how many rows do you expect it to return? – Jeff Jirsa Apr 30 '15 at 04:55
  • Exactly. I expected something like 1000 rows and only got something like 100~200. – bcfurtado May 03 '15 at 04:05
  • Have you run SELECT COUNT(*) to count? That uses the internal paging and should be fairly accurate. – Jeff Jirsa May 04 '15 at 06:39
  • To use a WHERE clause, the columns used in the query need to be part of the key or already indexed, and they aren't, so I can't use your approach. I know the expected output should be something like 1k because I store part of the info in two different databases (Cassandra and MySQL), and today the amount of data differs between them. – bcfurtado May 04 '15 at 14:12
  • I'm curious, is there any update on this? Does this answer miss rows? Quoting an earlier comment: "I expected something like 1000 rows and only had something like 100~200." – morpheus Dec 29 '16 at 20:03
  • One point: using CONSISTENCY=ALL is bad, as any node failure will cause an application exception. – pssh Nov 16 '17 at 23:12
2

I'd highly recommend using Spark - even in a standalone application (i.e. without a cluster). It'll take care of chunking up the partitions and processing them one by one. It's dead easy to use too:

https://github.com/datastax/spark-cassandra-connector
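
For reference, a minimal sketch (not taken from this answer) of what that full scan could look like with the connector's Java API, assuming the question's keyspace/table/column names (db, mytable, my_column), a locally reachable Cassandra node, and a placeholder condition standing in for myCondition():

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class FullScanJob {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("cassandra-full-scan")
                .setMaster("local[*]")                                 // standalone, no Spark cluster needed
                .set("spark.cassandra.connection.host", "127.0.0.1"); // assumption: local node
        JavaSparkContext sc = new JavaSparkContext(conf);

        // The connector splits the token ring into Spark partitions and reads them one by one.
        long matching = javaFunctions(sc)
                .cassandraTable("db", "mytable")
                .filter(row -> row.getLong("my_column") > 0L)          // placeholder for myCondition()
                .count();

        System.out.println("Matching rows: " + matching);
        sc.stop();
    }
}

The only dependencies are spark-core and the spark-cassandra-connector artifact matching your Spark version.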

ashic
  • 6,367
  • 5
  • 33
  • 54
1

Is this a common thing you need to do, or a one-off scenario? I agree this isn't something you want to do on a regular basis, but I also had an issue where I had to read through all the rows of a column family, and I relied on the AllRowsReader recipe from the Astyanax client. I see that you are using the DataStax CQL driver to connect to your cluster, but if what you're looking for is something that is proven to work, you might not mind dealing with the problem using the Astyanax library.

In my case I read all the row keys, and then I had another job that interacted with the column family using the keys I collected.

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.reader.AllRowsReader;

import java.util.concurrent.CopyOnWriteArrayList;

...        

private final Keyspace keyspace;
private final ColumnFamily<String, byte[]> columnFamily;

public List<String> getAllKeys() throws Exception {

    final List<String> rowKeys = new CopyOnWriteArrayList<>();

    new AllRowsReader.Builder<>(keyspace, columnFamily)
        .withColumnRange(null, null, false, 0)          // fetch zero columns: row keys only
        .withPartitioner(null)                          // null means use the keyspace's partitioner
        .withConsistencyLevel(ConsistencyLevel.CL_ONE)
        .forEachRow(row -> {
            if (row == null) {
                return true;
            }

            String key = row.getKey();
            rowKeys.add(key);

            return true;                                // keep iterating
        })
        .build()
        .call();

    return rowKeys;
}

There are different configuration options to run this in several threads, and many other things. Like I said, I only ran this once in my code and it worked really well. I'd be happy to help if you run into issues trying to make it work.
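
For example, a hedged sketch of the paged, multi-threaded variant of the reader above (the page size and concurrency values are illustrative, not recommendations):

    // Same reader as above, but paging explicitly and splitting the token range
    // across several threads; the numbers are illustrative only.
    new AllRowsReader.Builder<>(keyspace, columnFamily)
        .withColumnRange(null, null, false, 0)
        .withPageSize(1000)            // rows fetched per request
        .withConcurrencyLevel(8)       // split the token range across 8 threads
        .withPartitioner(null)
        .withConsistencyLevel(ConsistencyLevel.CL_ONE)
        .forEachRow(row -> {
            if (row != null) {
                rowKeys.add(row.getKey());
            }
            return true;
        })
        .build()
        .call();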

Hope this helps,

José Luis

jbarrueta
  • 4,907
  • 2
  • 20
  • 21
1

If you regularly need to do full table scans of a Cassandra table, say for analytics in Spark, then I highly suggest you consider storing your data using a data model that is read-optimized. You can check out http://github.com/tuplejump/FiloDB for an example of a read-optimized setup on Cassandra.

Evan Chan
  • 246
  • 2
  • 7
1

This is a very old question, but I'm answering it because I ran into the same problem of not getting all the rows and found the cause.

This problem occurs when there are multiple rows for one partition key.

In the implementation above, when the LIMIT cuts a page off in the middle of a partition, the rest of the rows in that partition are lost.

This is because the next query's WHERE clause starts reading from the token of the next partition.

For example, suppose we have a table like the following

partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1
              1|     3|                     1
              2|     4|                     2
              2|     5|                     2
              2|     6|                     2
              3|     7|                     3
              4|     8|                     4

If we run the above example code with LIMIT 2 on this table, we get...

1st iteration

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 0 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1

2nd iteration

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 1 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              2|     4|                     2
              2|     5|                     2

3rd iteration

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 2 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              3|     7|                     3
              4|     8|                     4

As a result, we cannot get idx 3 and 6.

This is a common paging query implementation mistake.
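
For completeness, one way to avoid hand-rolled token paging altogether (and this skipping problem with it) is to let the DataStax Java driver page through a single statement (requires native-protocol paging, i.e. Cassandra 2.0+). A minimal sketch, reusing the question's table and an illustrative fetch size:

// The driver fetches the next page transparently while the ResultSet is iterated,
// so pages can end mid-partition without losing the remaining rows.
SimpleStatement stmt = new SimpleStatement("SELECT token(key), key, my_column FROM mytable");
stmt.setFetchSize(1000); // rows per page; illustrative value

ResultSet resultSet = session.execute(stmt);
for (Row row : resultSet) {
    String rowId = row.getString("key");
    // process rowId ...
}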

Ole Pannier
  • 3,208
  • 9
  • 22
  • 33
tawapyoi
  • 11
  • 2