I'm trying to get the server performance and the scalability when many people are sending requests. The tasks which will be performed are really heavy and it needs a lot of time to be finished, for example, preprocessing the signal data contained in a CSV (using MNE) or creating trained models (using Sklearn and Tensorflow). However, I can't manage to do them asynchronously, causing each request is performed synchronously. For this reason, it is impossible to know the consumption of computer resources when there are a lot of requests.
To not include many code, I will only add the required code to apply preprocessing using band-pass filter.
Here is the original code.
csv_controller.py
@csv_controller.post("/preproccessing/list")
async def apply_preproccessing(csv_filters: CSVFilters, db=Depends(get_db),
exists_current_researcher = Depends(get_current_researcher)):
return await csv_service.apply_preproccessing(db, csv_filters)
csv_service.py
import mne
async def apply_preproccessing(db: Session, csv_filters: CSVFilters):
exp = None
text = ""
for csv_id in csv_filters.csvs:
csv = csv_crud.find_by_id(db, csv_id)
if csv is not None:
try:
if exp is None:
exp = experiment_crud.find_by_id(db, csv.experiment_id)
df = pd.read_csv(csv.path)
rawdata = load_raw(df, exp)
del df
if rawdata is not None:
for prep in csv_filters.preproccessings:
if prep.__class__.__name__ == 'CSVBandpass':
rawdata = await apply_bandpass(prep, rawdata, csv)
elif prep.__class__.__name__ == 'CSVNotch':
rawdata = await apply_notch(prep, rawdata, csv)
elif prep.__class__.__name__ == 'CSVDownsampling':
rawdata = await apply_downsampling(prep, rawdata, csv)
try:
os.remove(csv.path)
except FileNotFoundError:
pass
csv.path = generate_name_csv(db)
csv.date = csv.path[12:31]
csv.type = 'prep'
ch_names = []
for x in exp.device.channels:
ch_names.append(x.channel.name)
data = convert_to_df(rawdata, ch_names)
data.to_csv(csv.path, index=False)
csv.duraction = data.shape[0]/exp.device.sample_rate
csv_crud.save(db, csv)
text += csv.name + ": Preproccessing applied\n"
except ValueError:
text += csv.name + ": Check frequency values\n"
except np.linalg.LinAlgError:
text += csv.name + ": Array must not contain infs or NaNs\n"
return text
async def apply_bandpass(prep, rawdata, new_csv):
l_freq = None
h_freq = None
text = ''
if prep.low_freq != '':
l_freq = float(prep.low_freq)
text = text + 'Low Frequency: ' + prep.low_freq + 'Hz '
if prep.high_freq != '':
h_freq = float(prep.high_freq)
text = text + 'High Frequency: ' + prep.high_freq + 'Hz '
if prep.filter_method == 'fir':
db_preproccessing = models.Preproccessing(
position=len(new_csv.preproccessing_list) + 1,
preproccessing='Bandpass',
csv_id=new_csv.id,
description='Method: FIR, ' + 'Phase: ' + prep.phase + ', ' + text)
new_csv.preproccessing_list.append(db_preproccessing)
return rawdata.copy().filter(l_freq=l_freq, h_freq=h_freq,
method='fir', fir_design='firwin', phase=prep.phase)
elif prep.filter_method == 'iir':
if prep.order == '1':
ordinal = 'st'
elif prep.order == '2':
ordinal = 'nd'
else:
ordinal = 'th'
db_preproccessing = models.Preproccessing(
position=len(new_csv.preproccessing_list) + 1,
preproccessing='Bandpass',
csv_id=new_csv.id,
description='Method: IIR, ' + prep.order + ordinal + '-order Butterworth filter, ' + text)
new_csv.preproccessing_list.append(db_preproccessing)
iir_params = dict(order=int(prep.order), ftype='butter')
return rawdata.copy().filter(l_freq=l_freq, h_freq=h_freq,
method='iir', iir_params=iir_params)
csv_repository.py
from sqlalchemy.orm import Session
from app.models import models
def find_by_id(db: Session, csv_id: int) -> models.CSV:
return db.query(models.CSV).filter(models.CSV.id == csv_id).first()
def save(db: Session, csv: models.CSV) -> models.CSV:
db.add(csv)
db.commit()
db.refresh(csv)
return csv
experiment_repository.py
from sqlalchemy.orm import Session
from app.models import models
def find_by_id(db: Session, experiment_id: int) -> Optional[models.Experiment]:
e = db.query(models.Experiment).filter(models.Experiment.id == experiment_id).first()
return e
I tried many things such as adding more workers (I'm looking for concurrency, not parallelism), asyncio.gather() and asyncio.create_task(), but it is still synchronous. Moreover, I searched in the official documentation that BackgroundTasks should be a valid option, but it is not since the requests need a response.