I ended up creating a manually scaled App Engine Standard microservice for this. The microservice has a handler for /_ah/start that
never returns and runs indefinitely (many days at a time). When it does get stopped, App Engine restarts it immediately.
Requests can run indefinitely. A manually-scaled instance can choose
to handle /_ah/start and execute a program or script for many hours
without returning an HTTP response code. Task queue tasks can run up
to 24 hours.
https://cloud.google.com/appengine/docs/standard/python/an-overview-of-app-engine
My /_ah/start handler listens to the SQS queue and creates push queue tasks that my default service is set up to handle.
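To make that flow concrete, here is a minimal sketch of a single receive/forward/delete pass, with the SQS client and the enqueue function passed in so it can be exercised without AWS or App Engine. `forward_batch` and `FakeSqsClient` are hypothetical names introduced for illustration; only the `receive_message` and `delete_message` calls mirror the boto3 client used in the real handler further down.

```python
def forward_batch(sqs_client, queue_url, enqueue, wait_seconds=20):
    """Receive one batch from SQS, pass each body to `enqueue`, then delete it.

    Returns the number of messages forwarded.
    """
    response = sqs_client.receive_message(
        QueueUrl=queue_url, WaitTimeSeconds=wait_seconds)
    forwarded = 0
    for message in response.get('Messages', []):
        # Delete only after enqueue succeeds. If enqueue raises, the message
        # stays in SQS and is redelivered after its visibility timeout.
        enqueue(message['Body'])
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
        forwarded += 1
    return forwarded


class FakeSqsClient(object):
    """Hypothetical in-memory stand-in for a boto3 SQS client."""

    def __init__(self, bodies):
        self.pending = [{'Body': body, 'ReceiptHandle': 'rh-%d' % i}
                        for i, body in enumerate(bodies)]
        self.deleted = []

    def receive_message(self, QueueUrl, WaitTimeSeconds=0):
        # Hand out everything that is pending, then report an empty queue.
        batch, self.pending = self.pending, []
        return {'Messages': batch} if batch else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)


# Usage: a plain list stands in for the push queue.
client = FakeSqsClient(['first', 'second'])
tasks = []
count = forward_batch(client, 'queue-url', tasks.append)
```

In the real service, `enqueue` would be the function that adds a push queue task, and the pass would be repeated in an endless loop.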
I also looked into the Compute Engine route and the App Engine Flex route (which is essentially Compute Engine managed by App Engine), but there were other complexities, like not getting access to ndb and the taskqueue SDK, and I didn't have time to dive into that.
Below are all of the files for this microservice. Not included is my lib folder, which contains the source code for boto3 and some other libraries I needed.
I hope this is helpful for someone.
gaesqs.yaml:
application: my-project-id
module: gaesqs
version: dev
runtime: python27
api_version: 1
threadsafe: true

manual_scaling:
  instances: 1

env_variables:
  theme: 'default'
  GAE_USE_SOCKETS_HTTPLIB: 'true'

builtins:
- appstats: on #/_ah/stats/
- remote_api: on #/_ah/remote_api/
- deferred: on

handlers:
- url: /.*
  script: gaesqs_main.app

libraries:
- name: jinja2
  version: "2.6"
- name: webapp2
  version: "2.5.2"
- name: markupsafe
  version: "0.15"
- name: ssl
  version: "2.7.11"
- name: pycrypto
  version: "2.6"
- name: lxml
  version: latest
gaesqs_main.py:
#!/usr/bin/env python
import json
import logging

import appengine_config

try:
    # This is needed to make local development work with SSL.
    # See http://stackoverflow.com/a/24066819/500584
    # and https://code.google.com/p/googleappengine/issues/detail?id=9246 for more information.
    from google.appengine.tools.devappserver2.python import sandbox
    sandbox._WHITE_LIST_C_MODULES += ['_ssl', '_socket']

    import sys
    # this is socket.py copied from a standard python install
    from lib import stdlib_socket
    socket = sys.modules['socket'] = stdlib_socket
except ImportError:
    pass

import boto3
import os
import webapp2
from webapp2_extras.routes import RedirectRoute
from google.appengine.api import taskqueue

app = webapp2.WSGIApplication(debug=os.environ['SERVER_SOFTWARE'].startswith('Dev'))#, config=webapp2_config)

KEY = "<MY-KEY>"
SECRET = "<MY-SECRET>"
REGION = "<MY-REGION>"
QUEUE_URL = "<MY-QUEUE_URL>"


def process_message(message_body):
    queue = taskqueue.Queue('default')
    task = taskqueue.Task(
        url='/task/sqs-process/',
        countdown=0,
        target='default',
        params={'message': message_body})
    queue.add(task)


class Start(webapp2.RequestHandler):

    def get(self):
        logging.info("Start")
        for loggers_to_suppress in ['boto3', 'botocore', 'nose', 's3transfer']:
            logger = logging.getLogger(loggers_to_suppress)
            if logger:
                logger.setLevel(logging.WARNING)
        logging.info("boto3 loggers suppressed")

        sqs_client = boto3.client('sqs',
                                  aws_access_key_id=KEY,
                                  aws_secret_access_key=SECRET,
                                  region_name=REGION)
        while True:
            msgs_response = sqs_client.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=20)
            logging.info("msgs_response: %s" % msgs_response)
            for message in msgs_response.get('Messages', []):
                logging.info("message: %s" % message)
                process_message(message['Body'])
                sqs_client.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])


_routes = [
    RedirectRoute('/_ah/start', Start, name='start'),
]

for r in _routes:
    app.router.add(r)
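In the handler above, any exception raised inside the `while True` loop ends the request, and the service then depends on App Engine restarting the instance. If you would rather keep the loop alive across transient errors, one option is to wrap each iteration in a retry with capped exponential backoff. This is only a sketch: `run_with_backoff`, `step`, and `should_stop` are hypothetical names, and the delays are arbitrary defaults, not values I have tuned.

```python
import logging
import time


def run_with_backoff(step, should_stop, sleep=time.sleep,
                     base_delay=1.0, max_delay=60.0):
    """Call `step()` repeatedly until `should_stop()` returns True.

    After a failure, wait before retrying. The wait doubles on each
    consecutive failure (capped at `max_delay`) and resets after a success.
    Returns the total number of failed steps.
    """
    delay = base_delay
    failures = 0
    while not should_stop():
        try:
            step()
            delay = base_delay  # success: reset the backoff
        except Exception:
            failures += 1
            logging.exception("step failed, retrying in %s seconds", delay)
            sleep(delay)
            delay = min(delay * 2, max_delay)
    return failures
```

Here `step` would be one receive/forward/delete pass over the queue, and passing `lambda: False` as `should_stop` reproduces the endless loop.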
appengine_config.py:
import os

from google.appengine.ext import vendor
from google.appengine.ext.appstats import recording

appstats_CALC_RPC_COSTS = True

# Add any libraries installed in the "lib" folder.
# Use pip with the -t lib flag to install libraries in this directory:
# $ pip install -t lib gcloud
# https://cloud.google.com/appengine/docs/python/tools/libraries27
try:
    vendor.add('lib')
except:
    print "Unable to add 'lib'"


def webapp_add_wsgi_middleware(app):
    app = recording.appstats_wsgi_middleware(app)
    return app


if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'):
    print "gaesqs development"
    import imp
    import os.path
    import inspect
    from google.appengine.tools.devappserver2.python import sandbox
    sandbox._WHITE_LIST_C_MODULES += ['_ssl', '_socket']
    # Use the system socket.
    real_os_src_path = os.path.realpath(inspect.getsourcefile(os))
    psocket = os.path.join(os.path.dirname(real_os_src_path), 'socket.py')
    imp.load_source('socket', psocket)
    os.environ['HTTP_HOST'] = "my-project-id.appspot.com"
else:
    print "gaesqs prod"
    # Doing this on dev_appserver/localhost seems to cause outbound https requests to fail
    from lib import requests
    from lib.requests_toolbelt.adapters import appengine as requests_toolbelt_appengine
    # Use the App Engine Requests adapter. This makes sure that Requests uses
    # URLFetch.
    requests_toolbelt_appengine.monkeypatch()
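One possible refinement: the KEY, SECRET, REGION, and QUEUE_URL constants in gaesqs_main.py could be read from the environment (for example, from the env_variables section of gaesqs.yaml) instead of being hard-coded in the source. The sketch below assumes that approach. `load_sqs_settings` is a hypothetical helper and `SQS_QUEUE_URL` is a name I made up; the three `AWS_*` names are the standard environment variables that boto3 also reads by itself.

```python
import os

REQUIRED_SETTINGS = (
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
    'AWS_DEFAULT_REGION',
    'SQS_QUEUE_URL',  # hypothetical name, not something boto3 looks for
)


def load_sqs_settings(environ=os.environ):
    """Return the required settings as a dict, failing fast if any are unset."""
    missing = [name for name in REQUIRED_SETTINGS if not environ.get(name)]
    if missing:
        raise RuntimeError('Missing settings: %s' % ', '.join(missing))
    return dict((name, environ[name]) for name in REQUIRED_SETTINGS)
```

Values placed in the yaml file still live on disk, so this only helps if that file (or the section holding the secrets) is kept out of version control.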