1

I'm running a vanilla AWS Lambda function to count the number of messages in my RabbitMQ task queue:

import boto3
from botocore.vendored import requests


cloudwatch_client = boto3.client('cloudwatch')


def get_queue_count(user="user", password="password", domain="<my domain>/api/queues"):
    url = f"https://{user}:{password}@{domain}"
    res = requests.get(url)
    message_count = 0
    for queue in res.json():
        message_count += queue["messages"]
    return message_count


def lambda_handler(event, context):
    metric_data = [{'MetricName': 'RabbitMQQueueLength', "Unit": "None", 'Value': get_queue_count()}]
    print(metric_data)
    response = cloudwatch_client.put_metric_data(MetricData=metric_data, Namespace="RabbitMQ")
    print(response)

Which returns the following output on a test run:

Response:
{
  "errorMessage": "2020-06-30T19:50:50.175Z d3945a14-82e5-42e5-b03d-3fc07d5c5148 Task timed out after 15.02 seconds"
}

Request ID:
"d3945a14-82e5-42e5-b03d-3fc07d5c5148"

Function logs:
START RequestId: d3945a14-82e5-42e5-b03d-3fc07d5c5148 Version: $LATEST
/var/runtime/botocore/vendored/requests/api.py:72: DeprecationWarning: You are using the get() function from 'botocore.vendored.requests'.  This dependency was removed from Botocore and will be removed from Lambda after 2021/01/30. https://aws.amazon.com/blogs/developer/removing-the-vendored-version-of-requests-from-botocore/. Install the requests package, 'import requests' directly, and use the requests.get() function instead.
  DeprecationWarning
[{'MetricName': 'RabbitMQQueueLength', 'Value': 295}]
END RequestId: d3945a14-82e5-42e5-b03d-3fc07d5c5148

You can see that I'm able to interact with the RabbitMQ API just fine--the function hangs when trying to post the metric.

The lambda function uses the IAM role put-custom-metric, which uses the policies recommended here, as well as CloudWatchFullAccess for good measure.

Resources on my internal load balancer, where my RabbitMQ server lives, are protected by a VPN, so it's necessary for me to associate this function with the proper VPC/security group. Here's how it's set up right now (I know this is working, because otherwise the communication with RabbitMQ would fail):

[image]

I read this post where multiple contributors suggest increasing the function memory and timeout settings. I've done both of these, and the timeout persists.

I can run this locally without any issue to create the metric on CloudWatch in less than 5 seconds.

aaron
  • Is there a reason to use a Lambda to export RMQ metrics to CloudWatch? You could simply use the [plugin](https://github.com/noxdafox/rabbitmq-cloudwatch-exporter) for that or simply run a cron job on the RMQ node. – noxdafox Jul 01 '20 at 09:01
  • @noxdafox ah, this is cool. So instead of hacking things together AWS-side, you're saying that I can place AWS credentials on my RMQ server and write directly to cloudwatch? This definitely seems like the cleaner solution. I've been making some progress with a dedicated VPC endpoint, but I might reverse course. My app is dockerized and I use `rabbitmq:3-management-alpine` for RMQ. Any tips on implementing your approach in docker? My first thought would be to write a custom Dockerfile that uses the alpine image and then does the rest of the config. – aaron Jul 01 '20 at 15:31

2 Answers

2

@noxdafox has written a brilliant plugin that got me most of the way there, but at the end of the day I ended up going for a pure Lambda-based solution. It was surprisingly tricky getting the CloudWatch plugin running with Docker, and afterwards I had trouble with the container shutting down its services and stopping processing of the message queue. Additionally, I wanted to be able to normalize queue count by the number of worker services in my ECS cluster, so I was going to need to connect to at least one AWS resource from within my VPC anyhow. I figured it best to keep everything simple and in the same place.

import os
import boto3
import requests  # bundle with the deployment package or a layer; botocore.vendored.requests has been removed

USER = os.getenv("RMQ_USER")
PASSWORD = os.getenv("RMQ_PASSWORD")

cloudwatch_client = boto3.client(service_name='cloudwatch', endpoint_url="https://MYCLOUDWATCHURL.monitoring.us-east-1.vpce.amazonaws.com")
ecs_client = boto3.client(service_name='ecs', endpoint_url="https://vpce-MYECSURL.ecs.us-east-1.vpce.amazonaws.com")


def get_message_count(user=USER, password=PASSWORD, domain="rabbitmq.stockbets.io/api/queues"):
    url = f"https://{user}:{password}@{domain}"
    res = requests.get(url)
    message_count = 0
    for queue in res.json():
        message_count += queue["messages"]
    print(f"message count: {message_count}")
    return message_count


def get_worker_count():
    worker_data = ecs_client.describe_services(cluster="prod", services=["worker"])
    worker_count = worker_data["services"][0]["runningCount"]
    print(f"worker count: {worker_count}")
    return worker_count


def lambda_handler(event, context):
    message_count = get_message_count()
    worker_count = get_worker_count()
    print(f"msgs per worker: {message_count / worker_count}")
    metric_data = [
        {'MetricName': 'MessagesPerWorker', "Unit": "Count", 'Value': message_count / worker_count},
        {'MetricName': 'NTasks', "Unit": "Count", 'Value': worker_count}
    ]
    cloudwatch_client.put_metric_data(MetricData=metric_data, Namespace="RabbitMQ")
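
One edge case in the handler above: if the ECS service ever reports zero running tasks, `message_count / worker_count` raises `ZeroDivisionError` and no metric is published. Below is a minimal sketch of one way to guard against that. The helper names `messages_per_worker` and `build_metric_data`, and the choice to report the raw backlog when no workers are running, are assumptions for illustration and not part of the original solution.

```python
def messages_per_worker(message_count, worker_count):
    """Return the backlog per worker, falling back to the raw backlog with no workers."""
    if worker_count <= 0:
        # Assumption: with zero workers, report the whole backlog so an alarm can still fire
        return float(message_count)
    return message_count / worker_count


def build_metric_data(message_count, worker_count):
    """Build the MetricData list in the same shape the handler passes to put_metric_data."""
    return [
        {"MetricName": "MessagesPerWorker", "Unit": "Count",
         "Value": messages_per_worker(message_count, worker_count)},
        {"MetricName": "NTasks", "Unit": "Count", "Value": worker_count},
    ]
```

The handler could then call `cloudwatch_client.put_metric_data(MetricData=build_metric_data(message_count, worker_count), Namespace="RabbitMQ")`.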

Creating the VPC endpoints was easier than I thought it would be. For CloudWatch, you want to search for the "monitoring" VPC endpoint during the creation step (not "cloudwatch" or "logs"). Searching for "ecs" gets you what you need for the ECS connection.
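
For anyone who prefers scripting this over clicking through the console, here is a rough sketch of the same two endpoints created through boto3's EC2 `create_vpc_endpoint` call. The function names, the `private_dns` parameter, and all IDs are hypothetical placeholders; the EC2 client is passed in so the snippet does not assume any particular credentials setup.

```python
def interface_endpoint_request(vpc_id, subnet_ids, security_group_ids, service,
                               region="us-east-1", private_dns=True):
    """Build keyword arguments for EC2's create_vpc_endpoint (interface endpoint)."""
    return {
        "VpcEndpointType": "Interface",
        "VpcId": vpc_id,
        # "monitoring" is the CloudWatch metrics service; "ecs" is the ECS API
        "ServiceName": f"com.amazonaws.{region}.{service}",
        "SubnetIds": list(subnet_ids),
        "SecurityGroupIds": list(security_group_ids),
        # With private DNS on, the regular service hostname resolves inside the VPC
        # (this requires DNS support and DNS hostnames to be enabled on the VPC)
        "PrivateDnsEnabled": private_dns,
    }


def create_endpoints(ec2_client, vpc_id, subnet_ids, security_group_ids,
                     services=("monitoring", "ecs")):
    """Create one interface endpoint per service and return the raw responses."""
    responses = []
    for service in services:
        kwargs = interface_endpoint_request(vpc_id, subnet_ids, security_group_ids, service)
        responses.append(ec2_client.create_vpc_endpoint(**kwargs))
    return responses
```

Usage would look something like `create_endpoints(boto3.client("ec2"), "vpc-PLACEHOLDER", ["subnet-PLACEHOLDER"], ["sg-PLACEHOLDER"])`. The security group attached to the endpoints needs to allow inbound HTTPS (port 443) from the Lambda's security group.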

Once your Lambda is up, you need to configure the metric and accompanying alerts, and then relate those to an auto-scaling policy, but that's probably beyond the scope of this post. Leave a comment if you have questions on how I worked that out.

aaron
1

The only reason you might want to use a Lambda function to achieve your goal is if you do not own the RabbitMQ cluster. The fact that your logic hangs during communication suggests a network issue, most likely due to misconfigured security groups.
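
A quick way to confirm a network issue from inside the function is to attempt a plain TCP connection to the service endpoint with a short timeout, so the failure shows up in the logs instead of as a Lambda timeout. This is only a diagnostic sketch; the helper name `can_reach` is made up for illustration, and the hostname in the usage note assumes the `us-east-1` region.

```python
import socket


def can_reach(host, port=443, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False
```

Calling `print(can_reach("monitoring.us-east-1.amazonaws.com"))` at the top of the handler should print `False` if the function's subnets have no route to CloudWatch.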

If you can change the cluster configuration, I'd suggest installing and configuring the CloudWatch metrics exporter plugin, which does most of the heavy lifting for you.

If your cluster runs on Docker, I believe a custom Dockerfile is the best solution. If you run your Docker instances in AWS via ECS/Fargate, the plugin should be able to automatically infer the credentials from the Task Role through ExAws. Otherwise, just follow the README instructions on how to set the credentials yourself.

noxdafox
  • this looks like the right answer. I'm out of town at the moment, but when I'm back on Monday I'll give this a go and send you a well-deserved upvote assuming that this all works out. – aaron Jul 04 '20 at 15:51
  • The issue is that Lambda functions connected to your VPC lose access to AWS cloud resources, including S3. The workaround seems to be a VPC endpoint, but your solution is 100% the better one. – aaron Jul 04 '20 at 15:52
  • Still not up and live in prod, but I'm running local tests now with a container that has your plugin activated and it all looks good. Getting this going with Docker was a bit of a pain -- I tried a from-source install during the build, which ran into various Erlang dependency issues -- before just copying and pasting links to all the .ez files on your latest release (which seems to be what you recommend). Sent you a PR here with my approach, in case that's helpful to others: https://github.com/noxdafox/rabbitmq-cloudwatch-exporter/compare/master...aaronpolhamus:patch-1 – aaron Jul 06 '20 at 19:11