
I'm attempting to integrate with a third party that is posting messages on an Amazon SQS queue. I need my GAE backend to receive these messages.

Essentially, I want the following script to launch and always be running

import boto3
from google.appengine.ext import deferred

sqs_client = boto3.client('sqs',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name=REGION)
while True:
    # Note: SQS long polling caps WaitTimeSeconds at 20
    msgs_response = sqs_client.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=20)
    for message in msgs_response.get('Messages', []):
        deferred.defer(process_and_delete_message, message)

My main App Engine web app is on Automatic Scaling (with the 60-second and 10-minute request/task timeouts), but I'm thinking of setting up a micro-service set to either Manual Scaling or Basic Scaling because:

Requests can run indefinitely. A manually-scaled instance can choose to handle /_ah/start and execute a program or script for many hours without returning an HTTP response code. Task queue tasks can run up to 24 hours.

https://cloud.google.com/appengine/docs/standard/python/an-overview-of-app-engine

Apparently both Manual & Basic Scaling also allow "Background Threads", but I am having a hard time finding documentation for it, and I'm thinking this may be a relic from the days before Backends were deprecated in favor of Modules (although I did find this https://cloud.google.com/appengine/docs/standard/python/refdocs/modules/google/appengine/api/background_thread/background_thread#BackgroundThread).
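For reference, that background-threads API on manual/basic-scaling instances looks roughly like this. `poll_sqs_forever` is a hypothetical stand-in for the SQS loop, and the `threading` fallback exists only so the sketch runs outside App Engine:

```python
import threading

def poll_sqs_forever(results):
    # Hypothetical worker body; on App Engine this would hold the
    # sqs_client.receive_message() loop from the question.
    results.append("polled")

try:
    # Real API, available only on manual/basic-scaling instances:
    # start_new_background_thread(target, args) detaches the thread
    # from the lifetime of the current request.
    from google.appengine.api import background_thread
    start = background_thread.start_new_background_thread
except ImportError:
    # Fallback so this sketch also runs outside the GAE sandbox.
    def start(target, args):
        t = threading.Thread(target=target, args=args)
        t.start()
        return t

results = []
worker = start(poll_sqs_forever, (results,))
```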

Is Manual or Basic Scaling suited for this? If so, what should I use to listen on sqs_client.receive_message()? One thing I'm concerned about is this task/background thread dying and not relaunching itself.

Alex

2 Answers


This may be a possible solution:

Try using a Google Compute Engine micro instance to run that script continuously and have it send a REST call to your App Engine app. Easy Python Example For Compute Engine

OR:

I have used modules running B1/B2 instance types for long-running jobs and have never had any trouble, though those jobs do start and stop. I use basic scaling with max_instances set to 1. The jobs I run take around 6 hours to complete.
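A minimal sketch of what such a basic-scaling module config might look like (the service and script names here are placeholders, not from the answer):

```yaml
# Hypothetical module config for a basic-scaling worker service.
service: worker
runtime: python27
api_version: 1
threadsafe: true

basic_scaling:
  max_instances: 1
  idle_timeout: 10m

handlers:
- url: /.*
  script: worker_main.app
```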

bscott

I ended up creating a manual-scaling App Engine standard micro-service for this. This micro-service has a handler for /_ah/start that never returns and runs indefinitely (many days at a time), and when it does get stopped, App Engine restarts it immediately.
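App Engine's restart covers a dead instance, but it can also be worth guarding the loop in-process so a transient SQS error doesn't end the /_ah/start request prematurely. A minimal sketch of that pattern (the poller and error here are hypothetical stand-ins):

```python
import time

def run_with_retries(poll_once, max_consecutive_failures=3, backoff_seconds=0.01):
    """Call poll_once() in a loop; retry on errors with a small backoff,
    giving up only after max_consecutive_failures errors in a row."""
    failures = 0
    while failures < max_consecutive_failures:
        try:
            poll_once()
            failures = 0  # reset the streak after any success
        except Exception:
            failures += 1
            time.sleep(backoff_seconds)

# Demo with a hypothetical poller that always fails: the loop stops
# after three consecutive failures instead of crashing on the first.
calls = []
def always_failing_poll():
    calls.append(1)
    raise RuntimeError("transient SQS error")

run_with_retries(always_failing_poll)
```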

Requests can run indefinitely. A manually-scaled instance can choose to handle /_ah/start and execute a program or script for many hours without returning an HTTP response code. Task queue tasks can run up to 24 hours.

https://cloud.google.com/appengine/docs/standard/python/an-overview-of-app-engine

My /_ah/start handler listens to the SQS queue, and creates Push Queue tasks that my default service is set up to listen for.
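The corresponding push-queue definition might look like this. This is a sketch: the code below targets the built-in `default` queue, and the rate and retry settings here are assumptions:

```yaml
# queue.yaml sketch; the handler code in this answer enqueues to the
# 'default' queue, whose settings can be tuned like this.
queue:
- name: default
  rate: 10/s
  retry_parameters:
    task_retry_limit: 5
```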

I was also looking into the Compute Engine route as well as the App Engine Flex route (which is essentially Compute Engine managed by App Engine), but there were other complexities, like losing access to ndb and the taskqueue SDK, and I didn't have time to dive into that.

Below are all of the files for this micro-service, not included is my lib folder that contains the source code for boto3 & some other libraries I needed.

I hope this is helpful to someone.

gaesqs.yaml:

application: my-project-id
module: gaesqs
version: dev
runtime: python27
api_version: 1
threadsafe: true

manual_scaling:
  instances: 1

env_variables:
  theme: 'default'
  GAE_USE_SOCKETS_HTTPLIB: 'true'
builtins:
- appstats: on #/_ah/stats/
- remote_api: on #/_ah/remote_api/
- deferred: on

handlers:
- url: /.*
  script: gaesqs_main.app


libraries:
- name: jinja2
  version: "2.6"
- name: webapp2
  version: "2.5.2"
- name: markupsafe
  version: "0.15"
- name: ssl
  version: "2.7.11"
- name: pycrypto
  version: "2.6"
- name: lxml
  version: latest

gaesqs_main.py:

#!/usr/bin/env python
import json

import logging

import appengine_config

try:
    # This is needed to make local development work with SSL.
    # See http://stackoverflow.com/a/24066819/500584
    # and https://code.google.com/p/googleappengine/issues/detail?id=9246 for more information.
    from google.appengine.tools.devappserver2.python import sandbox
    sandbox._WHITE_LIST_C_MODULES += ['_ssl', '_socket']

    import sys
    # this is socket.py copied from a standard python install
    from lib import stdlib_socket
    socket = sys.modules['socket'] = stdlib_socket
except ImportError:
    pass


import boto3
import os

import webapp2
from webapp2_extras.routes import RedirectRoute
from google.appengine.api import taskqueue

app = webapp2.WSGIApplication(debug=os.environ['SERVER_SOFTWARE'].startswith('Dev'))#, config=webapp2_config)


KEY = "<MY-KEY>"
SECRET = "<MY-SECRET>"
REGION = "<MY-REGION>"
QUEUE_URL = "<MY-QUEUE_URL>"


def process_message(message_body):
    queue = taskqueue.Queue('default')
    task = taskqueue.Task(
        url='/task/sqs-process/',
        countdown=0,
        target='default',
        params={'message': message_body})
    queue.add(task)


class Start(webapp2.RequestHandler):

    def get(self):
        logging.info("Start")
        for loggers_to_suppress in ['boto3', 'botocore', 'nose', 's3transfer']:
            logger = logging.getLogger(loggers_to_suppress)
            if logger:
                logger.setLevel(logging.WARNING)
        logging.info("boto3 loggers suppressed")
        sqs_client = boto3.client('sqs',
                                  aws_access_key_id=KEY,
                                  aws_secret_access_key=SECRET,
                                  region_name=REGION)
        while True:
            msgs_response = sqs_client.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=20)
            logging.info("msgs_response: %s" % msgs_response)
            for message in msgs_response.get('Messages', []):
                logging.info("message: %s" % message)
                process_message(message['Body'])
                sqs_client.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])


_routes = [
    RedirectRoute('/_ah/start', Start, name='start'),
]

for r in _routes:
    app.router.add(r)
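The receive → dispatch → delete contract of the /_ah/start loop above can be checked locally with a stub client. Everything below is a hand-rolled test double, not boto3:

```python
class StubSQSClient(object):
    """Minimal stand-in for the boto3 SQS client used above."""
    def __init__(self, batches):
        self.batches = list(batches)
        self.deleted = []

    def receive_message(self, QueueUrl, WaitTimeSeconds):
        # Return the next canned batch, or an empty response.
        return self.batches.pop(0) if self.batches else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)

def drain_once(sqs_client, queue_url, dispatch):
    # One pass of the /_ah/start loop body: receive, hand each
    # message off, then delete it so SQS will not redeliver it.
    msgs_response = sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=20)
    for message in msgs_response.get('Messages', []):
        dispatch(message['Body'])
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])

client = StubSQSClient([{'Messages': [{'Body': 'hello', 'ReceiptHandle': 'rh-1'}]}])
seen = []
drain_once(client, 'https://example.test/queue', seen.append)
```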

appengine_config.py:

import os

from google.appengine.ext import vendor
from google.appengine.ext.appstats import recording

appstats_CALC_RPC_COSTS = True

# Add any libraries installed in the "lib" folder.
#     Use pip with the -t lib flag to install libraries in this directory:
#     $ pip install -t lib gcloud
#     https://cloud.google.com/appengine/docs/python/tools/libraries27
try:
    vendor.add('lib')
except:
    print "Unable to add 'lib'"


def webapp_add_wsgi_middleware(app):
    app = recording.appstats_wsgi_middleware(app)
    return app

if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'):
    print "gaesqs development"
    import imp
    import os.path
    import inspect
    from google.appengine.tools.devappserver2.python import sandbox

    sandbox._WHITE_LIST_C_MODULES += ['_ssl', '_socket']
    # Use the system socket.

    real_os_src_path = os.path.realpath(inspect.getsourcefile(os))
    psocket = os.path.join(os.path.dirname(real_os_src_path), 'socket.py')
    imp.load_source('socket', psocket)
    os.environ['HTTP_HOST'] = "my-project-id.appspot.com"
else:
    print "gaesqs prod"
    # Doing this on dev_appserver/localhost seems to cause outbound https requests to fail
    from lib import requests
    from lib.requests_toolbelt.adapters import appengine as requests_toolbelt_appengine

    # Use the App Engine Requests adapter. This makes sure that Requests uses
    # URLFetch.
    requests_toolbelt_appengine.monkeypatch()
Alex
  • How did you manage to get boto3 working on app engine ? I see an error related to Popen when I use boto3. Thanks. – Varunkumar Manohar Jan 01 '19 at 04:15
  • i dont recall having that issue. maybe post your stack trace in a separate question? but i'm pretty sure `popen` is not allowed on app engine standard, so maybe you're using different functionality than I did. – Alex Jan 01 '19 at 23:09