
I'm using

celery == 4.1.0 (latentcall)
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Python 2.7.14

I'm trying to execute a Cassandra query in a Celery worker function. The Celery worker receives the task but never executes the query.

tasks.py

from cassandra.cluster import Cluster
from celery import Celery

app = Celery('<workername>', backend="rpc://", broker='redis://localhost:6379/0')
dbSession = Cluster().connect()


@app.task
def get_data():
    query = "SELECT * FROM customers"
    CustomerObj = dbSession.execute(dbSession.prepare(query))

    return CustomerObj


get_data.delay()

I start the worker using:

$ celery worker -A <worker_name> -l INFO -c 1

 -------------- celery@ubuntu v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.13.0-21-generic-x86_64-with-Ubuntu-17.10-artful 2018-04-20 14:31:41
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         Woker:0x7fa4a0e6f310
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . Worker.get_data

[2018-04-20 14:31:41,271: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-20 14:31:41,285: INFO/MainProcess] mingle: searching for neighbors
[2018-04-20 14:31:42,315: INFO/MainProcess] mingle: all alone
.............
[2018-04-20 14:31:42,332: INFO/MainProcess] celery@ubuntu ready.
[2018-04-20 14:31:43,823: INFO/MainProcess] Received task: <worker_name>.get_data[8de91fdf-1388-4d5c-bb22-8cb00c1c065e]  

The worker process just stops there. It never executes the SELECT query or returns any data.

Can anyone suggest how I can get this code to execute Cassandra queries?


2 Answers


I think you can't define dbSession globally. With the prefork pool, Celery tasks run in forked worker processes, and a Cassandra session created in the parent before the fork won't work in the children, so the connection can't be global.

I can suggest two options:

  1. Create the session within the task. This should work; the downside is that you'll create a new session for each task. Maybe a lazy property (@LazyProperty) could help here. See the first sketch after this list.

  2. Create the connection at the worker level: build your session when the worker starts, e.g. with the worker_process_init signal (ref). The problem here is that the concurrency level can be > 1 (depending on how you start the worker), and then you need a pool of sessions to serve more than one Celery task at a time (handle more than one Cassandra session at a time). See the second sketch after this list.
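
A minimal sketch of option 1 (the broker URL and table come from the question; other names are placeholders):

from cassandra.cluster import Cluster
from celery import Celery

app = Celery('tasks', backend='rpc://', broker='redis://localhost:6379/0')

@app.task
def get_data():
    # Open the connection inside the task so nothing is shared
    # across forked worker processes.
    cluster = Cluster()
    session = cluster.connect()
    rows = session.execute("SELECT * FROM customers")
    # A ResultSet is not a serializable task result; the driver
    # returns namedtuple rows by default, so convert them to dicts.
    result = [row._asdict() for row in rows]
    cluster.shutdown()
    return result

And a sketch of option 2 with the plain driver, using the worker_process_init signal so each forked child process builds its own session:

from cassandra.cluster import Cluster
from celery import Celery
from celery.signals import worker_process_init

app = Celery('tasks', backend='rpc://', broker='redis://localhost:6379/0')

session = None

@worker_process_init.connect
def init_cassandra(**kwargs):
    # Runs in each child process after the fork, so the session
    # it creates is safe to use from tasks in that process.
    global session
    session = Cluster().connect()

@app.task
def get_data():
    rows = session.execute("SELECT * FROM customers")
    return [row._asdict() for row in rows]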

By the way, you could also use Python's global keyword; if you are running a single instance, that might fix it too.

Here is a related question that might help you: Celery Worker Database Connection Pooling

Good luck!


Celery doesn't use the application's connection instance, so initiate a new connection when Celery starts up. The snippet below follows the Cassandra driver documentation for Celery:

from celery import Celery
from celery.signals import worker_process_init, beat_init
from cassandra.cqlengine import connection
from cassandra.cqlengine.connection import (
    cluster as cql_cluster, session as cql_session)

def cassandra_init(**kwargs):
    """ Initialize a clean Cassandra connection. """
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    # setup() requires hosts and a default keyspace; the values
    # below are placeholders, use your own.
    connection.setup(['127.0.0.1'], 'my_keyspace')

# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)

app = Celery()

This worked for me.
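
For reference, a task could then query through the session that cassandra_init sets up; a sketch (cqlengine configures a dict row factory, and the table name here is illustrative):

from cassandra.cqlengine import connection

@app.task
def get_data():
    # connection.execute() uses the session created in cassandra_init;
    # rows come back as dicts, which serialize cleanly as a result.
    rows = connection.execute("SELECT * FROM customers")
    return list(rows)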
