I have a Flask application that reads a dataframe and exposes it through a service. The problem is that I need to refresh the dataframe periodically (just re-reading it from S3). While the refresh is in progress, the dataframe needs to stay available, or the service should at least return some kind of error. Maybe this is possible with some sort of parallelism. My code is similar to this:

from flask import Flask, request, make_response
import pandas as pd

# this dataframe needs to be updated
df = pd.read_parquet("s3://data/data.parquet.gzip")

app = Flask(__name__)

# this endpoint needs to stay available during the df update
@app.route('/application',  methods=["POST"])
def application():
    data = request.json
    return make_response(function_(df, data), 200)


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8080)

1 Answer

One option is to have a background thread replace the dataframe as needed. The complicating factor is the dataframe being swapped while calculations are in progress. In the posted code you are already thread safe, because the consumer reads the global df only once, when function_(df, data) is called. Rebinding a global name is a single step in CPython, so each request gets either the old dataframe or the new one, never a mix, and no special synchronization is needed. If the global df variable is referenced multiple times during the calculations, you'll need to lock access (or copy the reference into a local variable first). Assuming that scenario, here is a locking version of your code, although the example code as posted would not need it.

from flask import Flask, request, make_response
import pandas as pd
import threading
import time

def df_updater():
    global df
    while True:
        # TODO: Decide on update based on your requirements
        time.sleep(60)
        df_tmp = pd.read_parquet("s3://data/data.parquet.gzip")
        with df_lock:
            df = df_tmp
        del df_tmp

df_lock = threading.Lock()
# Get the first sample
df = pd.read_parquet("s3://data/data.parquet.gzip")
# start thread for future samples; daemon=True so it does not block shutdown
df_updater_thread = threading.Thread(target=df_updater, daemon=True)
df_updater_thread.start()

app = Flask(__name__)

# this endpoint needs to stay available during the df update
@app.route('/application',  methods=["POST"])
def application():
    data = request.json
    with df_lock:
        return make_response(function_(df, data), 200)


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8080)
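
For the lock-free case, the same idea can be sketched without Flask, pandas, or S3. This is only an illustration under some assumptions: RefreshingValue, load, and interval_s are hypothetical names, and load stands in for whatever call re-reads the data (pd.read_parquet in the question). The reader copies the reference once and works on that snapshot, and a failed reload keeps the previous value in place.

```python
import threading
import time
from typing import Any, Callable


class RefreshingValue:
    """Holds a value that a background thread reloads periodically.

    Hypothetical helper for illustration; not part of Flask or pandas.
    """

    def __init__(self, load: Callable[[], Any], interval_s: float) -> None:
        self._load = load
        self._interval_s = interval_s
        self._value = load()  # the first load happens synchronously
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Only valid after start(); waits for the worker to exit.
        self._stop.set()
        self._thread.join()

    def get(self) -> Any:
        # A single attribute read; the caller keeps this reference for the
        # whole request, so a swap in the background cannot affect it.
        return self._value

    def _run(self) -> None:
        # Event.wait() returns False on timeout and True once stop() is called.
        while not self._stop.wait(self._interval_s):
            try:
                new_value = self._load()
            except Exception:
                continue  # keep serving the previous value if the reload fails
            self._value = new_value  # rebinding swaps old for new in one step
```

In a request handler this would be used as current = holder.get(), followed by calculations that only touch current. The old object stays alive until the last request using it finishes, so memory briefly holds two copies during a swap.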
tdelaney