I have a large dataset (~300GB) and access to a machine which can fit it into RAM (but not multiple times). I need to access sections of this dataset, perform calculations on them and then write the results to a file. My idea was to set up a database using SQLite3 to store the data and then query this database to read out the sections of data.
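The storage-plus-range-query idea can be sketched with the standard library alone. The snippet below is a minimal sketch under some assumptions. It uses a tiny in-memory stand-in for the `test` table, and the question does not show the real schema, so the `REAL` column types are assumed. The index `idx_test_col1_col2` and the helper `query_window` are hypothetical names. Extending the index from `col1` to `(col1, col2)` is one possible tweak: it makes the index "covering" for this query, so SQLite can answer it without reading the table rows.

```python
import sqlite3

# Hypothetical in-memory stand-in for the question's `test` table; the real
# schema is not shown, so REAL columns are an assumption.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE test (col1 REAL, col2 REAL)")
con.executemany(
    "INSERT INTO test (col1, col2) VALUES (?, ?)",
    [(0.1, 1.0), (0.5, 2.0), (0.9, 3.0)],
)
# An index on (col1, col2) covers the query below: SQLite can find the col1
# range with a B-tree search and read col2 from the index, skipping the table.
con.execute("CREATE INDEX idx_test_col1_col2 ON test (col1, col2)")
con.commit()


def query_window(con, value1, value2):
    """Rows with col1 between value1 - value2 and value1 + value2 (inclusive)."""
    # Bound parameters (?) instead of an f-string: no SQL string formatting.
    return con.execute(
        "SELECT col1, col2 FROM test WHERE col1 BETWEEN ? AND ?",
        (value1 - value2, value1 + value2),
    ).fetchall()


print(query_window(con, 0.5, 0.1))  # [(0.5, 2.0)]

# EXPLAIN QUERY PLAN reports whether an index is used; the last column of
# each returned row is the human-readable description.
plan = con.execute(
    "EXPLAIN QUERY PLAN SELECT col1, col2 FROM test WHERE col1 BETWEEN ? AND ?",
    (0.4, 0.6),
).fetchall()
print(plan[-1][-1])
```

Running the same `EXPLAIN QUERY PLAN` against the real database is a cheap way to check that the `col1` index is actually being used for the `BETWEEN` query.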
I have done this using Ray to set up a process (hereafter the DB process) that queries the database and returns the (few) results. The other 31 processes send their requests to the DB process, get the results, do the calculations and, once they have finished all their work, write out to a file. Here is an example snippet of code:
```python
import sqlite3

import numpy as np
import ray


@ray.remote(num_cpus=2, max_concurrency=1)
class DatabaseHandler:
    def __init__(self):
        self.con = sqlite3.connect('../Data/databse.db')

    def query(self, value1, value2):
        cur = self.con.cursor()
        # Bind the window bounds as parameters instead of formatting them into the SQL
        cur.execute(
            """
            SELECT col1, col2
            FROM test
            WHERE
                col1 BETWEEN ? AND ?
            """,
            (float(value1 - value2), float(value1 + value2)),
        )
        rows = np.array(cur.fetchall())
        return rows

    def close(self):
        self.con.close()


@ray.remote(num_cpus=0.1)
def worker(db):
    num_values = 1000
    # Dummy values
    value1s = np.random.uniform(size=num_values)
    value2s = np.random.uniform(size=num_values)
    results = []
    for i in range(num_values):
        query = db.query.remote(value1s[i], value2s[i])
        rows = ray.get(query)
        if len(rows):
            # Do some calculation
            # Append result to results list
            results.append(rows)  # placeholder for the real calculation
    # Write out to file


ray.init()
db_handler = DatabaseHandler.remote()
workers = []
for _ in range(31):
    workers.append(worker.remote(db_handler))
ray.get(workers)
ray.get(db_handler.close.remote())
```
This works fairly well, but the running time of my code is still dominated by querying the database (which has an index on `col1`). I would like to use multiple threads to query the database, but my attempts at this (e.g. changing the class decorator to `@ray.remote(num_cpus=2, max_concurrency=2)` and passing `check_same_thread=False` to `sqlite3.connect`) have slowed the runtime down. Perhaps I need to configure SQLite so that the connections share a memory cache, but I have not managed to do this successfully.
I have limited experience with sqlite3, so please be explicit with any suggestions you may have. Any suggestions of alternate solutions/approaches are also very welcome.
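Since the data fits in RAM once, another possible approach is to skip the database for this access pattern. Keep `col1` sorted in a NumPy array, with `col2` permuted the same way, and answer each `BETWEEN` window with two binary searches via `np.searchsorted`. This is essentially the lookup that the `col1` index performs inside SQLite. The sketch below is minimal and assumes both columns are numeric. The helper names `build_sorted_arrays` and `query_window` are hypothetical, and random dummy data stands in for the real dataset.

```python
import numpy as np


def build_sorted_arrays(col1, col2):
    """Sort both columns by col1 once so each window query is two binary searches."""
    order = np.argsort(col1, kind="stable")
    return col1[order], col2[order]


def query_window(sorted_col1, sorted_col2, value1, value2):
    """Rows with value1 - value2 <= col1 <= value1 + value2, like SQL BETWEEN."""
    lo = np.searchsorted(sorted_col1, value1 - value2, side="left")
    hi = np.searchsorted(sorted_col1, value1 + value2, side="right")
    # The slices are views; column_stack copies only the rows inside the window
    # and returns an (n, 2) array like np.array(cur.fetchall()) in the question.
    return np.column_stack((sorted_col1[lo:hi], sorted_col2[lo:hi]))


# Dummy data standing in for the real columns.
rng = np.random.default_rng(0)
col1 = rng.uniform(size=100_000)
col2 = rng.uniform(size=100_000)
s1, s2 = build_sorted_arrays(col1, col2)

lo_v, hi_v = 0.5 - 0.01, 0.5 + 0.01
rows = query_window(s1, s2, 0.5, 0.01)
# Cross-check against a brute-force boolean mask over the unsorted data.
mask = (col1 >= lo_v) & (col1 <= hi_v)
print(rows.shape[0] == mask.sum())  # True
```

With Ray, the two sorted arrays could be placed in the object store once with `ray.put` and passed to the workers. Ray can read NumPy arrays from its shared-memory object store without copying them into each worker process, which matters when the data only fits in memory once. This approach has not been benchmarked against the SQLite version.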