My goal is to successfully execute Select statements on my Postgresql dv (on Cloud SQL), using SQLAlchemy's ORM in my Flask app on Google App Engine.
I am able to successfully deploy the app, and it successfully connects to the database and issues the select statements. However, after about 1 hour of the app running, I start to get the following errors from the querys. Note that these queries successfully ran previous to 1 hour and I am able to successfully run the queries on the database from pgAdmin, or local python code.
There are two errors, the first is a KeyError that occurs at line 1778 in pg8000/core.py. On git, this line is https://github.com/tlocke/pg8000/blob/master/pg8000/core.py#L1783
This is the error, which is handled with an except block, but within that except block a Broken Pip error is raised.
File "/env/lib/python3.7/site-packages/pg8000/core.py", line 1778, in execute ps = cache['ps'][key] KeyError: ('SELECT voice_comments.id AS voice_comments_id, voice_comments.store_id AS voice_comments_store_id, voice_comments.comment_date AS voice_comments_comment_date, voice_comments.survey_item AS voice_comments_survey_item, voice_comments.comment_text AS voice_comments_comment_text, voice_comments.overall_satisfaction AS voice_comments_overall_satisfaction, voice_comments.visit_date AS voice_comments_visit_date \nFROM voice_comments \nWHERE voice_comments.store_id = %s AND voice_comments.comment_date >= %s AND voice_comments.comment_date <= %s AND voice_comments.overall_satisfaction = %s AND voice_comments.id NOT IN (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', ((705, 0, <function Connection.__init__.<locals>.text_out at 0x3eb1a3fb7c20>), (1114, 1, <function timestamp_send_integer at 0x3eb1a4a6d320>), (1114, 1, <function timestamp_send_integer at 0x3eb1a4a6d320>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>)))
In the except block that handle the above KeyError, a BrokenPipeError is raised, which is this:
Traceback (most recent call last): File "/env/lib/python3.7/site-packages/flask/app.py", line 2292, in wsgi_app response = self.full_dispatch_request() File "/env/lib/python3.7/site-packages/flask/app.py", line 1815, in full_dispatch_request rv = self.handle_user_exception(e) File "/env/lib/python3.7/site-packages/flask/app.py", line 1718, in handle_user_exception reraise(exc_type, exc_value, tb) File "/env/lib/python3.7/site-packages/flask/_compat.py", line 35, in reraise raise value File "/env/lib/python3.7/site-packages/flask/app.py", line 1813, in full_dispatch_request rv = self.dispatch_request() File "/env/lib/python3.7/site-packages/flask/app.py", line 1799, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/srv/routes/voice.py", line 26, in voice_comments result = get_voice_comments(store_id, start_date, end_date, ids_to_filter_out=used_comment_ids) File "/srv/routes/pgsql_api.py", line 18, in get_voice_comments voice_comments.id.notin_(ids_to_filter_out)).all() File "/env/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3233, in all return list(self) File "/env/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__ return self._execute_and_instances(context) File "/env/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances result = conn.execute(querycontext.statement, self._params) File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute return meth(self, multiparams, params) File "/env/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection return connection._execute_clauseelement(self, multiparams, params) File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement distilled_params, File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context e, statement, parameters, cursor, context File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception util.reraise(*exc_info) File "/env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise raise value File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context cursor, statement, parameters, context File "/env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute cursor.execute(statement, parameters) File "/env/lib/python3.7/site-packages/pg8000/core.py", line 861, in execute self._c.execute(self, operation, args) File "/env/lib/python3.7/site-packages/pg8000/core.py", line 1830, in execute self._flush() File "/opt/python3.7/lib/python3.7/socket.py", line 607, in write return self._sock.send(b) BrokenPipeError: [Errno 32] Broken pipe
I am thinking about this in 3 parts:
1. I recently switch from MySQL to PostgresSQL. With MySQL, I did not experience Broken Pipe errors..or at least they were handled gracefully behind the scenes without me knowing. I am worried that I am doing something dumb at a high level..i.e. I should set different pool_recycle or connect_timeout values when creating the engine with SQLAlchemy. I am using default values for all arguments except for pool_timeout=30, pool_recycle=1800
Since this happens after about an hour of the app running, I am wondering if connection are being disconnected (by the pg db) but SQLAlchemy is still trying to use them?
2. Should I be concerned about the KeyError at all? Or is this an expected path in the code. I am leaning towards it being an expected since it is caught by the except block etc...
3. Is the BrokenPipeError something I should handle at the query level? i.e. something like:
retry = 0
while retry<3:
try:
session.query(....
break
except BrokenPipeError:
retry += 1
continue