
I'm trying to assess server performance and scalability when many clients send requests at the same time. The tasks being performed are CPU-heavy and take a long time to finish, for example preprocessing the signal data contained in a CSV (using MNE) or training models (using scikit-learn and TensorFlow). However, I can't manage to run them asynchronously, so each request is handled synchronously and blocks the others. Because of this, it is impossible to measure the consumption of computer resources when there are a lot of requests.

To avoid including too much code, I will only add the code required to apply preprocessing with a band-pass filter.

Here is the original code.

csv_controller.py

@csv_controller.post("/preproccessing/list")
async def apply_preproccessing(csv_filters: CSVFilters, db=Depends(get_db),
                               exists_current_researcher=Depends(get_current_researcher)):
    return await csv_service.apply_preproccessing(db, csv_filters)

csv_service.py

import os

import mne
import numpy as np
import pandas as pd

# csv_crud, experiment_crud, models and the helpers (load_raw, convert_to_df,
# generate_name_csv, apply_notch, apply_downsampling) come from other modules
# of the project and are omitted here for brevity.

async def apply_preproccessing(db: Session, csv_filters: CSVFilters):
    exp = None
    text = ""
    for csv_id in csv_filters.csvs:
        csv = csv_crud.find_by_id(db, csv_id)
        if csv is not None:
            try:
                if exp is None:
                    exp = experiment_crud.find_by_id(db, csv.experiment_id)
                df = pd.read_csv(csv.path)
                rawdata = load_raw(df, exp)
                del df
                if rawdata is not None:
                    for prep in csv_filters.preproccessings:
                        if prep.__class__.__name__ == 'CSVBandpass':
                            rawdata = await apply_bandpass(prep, rawdata, csv)
                        elif prep.__class__.__name__ == 'CSVNotch':
                            rawdata = await apply_notch(prep, rawdata, csv)
                        elif prep.__class__.__name__ == 'CSVDownsampling':
                            rawdata = await apply_downsampling(prep, rawdata, csv)
                    try:
                        os.remove(csv.path)
                    except FileNotFoundError:
                        pass
                    csv.path = generate_name_csv(db)
                    csv.date = csv.path[12:31]
                    csv.type = 'prep'

                    ch_names = []
                    for x in exp.device.channels:
                        ch_names.append(x.channel.name)
                    data = convert_to_df(rawdata, ch_names)

                    data.to_csv(csv.path, index=False)
                    csv.duraction = data.shape[0]/exp.device.sample_rate
                    csv_crud.save(db, csv)
                    text += csv.name + ": Preproccessing applied\n"

            except ValueError:
                text += csv.name + ": Check frequency values\n"
            except np.linalg.LinAlgError:
                text += csv.name + ": Array must not contain infs or NaNs\n"

    return text

async def apply_bandpass(prep, rawdata, new_csv):
    l_freq = None
    h_freq = None
    text = ''

    if prep.low_freq != '':
        l_freq = float(prep.low_freq)
        text = text + 'Low Frequency: ' + prep.low_freq + 'Hz '

    if prep.high_freq != '':
        h_freq = float(prep.high_freq)
        text = text + 'High Frequency: ' + prep.high_freq + 'Hz '

    if prep.filter_method == 'fir':
        db_preproccessing = models.Preproccessing(
                            position=len(new_csv.preproccessing_list) + 1,
                            preproccessing='Bandpass',
                            csv_id=new_csv.id,
                            description='Method: FIR, ' + 'Phase: ' + prep.phase + ', ' + text)
        new_csv.preproccessing_list.append(db_preproccessing)

        return rawdata.copy().filter(l_freq=l_freq, h_freq=h_freq,
                                     method='fir', fir_design='firwin', phase=prep.phase)

    elif prep.filter_method == 'iir':
        if prep.order == '1':
            ordinal = 'st'
        elif prep.order == '2':
            ordinal = 'nd'
        else:
            ordinal = 'th'

        db_preproccessing = models.Preproccessing(
                            position=len(new_csv.preproccessing_list) + 1,
                            preproccessing='Bandpass',
                            csv_id=new_csv.id,
                            description='Method: IIR, ' + prep.order + ordinal + '-order Butterworth filter, ' + text)
        new_csv.preproccessing_list.append(db_preproccessing)

        iir_params = dict(order=int(prep.order), ftype='butter')
        return rawdata.copy().filter(l_freq=l_freq, h_freq=h_freq,
                                     method='iir', iir_params=iir_params)

csv_repository.py

from sqlalchemy.orm import Session
from app.models import models


def find_by_id(db: Session, csv_id: int) -> models.CSV:
    return db.query(models.CSV).filter(models.CSV.id == csv_id).first()


def save(db: Session, csv: models.CSV) -> models.CSV:
    db.add(csv)
    db.commit()
    db.refresh(csv)
    return csv

experiment_repository.py

from typing import Optional

from sqlalchemy.orm import Session
from app.models import models


def find_by_id(db: Session, experiment_id: int) -> Optional[models.Experiment]:
    return db.query(models.Experiment).filter(models.Experiment.id == experiment_id).first()

I have tried many things, such as adding more workers (I'm looking for concurrency, not parallelism), asyncio.gather() and asyncio.create_task(), but the requests are still handled synchronously. I also found in the official documentation that BackgroundTasks should be a valid option, but it isn't in my case, since the requests need to return a response.
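
For clarity, this is the kind of non-blocking variant I am aiming for: awaiting the heavy call through a threadpool, so the event loop stays free while the client still gets the result as the response. A sketch only, assuming the service function is rewritten as plain synchronous code (`apply_preproccessing_sync` is an illustrative name, not code from my project):

from fastapi.concurrency import run_in_threadpool

@csv_controller.post("/preproccessing/list")
async def apply_preproccessing(csv_filters: CSVFilters, db=Depends(get_db),
                               exists_current_researcher=Depends(get_current_researcher)):
    # run_in_threadpool hands the blocking call to a worker thread and returns
    # an awaitable, so the event loop can serve other requests while the
    # preprocessing runs; the endpoint still returns the result as its response.
    return await run_in_threadpool(csv_service.apply_preproccessing_sync,
                                   db, csv_filters)

Note that threads share the GIL, so pure-Python CPU work will not run in parallel this way; true CPU parallelism would need a process pool and picklable arguments.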

  • `async` doesn't make stuff magically run concurrently; it just means that you can give up processing time when you're not using the CPU, i.e. when you're waiting for IO. Since there is no lower level where you're waiting for any IO in your code (i.e. no `await` inside your async function), there is no place where other async code can run, because you're never giving up your CPU processing time. Your db queries would be the first location where you can apply async, since that's IO, the same with your file I/O operations. – MatsLindh Sep 02 '22 at 22:16
  • Using multiple workers is usually the way to work around that issue, allowing all available CPU cores to be used for processing instead. – MatsLindh Sep 02 '22 at 22:17
  • I have just solved the problem. I only needed to write `def` instead of `async def` in the controller endpoints; the fix is spelled out below. Thank you all for your comments. – Juan Antonio Martinez Lopez Sep 04 '22 at 22:42
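
Spelling out the fix from the last comment: declaring the endpoint with a plain `def` makes FastAPI execute it in its internal threadpool instead of on the event loop, so heavy requests no longer block each other. A sketch; the service function must then also drop its `async`/`await`:

@csv_controller.post("/preproccessing/list")
def apply_preproccessing(csv_filters: CSVFilters, db=Depends(get_db),
                         exists_current_researcher=Depends(get_current_researcher)):
    # A plain def handler runs in a worker thread from FastAPI's threadpool,
    # leaving the event loop free to accept other requests in the meantime.
    # csv_service.apply_preproccessing must become a plain function as well.
    return csv_service.apply_preproccessing(db, csv_filters)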
