Redis can be used for this. My use case involves small data frames, so I have not tested it with larger ones. It lets me serve data that ticks every 3 seconds to multiple browser clients. pyarrow serialisation/deserialisation performs well, and the setup works locally as well as across AWS, GCloud and Azure.
GET route
@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
    """Stream the raw cached bytes stored under ``key`` back to the client.

    The payload is returned unmodified as ``application/octet-stream``; the
    response headers carry the key, the stored type marker (``<key>.type``)
    and the in-memory size of the payload object.

    Returns a 404 response when nothing is cached under ``key`` instead of a
    200 with an empty body.
    """
    c = mycache()
    data = c.redis().get(key)
    if data is None:
        # Nothing cached under this key: tell the client so explicitly.
        return Response(status=404)

    resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
    resp.headers["key"] = key

    # redis hands back bytes; decode so the header carries the plain type
    # string rather than a bytes repr. "None" is what an untyped key
    # produced before, so keep emitting it.
    value_type = c.redis().get(f"{key}.type")
    resp.headers["type"] = value_type.decode() if value_type is not None else "None"

    # NOTE: sys.getsizeof reports the size of the Python bytes object
    # (payload plus object overhead), not the payload length. Both headers
    # are derived from the single fetch above rather than fetching the key
    # from redis a second time.
    size = sys.getsizeof(data)
    resp.headers["size"] = size
    resp.headers["redissize"] = size
    return resp
Sample route that puts a dataframe into the cache
@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> "Response":
    """Merge the posted sensor readings into the cached ``dfsensor`` frame.

    The JSON request body is normalised into a DataFrame, an ``xy`` column
    holding ``{"x": epoch, "y": value}`` dicts plus ``amin``/``amax`` copies
    of ``value`` are added, and the result is placed in front of the
    previously cached rows. Only the first 500 rows are written back.

    Returns a JSON ``{"result": {"status": "ok"}}`` response.
    """
    c = mycache()
    dfsensor = c.get("dfsensor")
    newsensor = json_normalize(request.get_json())
    # Build the {"x": ..., "y": ...} dict column via temporary x/y columns.
    newsensor[["x","y"]] = newsensor[["epoch", "value"]]
    newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
    newsensor["amin"] = newsensor["value"]
    newsensor["amax"] = newsensor["value"]
    newsensor = newsensor.drop(columns=["x","y"])
    # New rows go first, followed by the previously cached rows.
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported
    # equivalent and silently drops a None entry (nothing cached yet).
    dfsensor = pd.concat([newsensor, dfsensor], sort=False)
    # keep size down - new rows are first, so this keeps the latest 500
    c.set("dfsensor", dfsensor[:500])
    return jsonify(result={"status":"ok"})
utility class
import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)
class mycache():
    """Small typed cache on top of Redis.

    Every value stored through :meth:`set` gets a companion ``<key>.type``
    entry recording ``str(type(value))`` so that :meth:`get` can pick the
    matching decoder: pyarrow for DataFrames, JSON for dicts, plain
    ``bytes.decode()`` for everything else.
    """
    __redisClient:Redis
    CONFIGKEY = "cacheconfig"

    def __init__(self) -> None:
        """Connect to Redis and load (or create) the shared cache config.

        The connection URL is read from ``REDIS_HOST``. When that variable is
        absent it is derived from ``HOST_ENV`` and written back into
        ``os.environ``.

        Raises:
            KeyError: neither ``REDIS_HOST`` nor ``HOST_ENV`` is set, or
                ``HOST_ENV`` is ``AZURE`` without ``REDIS_HOST`` being set.
            RuntimeError: ``HOST_ENV`` names an unknown environment.
        """
        if "REDIS_HOST" not in os.environ:
            host_env = os.environ["HOST_ENV"]
            if host_env == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif host_env == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif host_env == "AZURE":
                # REDIS_HOST is expected to be provided by the Azure
                # environment itself; the lookup below raises KeyError if not.
                pass
            elif host_env == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                # Raising a plain string is itself a TypeError in Python 3;
                # raise a real exception carrying the same message.
                raise RuntimeError("could not initialise redis")

        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # Set the logger up before the first get(): its fallback path logs.
        self.alog = logenv.alog()
        # get config as well...
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow":True, "pickle":False}
            self.set(self.CONFIGKEY, self.config)

    def redis(self) -> Redis:
        """Return the underlying redis client."""
        return self.__redisClient

    def exists(self, key:str) -> bool:
        """Return True when ``key`` is present in Redis."""
        if self.__redisClient is None:
            return False
        return self.__redisClient.exists(key) == 1

    def get(self, key:str) -> Optional[Union[DataFrame, dict, str]]:
        """Fetch ``key`` and decode it according to its ``<key>.type`` marker.

        Returns:
            ``None`` when nothing is stored under ``key``; a DataFrame or a
            dict for values stored as such; the unpickled object for untyped
            keys whose name ends in ``.pickle``; otherwise the decoded string.
        """
        valuetype = self.__redisClient.get(f"{key}.type")
        raw = self.__redisClient.get(key)
        if raw is None:
            # Missing (or expired) value: report absence instead of crashing
            # inside pickle.loads / .decode().
            return None

        if valuetype is None:
            if key.split(".")[-1] == "pickle":
                # SECURITY: pickle.loads executes arbitrary code; only safe
                # while every writer to this Redis instance is trusted.
                return pickle.loads(raw)
            return raw.decode()

        stored_type = valuetype.decode()
        if stored_type == str(pd.DataFrame):
            # fallback to pickle serialized form if pyarrow fails
            # https://issues.apache.org/jira/browse/ARROW-7961
            # NOTE(review): pa.serialize/pa.deserialize are believed to be
            # deprecated in recent pyarrow releases - confirm before upgrading.
            try:
                return pa.deserialize(raw)
            except pa.lib.ArrowIOError as err:
                return self._unpickle_fallback(key, err)
            except OSError as err:
                if "Expected IPC" in str(err):
                    return self._unpickle_fallback(key, err)
                raise
        if stored_type == str(dict):
            return json.loads(raw.decode())
        return raw.decode()

    def _unpickle_fallback(self, key:str, err:Exception) -> DataFrame:
        """Load the ``<key>.pickle`` shadow copy after pyarrow failed with ``err``."""
        self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
        pickled = self.redis().get(f"{key}.pickle")
        if pickled is None:
            # No shadow copy available: surface the original pyarrow error
            # rather than an unrelated TypeError from pickle.loads(None).
            raise err
        # SECURITY: see the note in get() about unpickling cache contents.
        return pickle.loads(pickled)

    def set(self, key:str, value:Union[DataFrame, dict, str]) -> None:
        """Store ``value`` under ``key`` and record its type in ``<key>.type``.

        DataFrames are serialised with pyarrow (plus a pickle shadow copy
        expiring after one day when ``config["pickle"]`` is true), dicts as
        JSON, and anything else is handed to Redis unchanged.
        """
        if self.__redisClient is None:
            return
        if isinstance(value, pd.DataFrame):
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                # issue should be transient through an upgrade....
                # once switched off data can go away
                # Value and one-day TTL are written in a single SET command.
                self.redis().set(f"{key}.pickle", pickle.dumps(value), ex=60*60*24)
            stored_type = str(pd.DataFrame)
        elif isinstance(value, dict):
            self.__redisClient.set(key, json.dumps(value))
            stored_type = str(dict)
        else:
            self.__redisClient.set(key, value)
            stored_type = str(type(value))
        self.__redisClient.set(f"{key}.type", stored_type)
if __name__ == "__main__":
    # Manual check against a local Redis: list every key matching "cache*"
    # with its TTL, followed by the raw value stored under it.
    os.environ["HOST_ENV"] = "LOCAL"
    client = mycache().redis()
    for raw_key in client.keys("cache*"):
        key_name = raw_key.decode()
        print(key_name, client.ttl(raw_key))
        print(client.get(key_name))