I'd like to store larger amounts of python float timestamps as standard postgresql timestamps using a custom SQLalchemy data type. I saw possible solutions like Defining a table with sqlalchemy with a mysql unix timestamp . What I don't like about it, is that it's rather inefficient, as it's converting a float into a time object, and back into a timestamp, while this should simply be a "multiply by 1e6, add a constant and cast to int"-conversion, which should timewise be close to a noop. While
%timeit float(calendar.timegm(datetime.datetime.utcfromtimestamp(1542098001).timetuple()))
wastes something like 2us per call on my machine. Using postgresql to_timestamp
and extract(epoch from timestamp_col)
should be way faster and better suited for analytical workloads. Is there a way in SQLalchemy, that can apply those conversions on SQL level automatically for all statements affecting that column? I also considered using a simple float field for storing, but I'd prefer to be able to use the time functions for accessing the data.
import time
from datetime import datetime
from calendar import timegm
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, DateTime, TypeDecorator, create_engine
class Timestamp(TypeDecorator):
impl = DateTime
def process_bind_param(self, value, dialect):
if value is None:
return
elif isinstance(value, datetime):
return value
elif isinstance(value, (int, float)):
return datetime.utcfromtimestamp(value)
raise TypeError("Any form of time object required, but received {}".format(type(value).__name__))
def process_result_value(self, value, dialect):
if value is None:
return None # support nullability
elif isinstance(value, datetime):
return float(timegm(value.timetuple()))
raise vn.ValidationTypeError("datetime object required, but received {}".format(type(value).__name__))
Base = declarative_base()
class SystemLog(Base):
__tablename__ = 'systemlog'
id = Column(Integer, primary_key=True)
time = Column(Timestamp, index=True, nullable=False, default=datetime.utcnow)
type = Column(String(20))
message = Column(String(2000))
engine = create_engine('sqlite://')
Session = sessionmaker(bind=engine)
s = Session()
Base.metadata.create_all(engine)
s.commit()
start_time = time.time()
sample_data = [SystemLog(time=start_time + i) for i in xrange(10000)]
s.add_all(sample_data)
s.commit()