Every 4 seconds, I have to store 32,000 rows of data. Each of these rows consists of one time stamp value and 464 double precision values. The column name for the time stamp is time, and the column names for the double precision values increase sequentially as channel1, channel2, ..., channel464.
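(For a rough sense of scale, that is 32,000 × 464 ≈ 14.8 million values per batch; at 8 bytes per double that is on the order of 119 MB every 4 seconds, or roughly 30 MB/s sustained.)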
I establish a connection as follows:
import psycopg2
import psycopg2.extras

CONNECTION = f"postgres://{username}:{password}@{host}:{port}/{dbname}"  # ?sslmode=require
self.TimescaleDB_Client = psycopg2.connect(CONNECTION)
I then verify the TimescaleDB extension with the following:
def verifyTimeScaleInstall(self):
    try:
        sql_query = "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"
        cur = self.TimescaleDB_Client.cursor()
        cur.execute(sql_query)
        cur.close()
        self.TimescaleDB_Client.commit()
    except:
        self.timescaleLogger.error("An error occurred in verifyTimeScaleInstall")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
I then create a hypertable for my data with the following:
def createRAWDataTable(self):
    try:
        cur = self.TimescaleDB_Client.cursor()
        self.query_create_raw_data_table = None
        for channel in range(self.num_channel):
            channel = channel + 1
            if self.query_create_raw_data_table is None:
                self.query_create_raw_data_table = f"CREATE TABLE IF NOT EXISTS raw_data (time TIMESTAMPTZ NOT NULL, channel{channel} REAL"
            else:
                self.query_create_raw_data_table = self.query_create_raw_data_table + f", channel{channel} REAL"
        self.query_create_raw_data_table = self.query_create_raw_data_table + ");"
        self.query_create_raw_data_hypertable = "SELECT create_hypertable('raw_data', 'time');"
        cur.execute(self.query_create_raw_data_table)
        cur.execute(self.query_create_raw_data_hypertable)
        self.TimescaleDB_Client.commit()
        cur.close()
    except:
        self.timescaleLogger.error("An error occurred in createRAWDataTable")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
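For reference, with num_channel = 3 the loop above builds the two statements below; the real table just continues the column list out to channel464:
CREATE TABLE IF NOT EXISTS raw_data (time TIMESTAMPTZ NOT NULL, channel1 REAL, channel2 REAL, channel3 REAL);
SELECT create_hypertable('raw_data', 'time');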
I then insert the data into the hypertable using the following:
def insertRAWData(self, seconds):
    try:
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        raw_data_query = self.query_insert_raw_data
        dtype = "float32"
        matrix = np.random.rand(self.fs*seconds, self.num_channel).astype(dtype)
        cur = self.TimescaleDB_Client.cursor()
        data = list()
        for iteration in range(num_iterations):
            raw_data_row = matrix[iteration, :].tolist()  # Select a particular row and all columns
            time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            raw_data_values = (time_string,) + tuple(raw_data_row)
            data.append(raw_data_values)
            current_time = current_time + time_increment
        start_time = time.perf_counter()
        psycopg2.extras.execute_values(
            cur, raw_data_query, data, template=None, page_size=100
        )
        print(time.perf_counter() - start_time)
        self.TimescaleDB_Client.commit()
        cur.close()
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
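One knob worth noting in the call above is page_size: execute_values packs that many rows into each generated INSERT statement, so with 32,000 rows per batch a larger page means far fewer statements. A hypothetical variant (untested on my side):

psycopg2.extras.execute_values(
    cur, raw_data_query, data, template=None, page_size=1000
)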
The SQL Query String that I am referencing in the above code is obtained from the following:
def getRAWData_Query(self):
    try:
        self.query_insert_raw_data = None
        for channel in range(self.num_channel):
            channel = channel + 1
            if self.query_insert_raw_data is None:
                self.query_insert_raw_data = f"INSERT INTO raw_data (time, channel{channel}"
            else:
                self.query_insert_raw_data = self.query_insert_raw_data + f", channel{channel}"
        self.query_insert_raw_data = self.query_insert_raw_data + ") VALUES %s;"
        return self.query_insert_raw_data
    except:
        self.timescaleLogger.error("An error occurred in getRAWData_Query")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
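With num_channel = 3, this returns the string below; execute_values() later expands the single %s placeholder into batches of row tuples:
INSERT INTO raw_data (time, channel1, channel2, channel3) VALUES %s;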
As you can see, I am using psycopg2.extras.execute_values() to insert the values. To my understanding, this is one of the fastest ways to insert data. However, it takes about 80 seconds to insert this data, and that is on quite a beefy system with 12 cores/24 threads, SSDs, and 256 GB of RAM. Can this be done faster? It just seems quite slow.
I would like to use TimescaleDB and am evaluating its performance, but I need the write to complete within about 2 seconds for it to be acceptable.
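For comparison, another route is PostgreSQL's COPY via psycopg2's copy_expert. A minimal sketch reusing the data list and column names built above (the helper name, the CSV formatting, and whether it would actually be faster here are assumptions on my part):

import io

def copy_rows(conn, rows, cols):
    # Stream the batch as CSV through COPY ... FROM STDIN.
    buf = io.StringIO()
    for row in rows:
        buf.write(",".join(str(v) for v in row) + "\n")
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_expert(
            "COPY raw_data ({}) FROM STDIN WITH (FORMAT csv)".format(", ".join(cols)),
            buf,
        )
    conn.commit()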
Edit: I have tried to use pandas to perform the insert, but it took longer, at about 117 seconds. The following is the function that I used.
def insertRAWData_Pandas(self, seconds):
    try:
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        raw_data_query = self.query_insert_raw_data
        dtype = "float32"
        matrix = np.random.rand(self.fs*seconds, self.num_channel).astype(dtype)
        pd_df_dict = {}
        pd_df_dict["time"] = list()
        for iteration in range(num_iterations):
            time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            pd_df_dict["time"].append(time_string)
            current_time = current_time + time_increment
        for channel in range(self.num_channel):
            pd_df_dict[f"channel{channel + 1}"] = matrix[:, channel].tolist()  # columns are named channel1..channelN
        start_time = time.perf_counter()
        pd_df = pd.DataFrame(pd_df_dict)
        pd_df.to_sql('raw_data', self.engine, if_exists='append')
        print(time.perf_counter() - start_time)
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData_Pandas")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
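For completeness, to_sql exposes a few parameters that often affect speed; a hypothetical variant of the call above (whether any of them help for this table is an assumption on my part):

# index=False avoids writing the DataFrame index as an extra column,
# method='multi' batches many rows per INSERT, chunksize bounds each batch.
pd_df.to_sql('raw_data', self.engine, if_exists='append',
             index=False, method='multi', chunksize=100)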
Edit: I have tried to use CopyManager, and it appears to produce the best results so far, at around 74 seconds. Still not what I was after, however.
def insertRAWData_PGCOPY(self, seconds):
    try:
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        dtype = "float32"
        matrix = np.random.rand(num_iterations, self.num_channel).astype(dtype)
        data = list()
        for iteration in range(num_iterations):
            raw_data_row = matrix[iteration, :].tolist()  # Select a particular row and all columns
            #time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            raw_data_values = (current_time,) + tuple(raw_data_row)
            data.append(raw_data_values)
            current_time = current_time + time_increment
        channelList = list()
        for channel in range(self.num_channel):
            channel = channel + 1
            channelString = f"channel{channel}"
            channelList.append(channelString)
        channelList.insert(0, "time")
        cols = tuple(channelList)
        start_time = time.perf_counter()
        mgr = CopyManager(self.TimescaleDB_Client, 'raw_data', cols)
        mgr.copy(data)
        self.TimescaleDB_Client.commit()
        print(time.perf_counter() - start_time)
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData_PGCOPY")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
I tried modifying the following values in postgresql.conf. There wasn't a noticeable performance improvement.
wal_level = minimal
fsync = off
synchronous_commit = off
wal_writer_delay = 2000ms
commit_delay = 100000
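As a sanity check, the active values can be read back with SHOW; a sketch using the existing connection (note that wal_level only changes after a full server restart, while the other settings can be applied with a configuration reload):

cur = self.TimescaleDB_Client.cursor()
for setting in ("wal_level", "fsync", "synchronous_commit",
                "wal_writer_delay", "commit_delay"):
    cur.execute(f"SHOW {setting};")
    print(setting, cur.fetchone()[0])
cur.close()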
I have tried modifying the chunk size, per one of the comments below, using the following in my createRAWDataTable() function. However, there wasn't an improvement in the insert times. Perhaps that was to be expected, given that I haven't been accumulating data: over the course of my testing the database has only held a few samples, at most about 1 minute's worth.
self.query_create_raw_data_hypertable = "SELECT create_hypertable('raw_data', 'time', chunk_time_interval => INTERVAL '3 day', if_not_exists => TRUE);"
Edit: For anyone reading this, I was able to pickle and insert a 32000x464 float32 numpy matrix in about 0.5 seconds with MongoDB, which is what my final solution is. Perhaps MongoDB simply handles this workload better in this case.
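Roughly what that looks like as a sketch only (the database name and one-blob-per-batch layout are assumptions; since the pickled 32000x464 float32 matrix is ~59 MB, above MongoDB's 16 MB document limit, the sketch stores it through GridFS rather than a plain insert_one):

import pickle

import gridfs
import numpy as np
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
fs = gridfs.GridFS(client["raw"])  # GridFS bucket in a "raw" database

matrix = np.random.rand(32000, 464).astype("float32")
fs.put(pickle.dumps(matrix), filename="batch-0001")  # one pickled blob per 4-second batch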