I have a Flask web app that uses a large DataFrame (hundreds of megabytes). The DataFrame is used in the app for several different machine learning models. I want to create the DataFrame only once in the application and use it across multiple requests so that the user may build different models based on the same data. The Flask session is not built for large data, so that is not an option. I do not want to go back and recreate the DataFrame in case the source of the data is a CSV file (yuck).

I have a solution that works, but I cannot find any discussion of it on Stack Overflow. That makes me suspicious that my solution may not be a good design idea. I have always worked on the assumption that a well-beaten path in software development is a path well chosen.

My solution is simply to create a data holder class with one class variable:

class DataHolder:
    dataFrameHolder = None  # class-level attribute, shared by every instance

Now the dataFrameHolder is known across all class instances (like a static variable in Java) since it is stored in memory on the server.
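
A quick standalone check (illustrative only, importing from the same dataholder module used below) shows that every instance, and the class itself, sees the same object:

from dataholder import DataHolder

a = DataHolder()
b = DataHolder()
DataHolder.dataFrameHolder = "shared"          # set once on the class
print(a.dataFrameHolder, b.dataFrameHolder)    # both print "shared"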

I can now create the DataFrame once and put it into the DataHolder class:

import pandas as pd
from dataholder import DataHolder

result_set = pd.read_sql_query(some_SQL, connection)
df = pd.DataFrame(result_set, columns=['col1', 'col2', ...])
DataHolder.dataFrameHolder = df  # now shared application-wide

Any code that imports the DataHolder class can then access that DataFrame, so I can use the stored data anywhere in the application, including across different requests:

.
.
modelDataFrame = DataHolder.dataFrameHolder
do_some_model(modelDataFrame)
.
.

Is this a bad idea, a good idea, or is there something else that I am not aware of that already solves the problem?

Eli G
  • Hmmm - the approach I used was to serialise into a Redis cache and then get from Redis and deserialise in each request. However, my data frames are much smaller. I experimented with effectively using global variables/references, but nothing works when you move into real live running (where you are using `gunicorn` or are deployed to AWS/GCloud/Azure to be multi-threaded; Flask in dev mode is single-threaded and will block and wait on synchronous requests) – Rob Raymond Jul 31 '20 at 16:52
  • Yes, I am aware of Redis. I understand that it is great for storing key-value pairs and even hashes. But my use case is just to temporarily store hundreds of megabytes of data as a DataFrame, so I am trying to avoid converting the data to write it out and then converting it back to read it in. Also, I am using an Apache server with the mod_wsgi module to support Python-based web apps such as Flask. – Eli G Aug 01 '20 at 17:25
  • @EliG Nice approach; I have the same problem as you, and your suggestion works well with large Pandas DataFrames – Darius Aug 15 '21 at 12:16

2 Answers

Redis can be used. My use case involves smaller data frames, so I have not tested this with larger ones. It lets me serve 3-second ticking data to multiple browser clients. pyarrow serialisation/deserialisation is performing well, and it works locally and across AWS, GCloud and Azure.

GET route

import sys
from io import BytesIO
from flask import Response

@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
    c = mycache()
    data = c.redis().get(key)
    # stream the raw serialised bytes straight back to the client
    resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
    resp.headers["key"] = key
    resp.headers["type"] = c.redis().get(f"{key}.type")
    resp.headers["size"] = sys.getsizeof(data)
    resp.headers["redissize"] = sys.getsizeof(c.redis().get(key))
    return resp
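
On the client side, a request can pull the cached frame back out of that route. A minimal sketch, assuming the app is running on localhost:5000, the frame was cached under the hypothetical key dfsensor, and the same (since-deprecated) pyarrow serialisation used above is installed:

import requests
import pyarrow as pa

# fetch the raw serialised bytes that /cacheget streams back
resp = requests.get("http://localhost:5000/cacheget/dfsensor")
resp.raise_for_status()

# deserialise with the same pyarrow calls the server used
df = pa.deserialize(resp.content)
print(df.shape)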

sample route to put a DataFrame into the cache

import pandas as pd
from flask import jsonify, request
from pandas import json_normalize

@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
    c = mycache()
    dfsensor = c.get("dfsensor")
    newsensor = json_normalize(request.get_json())
    newsensor[["x","y"]] = newsensor[["epoch", "value"]]
    newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
    newsensor["amin"] = newsensor["value"]
    newsensor["amax"] = newsensor["value"]
    newsensor = newsensor.drop(columns=["x","y"])

    # add new data from serial interface to start of list (append old data to new data).
    # default time as now to new data
    # (DataFrame.append was removed in pandas 2.0, so concat is used here)
    dfsensor = pd.concat([newsensor, dfsensor], sort=False)
    # keep size down - only the last 500 observations
    c.set("dfsensor", dfsensor[:500])
    del dfsensor

    return jsonify(result={"status":"ok"})
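
For reference, a hypothetical call to that route (the epoch and value field names are taken from the columns the handler expects; the URL is an assumption):

import time
import requests

# post one sensor observation as JSON
payload = {"epoch": int(time.time()), "value": 21.7}
r = requests.post("http://localhost:5000/sensor_data", json=payload)
print(r.json())    # {"result": {"status": "ok"}}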

utility class

import json
import os
import pickle

import pandas as pd
import pyarrow as pa
import redis

import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import Union, Optional

class mycache():
    __redisClient: Redis
    CONFIGKEY = "cacheconfig"

    def __init__(self) -> None:
        # work out which Redis endpoint to use when one is not configured explicitly
        if "REDIS_HOST" not in os.environ:
            if os.environ["HOST_ENV"] == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif os.environ["HOST_ENV"] == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif os.environ["HOST_ENV"] == "AZURE":
                # e.g. "redis://ignore:password@redis-sensorvenv.redis.cache.windows.net"
                pass  # should be set in an Azure environment variable
            elif os.environ["HOST_ENV"] == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                raise RuntimeError("could not initialise redis")  # no known redis setup

        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # get the cache config as well, creating a default if it is missing
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow":True, "pickle":False}
            self.set(self.CONFIGKEY, self.config)
        self.alog = logenv.alog()

    def redis(self) -> Redis:
        return self.__redisClient


    def exists(self, key:str) -> bool:
        if self.__redisClient is None:
            return False

        return self.__redisClient.exists(key) == 1

    def get(self, key:str) -> Optional[Union[DataFrame, dict, str]]:
        keytype = "{k}.type".format(k=key)
        valuetype = self.__redisClient.get(keytype)
        if valuetype is None:
            if (key.split(".")[-1] == "pickle"):
                return pickle.loads(self.redis().get(key))
            else:
                ret = self.redis().get(key)
                if ret is None:
                    return ret
                else:
                    return ret.decode()
        elif valuetype.decode() == str(pd.DataFrame):
            # fallback to pickle serialized form if pyarrow fails
            # https://issues.apache.org/jira/browse/ARROW-7961
            try:
                return pa.deserialize(self.__redisClient.get(key))
            except pa.lib.ArrowIOError as err:
                self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                return pickle.loads(self.redis().get(f"{key}.pickle"))
            except OSError as err:
                if "Expected IPC" in str(err):
                    self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                    return pickle.loads(self.redis().get(f"{key}.pickle"))
                else:
                    raise err

        elif valuetype.decode() == str(type({})):
            return json.loads(self.__redisClient.get(key).decode())
        else:
            return self.__redisClient.get(key).decode() # type: ignore

    def set(self, key:str, value:Union[DataFrame, dict, str]) -> None:
        if self.__redisClient is None:
            return
        keytype = "{k}.type".format(k=key)

        if isinstance(value, pd.DataFrame):
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                # keep a pickle fallback copy for the pyarrow issue noted in get();
                # the issue should be transient through an upgrade, and once the
                # option is switched off the fallback data can expire and go away
                self.redis().set(f"{key}.pickle", pickle.dumps(value))
                self.redis().expire(f"{key}.pickle", 60*60*24)
        elif isinstance(value, dict):
            self.__redisClient.set(key, json.dumps(value))
        else:
            self.__redisClient.set(key, value)

        # record the stored value's type so get() knows how to deserialise it
        self.__redisClient.set(keytype, str(type(value)))


if __name__ == '__main__':
    os.environ["HOST_ENV"] = "LOCAL"
    r = mycache()
    rr = r.redis()
    for k in rr.keys("cache*"):
        print(k.decode(), rr.ttl(k))
        print(rr.get(k.decode()))
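
Putting it together, a round trip through the cache might look like the sketch below (assuming a local Redis instance is running, the mycache class above is in scope, and the deprecated pyarrow serialisation works in your installed version; the key name is arbitrary):

import os
import pandas as pd

os.environ["HOST_ENV"] = "LOCAL"    # fall back to the local Redis URL above

c = mycache()                       # the utility class defined above
df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})

c.set("mydata", df)                 # stored via pyarrow serialisation
df2 = c.get("mydata")               # comes back as a DataFrame
print(df.equals(df2))               # True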

Rob Raymond

I had a similar problem, as I was importing CSVs (100s of MBs) and creating DataFrames on the fly for each request, which, as you said, was yucky! I also tried the Redis way, caching the data, and that improved performance for a while, until I realized that making changes to the underlying data meant updating the cache as well.

Then I found a world beyond CSV, and more performant file formats like Pickle, Feather, Parquet, and others. You can read more about them here. You can import/export CSVs all you want, but use intermediate formats for processing.
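
As a rough sketch of that workflow (file names are placeholders; Parquet and Feather both need pyarrow, or fastparquet for Parquet, installed):

import pandas as pd

# one-time import from the slow CSV source
df = pd.read_csv("source_data.csv")

# write once to faster intermediate formats
df.to_parquet("source_data.parquet")
df.to_feather("source_data.feather")

# later requests read the intermediate file instead of re-parsing the CSV
df = pd.read_parquet("source_data.parquet")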

I did run into some issues though. I have read that Pickle has security issues, even though I still use it. Feather wouldn't let me write some object types in my data; it needed them categorized. Your mileage may vary, but if you have good clean data, use Feather.

And more recently I've found that I can manage large data using datatable instead of Pandas, storing it in the Jay format for even better read/write performance.

This does, however, mean re-writing the bits of code that use Pandas in datatable instead, but I believe the APIs are very similar. I have not done it myself yet because of a very large codebase, but you can give it a try; a rough sketch is below.
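
The basic round trip is short (file names are placeholders):

import datatable as dt

# read the source once (fread also parses large CSVs quickly)
DT = dt.fread("source_data.csv")

# save in the binary Jay format for near-instant reopening
DT.to_jay("source_data.jay")

# later: open the memory-mapped Jay file instead of re-parsing the CSV
DT = dt.fread("source_data.jay")
df = DT.to_pandas()    # convert back if existing Pandas code still needs it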

Shaji Ahmed
  • I also chose datatable with Jay. It's unbelievably fast for my case, with less than 1s response time for 10GB of data. However, it is sad that Jay is not popular and datatable has limited capabilities compared to Pandas. That was a trade-off. – Quiescent Jul 31 '22 at 13:30