4

When using Cassandra's recommended RandomPartitioner (or Murmur3Partitioner), it is not possible to do meaningful range queries on keys, because the rows are distributed around the cluster using the md5 hash of the key. These hashes are called "tokens."

Nonetheless, it would be very useful to split up a large table amongst many compute workers by assigning each a range of tokens. Using CQL3, it appears possible to issue queries directly against the tokens, however the following python does not work... EDIT: works after switching to testing against the lastest version of the cassandra database (doh!), and also updating the syntax per notes below:

## use python cql module
import cql

## If running against an old version of Cassandra, this raises: 
## TApplicationException: Invalid method name: 'set_cql_version'
conn = cql.connect('localhost', cql_version='3.0.2')

cursor = conn.cursor()

try:
    ## remove the previous attempt to make this work
    cursor.execute('DROP KEYSPACE test;')
except Exception, exc:
    print exc

## make a keyspace and a simple table
cursor.execute("CREATE KEYSPACE test WITH strategy_class = 'SimpleStrategy' AND strategy_options:replication_factor = 1;")
cursor.execute("USE test;")
cursor.execute('CREATE TABLE data (k int PRIMARY KEY, v varchar);')

## put some data in the table -- must use single quotes around literals, not double quotes                                                                                                                                   
cursor.execute("INSERT INTO data (k, v) VALUES (0, 'a');")
cursor.execute("INSERT INTO data (k, v) VALUES (1, 'b');")
cursor.execute("INSERT INTO data (k, v) VALUES (2, 'c');")
cursor.execute("INSERT INTO data (k, v) VALUES (3, 'd');")

## split up the full range of tokens.
## Suppose there are 2**k workers:
k = 3 # --> eight workers
token_sub_range = 2**(127 - k)
worker_num = 2 # for example
start_token =    worker_num  * token_sub_range
end_token = (1 + worker_num) * token_sub_range

## put single quotes around the token strings
cql3_command = "SELECT k, v FROM data WHERE token(k) >= '%d' AND token(k) < '%d';" % (start_token, end_token)
print cql3_command

## this fails with "ProgrammingError: Bad Request: line 1:28 no viable alternative at input 'token'"
cursor.execute(cql3_command)

for row in cursor:
    print row

cursor.close()
conn.close()

I would ideally like to make this work with pycassa, because I prefer its more pythonic interface.

Is there a better way to do this?

ttst
  • 493
  • 1
  • 5
  • 10

2 Answers2

1

I have updated the question to contain the answer.

ttst
  • 493
  • 1
  • 5
  • 10
0

It's not CQL3, but here's a simple program that times reading all the (pickled) data owned by localhost using the Thrift interface directly. This can be used to build a simple map/reduce engine with Cassandra as the backend. Every node would run something like this to map() over data that belongs to itself, thus incurring no network overhead for data retrieval. The result would then be shipped back to a reduce() phase on a separate node.

Obviously, this does not work well for vnodes in Cassandra1.2+. I'm now using an indexing approach to allow for a map() over smaller subsets of local data and support for vnodes.

#!/usr/bin/env python2.7

import sys
import socket
import cPickle as pickle
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from pycassa.cassandra import Cassandra
from pycassa.cassandra.ttypes import *
import time
import pprint

def main():
    jobname = sys.argv[1]
    pp = pprint.PrettyPrinter(indent=2)

    (client, transport) = connect("localhost")

    # Determine local IP address
    ip = socket.gethostbyname(socket.gethostname())

    # Set up query
    keyspace = "data"
    column_parent = ColumnParent(column_family=foo)

    try:
        # Find range of tokens for which this node is first replica
        for tokenrange in client.describe_ring(keyspace):
            if tokenrange.endpoints[0] == ip:
                start_token=tokenrange.start_token
                end_token=tokenrange.end_token
                break

        # Set kesypace
        client.set_keyspace(keyspace)

        # Query for all data owned by this node
        slice_range = SliceRange(start="", finish="")
        predicate = SlicePredicate(slice_range=slice_range)
        keyrange = KeyRange(start_token=start_token, end_token=end_token, count=10000)
        t0 = time.time()
        ptime = 0
        keycount = 0
        start=""
        for keyslice in client.get_range_slices(column_parent, predicate, keyrange, ConsistencyLevel.ONE):
            keycount += 1
            for col in keyslice.columns:
                pt0 = time.time()
                data = pickle.loads(col.column.value)
                ptime += time.time() - pt0
    except Thrift.TException, tx:
        print 'Thrift: %s' % tx.message
    finally:
        disconnect(transport)

    t1 = time.time() - t0
    print "Read data for %d tasks in: %.2gs" %(keycount, t1)
    print "Job unpickling time: %.2gs" %ptime
    print "Unpickling percentage: %.2f%%" %(ptime/t1*100)

def connect(host):
    """ 
    Connect to cassandra instance on given host.
    Returns: Cassandra.Client object
    """
    socket = TSocket.TSocket(host, 9160)
    transport = TTransport.TFramedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    transport.open()
    client = Cassandra.Client(protocol) 
    return (client, transport)

def disconnect(transport):
    """ 
    Disconnect from cassandra instance
    """
    transport.close()

if __name__ == '__main__':
    main()