
I am trying to fetch data from a specific table in Cassandra and insert it into another table after making some changes. Both tables are located in the keyspace "test". When I fetch the data from the first table everything works fine. However, in the future handler which processes the output of the first query, the attempt to insert the data into the other table on the same Cassandra instance fails. The application raises "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])". I am not sure where I am going wrong.

import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster


hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
    if hasattr(thread_local, "cassandra_session"):
        print("got session from threadlocal")
        return thread_local.cassandra_session
    print(" Connecting to Cassandra Host " + str(hosts))
    session_ = cluster_.connect(keyspace)
    print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
    thread_local.cassandra_session = session_
    return session_


class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        for row in rows:
            process_row(row)

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(row):
    print(row)
    session_ = get_session()
    stmt = session_.prepare(
        "INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
    results = session_.execute(stmt,
                               [row.customer, row.snr, row.rttt,row.created_time])
    print("Done")

session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()

However, when I execute the Python file the application throws "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" from the get_session() call inside process_row(). Clearly, the first call to Cassandra succeeds without any issues. There is no connectivity problem and the Cassandra instance is running fine locally; I am able to query the data using cqlsh. If I call process_row() outside the future handler everything works fine, so I am not sure what needs to be done to make it work from inside the future handler.

Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
 Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
  File "test/check.py", in <module>
    raise handler.error
  File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
  File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
  File "test/check.py", in handle_page
    process_row(row)
  File "test/check.py", in process_row
    session_ = get_session()
  File "test/check.py", in get_session
    session_ = cluster_.connect(keyspace)
  File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
  File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
  File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])

Process finished with exit code 1
Dev Loper

2 Answers


Ok so Cassandra recommends the following:

  • Use at most one Session per keyspace, or use a single Session and explicitly specify the keyspace in your queries

https://www.datastax.com/blog/4-simple-rules-when-using-datastax-drivers-cassandra

In your code you try to create a new session every time the read query retrieves a page of rows.

To force the code to use at most one session, we can create a queue: the driver's callback thread sends each row to the main thread, and the main thread handles it further by executing the insert query. We do the inserts in the main thread because I've run into issues executing queries from the callback thread.

from queue import Queue, Empty
from cassandra.query import dict_factory

callback_queue = Queue()
session = cluster_.connect(keyspace)
session.row_factory = dict_factory  # pass plain dicts through the queue instead of Row instances


class PagedResultHandler(object):

    ...

    def handle_page(self, rows):
        for row in rows:
            callback_queue.put(row)  # hand the row (as a dict) to the main thread
        ...

def process_rows():
    # prepare the statement once, outside the loop
    stmt = session.prepare(
        "INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
    while True:
        try:
            row = callback_queue.get()  # retrieve the row produced by the callback thread
            session.execute(stmt,
                            [row['customer'], row['snr'], row['rttt'], row['created_time']])
            print("Done")
        except Empty:
            pass

query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
process_rows() # for now the code will hang here because we have an infinite loop in this function
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()

This will get it to work, but I would replace the `while True`, otherwise you will end up in an infinite loop.
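One common way to avoid that infinite loop is a sentinel value: the main thread puts a unique marker on the queue after the paging has finished, and the consumer exits when it sees it. This is only a sketch of the idea; `SENTINEL` and the `processed` list are illustrative names, not part of the code above, and the `print` stands in for the actual INSERT:

```python
from queue import Queue

SENTINEL = object()  # unique marker telling the consumer to stop

callback_queue = Queue()
processed = []

def process_rows():
    # Drain the queue until the sentinel arrives, instead of looping forever.
    while True:
        row = callback_queue.get()
        if row is SENTINEL:
            break
        processed.append(row)  # here you would execute the INSERT statement

# The paging callback would put rows; once finished_event is set,
# the main thread puts the sentinel so the consumer can exit cleanly.
callback_queue.put({"customer": "abcd", "snr": 100})
callback_queue.put(SENTINEL)
process_rows()
print(len(processed))  # → 1
```

In the answer above, you would put the sentinel right after `handler.finished_event.wait()` returns.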

ARR
  • I have upvoted and accepted the answer from you. Also awarded the bounty meant for this question. However, I do have a relatively large dataset and I do want to have these inserts to be done parallelly in order to speed up the process. The solution you gave me does not help me dump a complete table with millions of records to another table in an effective way. You can look at https://stackoverflow.com/questions/24785299/python-cassandra-driver-operationtimeout-on-every-query-in-celery-task/26677533 . This will give you an idea of why I have been creating multiple sessions per task. – Dev Loper Feb 04 '21 at 09:37
  • How can we achieve this in python by bringing in parallelism to the code ? – Dev Loper Feb 04 '21 at 09:38

Ok, so in that case we can do two things: multithreading and batch inserting. If we batch insert, I don't think parallelism is really required, because batching alone speeds things up enough from the client side; multithreading wouldn't add much on top, since this is not a CPU-intensive task.

import threading
from threading import Event
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement, BatchStatement, dict_factory

session = cluster_.connect(keyspace)
session.row_factory = dict_factory


class Fetcher:

    def __init__(self, session):
        self.session = session
        query = "select * from test.data_log"
        self.statement = SimpleStatement(query, fetch_size=1000)

    def run(self):
        rows = self.session.execute(self.statement)

        temp_rows = []
        for row in rows:
            temp_rows.append(row)
            if len(temp_rows) == 1000:
                # hand each chunk of 1000 rows to a worker thread
                handler = PagedResultHandler(self.session, temp_rows)
                handler.start()
                temp_rows = []

        if temp_rows:  # flush the final partial chunk
            handler = PagedResultHandler(self.session, temp_rows)
            handler.start()

    def handle_error(self, err=None):
        print(err)


class PagedResultHandler(threading.Thread):

    def __init__(self, session, rows):
        super().__init__()
        self.session = session
        self.error = None
        self.rows = rows
        self.finished_event = Event()

    def run(self):
        batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
        stmt = self.session.prepare(
            "INSERT INTO test.data(id, customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
        for row in self.rows:
            batch.add(stmt, [1, row['customer'], row['snr'], row['rttt'], row['created_time']])
        results = self.session.execute(batch)
        print(results)


Fetcher(session).run()

This script does both batch inserting and multithreading, but again, the multithreading is probably unnecessary.
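As an alternative to hand-rolled threads, the driver itself ships a helper for this: `cassandra.concurrent.execute_concurrent_with_args` pipelines many executions of one prepared statement over a single session. A sketch under the question's schema (the `to_params` helper and `insert_concurrently` wrapper are illustrative names, and the driver call assumes a live cluster behind `session`):

```python
def to_params(rows):
    """Turn row dicts into the positional parameter lists the INSERT expects."""
    return [(r["customer"], r["snr"], r["rttt"], r["created_time"]) for r in rows]

def insert_concurrently(session, rows, concurrency=50):
    # Imported lazily so this sketch can be read without a cluster running.
    from cassandra.concurrent import execute_concurrent_with_args
    stmt = session.prepare(
        "INSERT INTO test.data(customer, snr, rttt, event_time) VALUES (?,?,?,?)")
    # Keeps up to `concurrency` requests in flight at once on one session.
    return execute_concurrent_with_args(session, stmt, to_params(rows),
                                        concurrency=concurrency)

rows = [{"customer": "abcd", "snr": 100, "rttt": 121, "created_time": None}]
print(to_params(rows))  # → [('abcd', 100, 121, None)]
```

This gets you concurrent inserts without managing threads or worrying about which thread owns the session.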

ARR
  • There is a considerable difference between the original attempt and this one. Here you are not gaining anything by making Fetcher a separate thread. I agree on the batching part: it significantly improves performance, provided you batch the records by partition key; a blind batch won't help much. Also, for row processing you have brought in parallelism, and it certainly improves performance. I am still trying to figure out why my original approach is not working; even if you share the same session with the process_row(row) function, it seems to throw "NoHostAvailable". – Dev Loper Feb 12 '21 at 06:24