
My goal is to run SELECT statements against my PostgreSQL db (on Cloud SQL), using SQLAlchemy's ORM in my Flask app on Google App Engine.

I am able to deploy the app, and it connects to the database and issues the SELECT statements successfully. However, after the app has been running for about 1 hour, the queries start to fail with the errors below. Note that the same queries ran successfully during the first hour, and I can still run them against the database from pgAdmin or from local Python code.

There are two errors. The first is a KeyError that occurs at line 1778 in pg8000/core.py. On GitHub, this line is https://github.com/tlocke/pg8000/blob/master/pg8000/core.py#L1783

This is the error, which is handled by an except block, but within that except block a Broken Pipe error is raised.

File "/env/lib/python3.7/site-packages/pg8000/core.py", line 1778, in execute
    ps = cache['ps'][key]
KeyError: ('SELECT voice_comments.id AS voice_comments_id, voice_comments.store_id AS voice_comments_store_id, voice_comments.comment_date AS voice_comments_comment_date, voice_comments.survey_item AS voice_comments_survey_item, voice_comments.comment_text AS voice_comments_comment_text, voice_comments.overall_satisfaction AS voice_comments_overall_satisfaction, voice_comments.visit_date AS voice_comments_visit_date \nFROM voice_comments \nWHERE voice_comments.store_id = %s AND voice_comments.comment_date >= %s AND voice_comments.comment_date <= %s AND voice_comments.overall_satisfaction = %s AND voice_comments.id NOT IN (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', ((705, 0, <function Connection.__init__.<locals>.text_out at 0x3eb1a3fb7c20>), (1114, 1, <function timestamp_send_integer at 0x3eb1a4a6d320>), (1114, 1, <function timestamp_send_integer at 0x3eb1a4a6d320>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>), (21, 1, <built-in method pack of Struct object at 0x3eb1a4a41030>)))

In the except block that handles the above KeyError, a BrokenPipeError is raised, which is this:

Traceback (most recent call last):
  File "/env/lib/python3.7/site-packages/flask/app.py", line 2292, in wsgi_app
    response = self.full_dispatch_request()
  File "/env/lib/python3.7/site-packages/flask/app.py", line 1815, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/env/lib/python3.7/site-packages/flask/app.py", line 1718, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/env/lib/python3.7/site-packages/flask/_compat.py", line 35, in reraise
    raise value
  File "/env/lib/python3.7/site-packages/flask/app.py", line 1813, in full_dispatch_request
    rv = self.dispatch_request()
  File "/env/lib/python3.7/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/srv/routes/voice.py", line 26, in voice_comments
    result = get_voice_comments(store_id, start_date, end_date, ids_to_filter_out=used_comment_ids)
  File "/srv/routes/pgsql_api.py", line 18, in get_voice_comments
    voice_comments.id.notin_(ids_to_filter_out)).all()
  File "/env/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3233, in all
    return list(self)
  File "/env/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in __iter__
    return self._execute_and_instances(context)
  File "/env/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3414, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/env/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1478, in _handle_dbapi_exception
    util.reraise(*exc_info)
  File "/env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
    cursor.execute(statement, parameters)
  File "/env/lib/python3.7/site-packages/pg8000/core.py", line 861, in execute
    self._c.execute(self, operation, args)
  File "/env/lib/python3.7/site-packages/pg8000/core.py", line 1830, in execute
    self._flush()
  File "/opt/python3.7/lib/python3.7/socket.py", line 607, in write
    return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe

I am thinking about this in 3 parts:

1. I recently switched from MySQL to PostgreSQL. With MySQL, I did not experience Broken Pipe errors, or at least they were handled gracefully behind the scenes without me knowing. I am worried that I am doing something dumb at a high level, i.e. that I should set different pool_recycle or connect_timeout values when creating the engine with SQLAlchemy. I am using default values for all arguments except for pool_timeout=30, pool_recycle=1800

Since this happens after about an hour of the app running, I am wondering if connections are being closed (by the pg db) while SQLAlchemy is still trying to use them?

2. Should I be concerned about the KeyError at all? Or is this an expected path in the code? I am leaning towards it being expected, since it is caught by the except block.

3. Is the BrokenPipeError something I should handle at the query level? i.e. something like:

retry = 0
while retry < 3:
    try:
        session.query(....
        break  # query succeeded, stop retrying
    except BrokenPipeError:
        # connection was dropped, try again (at most 3 attempts)
        retry += 1
        continue
BrainPermafrost
  • I have exactly the same error. Good to know I am not alone. – Skyy2010 Feb 24 '20 at 12:49
  • I was thinking of switching away from pg8000. It is the library that is used in the GCP examples, but on the SQLAlchemy page they say that they don't recommend pg8000 because they don't test it – Skyy2010 Feb 24 '20 at 12:53

2 Answers


You are probably closing the pipe while the query is executing. According to another Stack Overflow thread, "How to prevent BrokenPipeError", this is a common issue. Here is a brief summary:

The BrokenPipeError is normal, as phantom said, because the reading process (head) terminates and closes its end of the pipe while the writing process (python) still tries to write.
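
To make the quoted explanation concrete, here is a minimal sketch that reproduces the same condition with a plain OS pipe rather than a database socket. It assumes a POSIX system, and the function name `write_after_reader_closed` is made up for illustration:

```python
import os


def write_after_reader_closed() -> str:
    """Write to a pipe whose read end is already closed."""
    read_fd, write_fd = os.pipe()
    os.close(read_fd)  # the "reader" goes away first
    try:
        # Nobody can read from the pipe any more, so the write fails
        os.write(write_fd, b"data")
    except BrokenPipeError as exc:
        return type(exc).__name__
    finally:
        os.close(write_fd)
    return "no error"


if __name__ == "__main__":
    print(write_after_reader_closed())
```

In the traceback from the question, the writer is pg8000 flushing to its socket, and the closed end would be the server side of the connection.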

iker lasaga

I wasn't properly scoping the sqlalchemy session as described here:

https://flask.palletsprojects.com/en/1.1.x/patterns/sqlalchemy/

Specifically, I was using scoped_session, but I wasn't removing the session at the end of each request with the @app.teardown_appcontext decorator.

I had been doing this with the MySQL db, but simply forgot to do it when I switched to pgsql.

Fixing this corrected the problem.
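
For anyone landing here, a minimal sketch of that pattern, modelled on the linked Flask documentation. The in-memory SQLite URL, the `/ping` route, and the names `db_session` and `shutdown_session` are illustrative assumptions and not the code from my app; in a real deployment the URL would point at the Cloud SQL instance:

```python
from flask import Flask
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite stands in for the real PostgreSQL URL
# so that the sketch runs without a database server.
engine = create_engine("sqlite://")

# Registry that hands out one session per thread
db_session = scoped_session(sessionmaker(bind=engine))

app = Flask(__name__)


@app.teardown_appcontext
def shutdown_session(exception=None):
    # Runs when the application context is torn down after each request:
    # closes the current session so its connection goes back to the pool.
    db_session.remove()


@app.route("/ping")
def ping():
    # Hypothetical route that runs a trivial query through the session
    value = db_session.execute(text("SELECT 1")).scalar()
    return str(value)
```

Without the teardown function, a session (and the connection it holds) can outlive the request that created it, which seems consistent with the stale-connection symptoms in the question.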

BrainPermafrost