10

I'm trying to find a way to log all queries run against Cassandra from Python code. Specifically, I want to log them once they have finished executing via a BatchStatement.

Are there any hooks or callbacks I can use to log this?

dado_eyad

3 Answers

6

2 options:

  1. Stick to session.add_request_init_listener

    From the source code:

    a) BoundStatement

    https://github.com/datastax/python-driver/blob/3.11.0/cassandra/query.py#L560

    The passed values are stored in raw_values; you can try to extract them from there.

    b) BatchStatement

    https://github.com/datastax/python-driver/blob/3.11.0/cassandra/query.py#L676

    It stores all the statements and parameters used to construct the object in _statements_and_parameters. It looks like this can be read, although it is not a public property.

    c) This is the only hook that gets called; I didn't manage to find any others: https://github.com/datastax/python-driver/blob/master/cassandra/cluster.py#L2097

    It has nothing to do with the actual execution of queries, though. It is just a way to inspect which queries have been constructed and, if needed, attach additional callbacks/errbacks.

  2. Approach it from a different angle and use traces

    https://datastax.github.io/python-driver/faq.html#how-do-i-trace-a-request
    https://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.ResponseFuture.get_all_query_traces

    Request tracing can be turned on for any request by setting trace=True in Session.execute_async(). View the results by waiting on the future, then calling ResponseFuture.get_query_trace().

Here's an example of BatchStatement tracing using option 2:

from cassandra.query import BatchStatement

# Assumes `session` is an already connected cassandra.cluster.Session.
bs = BatchStatement()
bs.add_all(['insert into test.test(test_type, test_desc) values (%s, %s)',
            'insert into test.test(test_type, test_desc) values (%s, %s)',
            'delete from test.test where test_type=%s',
            'update test.test set test_desc=%s where test_type=%s'],
           [['hello1', 'hello1'],
            ['hello2', 'hello2'],
            ['hello2'],
            ['hello100', 'hello1']])
res = session.execute(bs, trace=True)
trace = res.get_query_trace()
# Each statement in the batch shows up as a "Parsing ..." trace event.
for event in trace.events:
    if event.description.startswith('Parsing'):
        print(event.description)

It produces the following output:

Parsing insert into test.test(test_type, test_desc) values ('hello1', 'hello1')
Parsing insert into test.test(test_type, test_desc) values ('hello2', 'hello2')
Parsing delete from test.test where test_type='hello2'
Parsing update test.test set test_desc='hello100' where test_type='hello1'
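
Option 1 can also be pointed at completion rather than creation: the init listener receives the ResponseFuture, so it can attach a callback and an errback that fire once the request finishes. Below is a minimal sketch of that idea. The logger name and the helper names (log_when_done, _on_success, _on_error) are made up for illustration, and for a BatchStatement rf.query is still the statement object, not a decoded query string.

```python
import logging

logger = logging.getLogger("cql.requests")  # hypothetical logger name


def _on_success(_result, query):
    # Called by the driver with the result first, then our extra argument.
    logger.info("succeeded: %r", query)


def _on_error(exc, query):
    # Called by the driver with the exception first, then our extra argument.
    logger.warning("failed: %r (%s)", query, exc)


def log_when_done(response_future):
    """Request-init listener: defer logging until the request completes."""
    query = response_future.query  # statement object the request was built from
    response_future.add_callbacks(
        callback=_on_success, callback_args=(query,),
        errback=_on_error, errback_args=(query,),
    )
```

It would be registered with session.add_request_init_listener(log_when_done), where session is an existing connected Session.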
Oleg Kuralenko
  • I'm using `BatchStatement`. I tried accessing `_statements_and_parameters`, but the query and values are encoded. The hook is called when the request is initiated, not when the query is done executing, so I might end up logging queries that failed to execute if I use it. – dado_eyad Oct 24 '17 at 21:25
  • Also tried tracing. Doesn't return a full log of the queries executed by `BatchStatement` – dado_eyad Oct 24 '17 at 21:26
  • It works in the way you said. But it doesn't work when the batch is created like this `bs.add(session.prepare('insert into test.test(test_type, test_desc) values (?, ?)'), ['hello1', 'hello1'])` – dado_eyad Nov 07 '17 at 15:32
3

add_request_init_listener(fn, *args, **kwargs)

Adds a callback with arguments to be called when any request is created.

It will be invoked as fn(response_future, *args, **kwargs) after each client request is created, and before the request is sent.

Using this callback you can easily log every query made by that session.

Example:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider


class RequestHandler:

    def on_request(self, rf):
        # This callback is invoked each time a request is created, on the thread creating the request.
        # We can use this to count events, or add callbacks
        print(rf.query)


auth_provider = PlainTextAuthProvider(
    username='cassandra',
    password='cassandra'
)

cluster = Cluster(['192.168.65.199'],auth_provider=auth_provider)
session = cluster.connect('test')

handler = RequestHandler()
# each instance will be registered with a session, and receive a callback for each request generated
session.add_request_init_listener(handler.on_request)

from time import sleep

# Every execute() below triggers handler.on_request before the request is sent.
for count in range(1, 10):
    print(count)
    for row in session.execute("select * from kv WHERE key = %s", ["ed1e49e0-266f-11e7-9d76-fd55504093c1"]):
        print(row)
    sleep(1)
Ashraful Islam
  • It's a good approach, but this callback is called before the query is executed, and it returns the statement object rather than the query string. In the case of `PreparedStatement` and `BatchStatement`, the query and its values are encoded, and I couldn't find a way to decode them. – dado_eyad Oct 23 '17 at 09:06
1

Have you considered creating a decorator for your execute or equivalent (e.g. execute_concurrent) that logs the CQL query used for your statement or prepared statement?

You can write it so that the CQL query is logged only if the query executed successfully.
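
A rough sketch of such a wrapper, under a few assumptions: log_on_success and describe are hypothetical helper names, describe relies on the query_string, prepared_statement and raw_values attributes of the driver's statement classes, and a BatchStatement simply falls back to repr() because its contents are not exposed publicly.

```python
import functools
import logging

logger = logging.getLogger("cql")  # hypothetical logger name


def describe(statement):
    """Best-effort text for a statement; falls back to repr()."""
    if isinstance(statement, str):
        return statement
    prepared = getattr(statement, "prepared_statement", None)  # bound statement
    if prepared is not None:
        return "%s %r" % (prepared.query_string,
                          getattr(statement, "raw_values", None))
    return getattr(statement, "query_string", None) or repr(statement)


def log_on_success(execute, log=logger.info):
    """Wrap an execute-like callable; log the statement only if it returns."""
    @functools.wraps(execute)
    def wrapper(statement, *args, **kwargs):
        result = execute(statement, *args, **kwargs)  # an exception skips the log
        log(describe(statement))
        return result
    return wrapper
```

With a connected session, this could be applied as session.execute = log_on_success(session.execute). It only covers the synchronous path; execute_async would need the callback approach instead.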

Andrew Mo
  • I can't find how to access the query string after it has been executed by the batch. – dado_eyad Oct 20 '17 at 08:25
  • You can add a callback to execute_async(): https://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.ResponseFuture.add_callback – Andrew Mo Oct 20 '17 at 09:04
  • The callback returns a `ResultSet`, which doesn't include the executed queries. – dado_eyad Oct 22 '17 at 13:50