I'm trying to create a function that takes an SQL SELECT query as a parameter and uses dask to read its results into a dask DataFrame using the dask.dataframe.read_sql_query function. I am new to dask and to SQLAlchemy.
I first tried this:
import dask.dataframe as dd
query = "SELECT name, age, date_of_birth from customer"
df = dd.read_sql_query(sql=query, con=con_string, index_col="name", npartitions=10)
As you probably already know, this won't work because the sql parameter has to be an SQLAlchemy selectable and, more importantly, TextClause isn't supported.
I then wrapped the query in a select like this:
import dask.dataframe as dd
from sqlalchemy import sql
query = "SELECT name, age, date_of_birth from customer"
sa_query = sql.select(sql.text(query))
df = dd.read_sql_query(sql=sa_query, con=con_string, index_col="name")
This fails too, with a very weird error that I have been trying to solve. The problem is that dask needs to infer the types of the columns, and it does so by reading the first head_rows rows of the table (5 by default) and inferring the types from them. This line in the dask codebase adds a LIMIT ? to the query, which ends up being
SELECT name, age, date_of_birth from customer LIMIT param_1
The param_1 doesn't get substituted with the right value, 5 in this case. It then fails on the next line, https://github.com/dask/dask/blob/main/dask/dataframe/io/sql.py#L119, which evaluates the SQL expression:
sqlalchemy.exc.ProgrammingError: (mariadb.ProgrammingError) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'SELECT name, age, date_of_birth from customer
LIMIT ?' at line 1
[SQL: SELECT SELECT name, age, date_of_birth from customer
LIMIT ?]
[parameters: (5,)]
(Background on this error at: https://sqlalche.me/e/14/f405)
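For reference, the statement that gets sent can be inspected without a database by compiling it by hand. This is a minimal sketch, assuming SQLAlchemy 1.4 or later and assuming that dask's sampling step amounts to calling .limit(head_rows) on the selectable passed in:

```python
from sqlalchemy import sql

query = "SELECT name, age, date_of_birth from customer"

# Same wrapping as in the second attempt, plus the LIMIT that dask is
# assumed to add when it samples head_rows rows to infer the dtypes.
sa_query = sql.select(sql.text(query)).limit(5)

# Compile against SQLAlchemy's default dialect; no connection is needed.
compiled = sa_query.compile()
rendered = str(compiled)

print(rendered)         # the SQL text, with a placeholder for the limit
print(compiled.params)  # the bound values that travel with the statement
```

With the default dialect the limit shows up as a named placeholder in the text while its value is carried separately in params, which mirrors the LIMIT ? and [parameters: (5,)] lines of the error above.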
I can't understand why param_1 wasn't substituted with the value of head_rows. One can see from the error message that it detects there's a parameter that needs to be used for the substitution, but for some reason it doesn't actually substitute it.
Perhaps I didn't correctly create the SQLAlchemy selectable?
I can simply use pandas.read_sql and create a dask dataframe from the resulting pandas dataframe, but that defeats the purpose of using dask in the first place.
I have the following constraints:
- I cannot change the function to accept a ready-made sqlalchemy selectable. This feature will be added to a private library used at my company and various projects using this library do not use sqlalchemy.
- Passing meta to the custom function is not an option because it would require the caller to create it. However, passing a meta attribute to read_sql_query and setting head_rows=0 is completely ok as long as there's an efficient way to retrieve/create it.
- While dask-sql might work for this case, using it is not an option, unfortunately.
How can I go about correctly reading an SQL query into a dask dataframe?