Skip to main content

Finding misconfigured or dangling CloudWatch Alarms

At work, we use a lot of CloudWatch Alarms. This allows you to monitor metrics from a variety of places, and take action when a metric hits some particular threshold. For example:

Earlier this week, I was looking through the CloudWatch console, and I found a bunch of alarms that were pulling metrics from non-existent sources. It looks like we created a resource, created some CloudWatch alarms based on it, but when we deleted the resource the alarms stuck around. Those alarms will never do anything, because the metrics they’re based on will never change.

I wrote a Python script to find these dangling alarms, which you can copy/paste or download below.

After I ran my script, I was able to delete several hundred old alarms, three-quarters of the alarms in our account. I was doing it to make it easier to review the alarms we are using, rather than to save money – but a small dent in the bill never goes amiss.

{% highlight python %} #!/usr/bin/env python3 “”” Finds CloudWatch metric alarms that are pulling from a non-existent source, e.g. an alarm that’s based on the size of a now-deleted queue.

This script doesn’t delete the alarm, it just warns you that the alarm is never going to change in its current configuration. You might want to delete the alarm, or point it at a different source.

I’ve implemented checks for the Alarm sources that I use most often, but CloudWatch Alarms can pull from other sources. The script should be extensible to other sources/namespaces if that’s useful to you.

From https://alexwlchan.net/2021/08/finding-misconfigured-or-dangling-cloudwatch-alarms/

”””

import functools

import boto3 from botocore.exceptions import ClientError

def get_metric_alarm_descriptions(sess): resource = sess.resource(“cloudwatch”) client = sess.client(“cloudwatch”)

for alarm in resource.alarms.all():
    resp = client.describe_alarms(AlarmNames=[alarm.name])
    yield from resp["MetricAlarms"]

Note: often you’ll have more than one alarm that reads from

a given source (e.g. one alarm for table usage high and another

for table usage low).

#

There’s no point doing repeated, successive checks for the existence

of a table/queue/function/whatever, so cache the results here.

@functools.cache def dynamodb_table_exists(sess, *, table_name): “”” Returns True if this DynamoDB table exists, False otherwise. “”” client = sess.client(“dynamodb”)

try:
    client.describe_table(TableName=table_name)
    return True
except ClientError as err:  # pragma: no cover
    if err.response["Error"]["Code"] == "ResourceNotFoundException":
        return False
    else:
        raise

@functools.cache def sqs_queue_exists(sess, *, queue_name): “”” Returns True if this SQS queue exists, False otherwise. “”” client = sess.client(“sqs”)

try:
    client.get_queue_url(QueueName=queue_name)
    return True
except ClientError as err:  # pragma: no cover
    if err.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
        return False
    else:
        raise

@functools.cache def lambda_function_exists(sess, *, function_name): “”” Returns True if this Lambda function exists, False otherwise. “”” client = sess.client(“lambda”)

try:
    client.get_function(FunctionName=function_name)
    return True
except ClientError as err:  # pragma: no cover
    if err.response["Error"]["Code"] == "ResourceNotFoundException":
        return False
    else:
        raise

if name == “main”: sess = boto3.Session()

for alarm in get_metric_alarm_descriptions(sess):
    dimensions = {dim["Name"]: dim["Value"] for dim in alarm["Dimensions"]}

    # Is this alarm based on a non-existent table?
    if alarm.get("Namespace") == "AWS/DynamoDB":
        table_name = dimensions["TableName"]
        if not dynamodb_table_exists(sess, table_name=table_name):
            print(
                f"!!! Alarm {alarm['AlarmArn']} is based on non-existent table {table_name}"
            )
        continue

    # Is this alarm based on a non-existent queue?
    if alarm.get("Namespace") == "AWS/SQS":
        queue_name = dimensions["QueueName"]
        if not sqs_queue_exists(sess, queue_name=queue_name):
            print(
                f"!!! Alarm {alarm['AlarmArn']} is based on non-existent SQS queue {queue_name}"
            )
        continue

    # Is this alarm based on a non-existent Lambda function?
    if alarm.get("Namespace") == "AWS/Lambda":
        function_name = dimensions["FunctionName"]
        if not lambda_function_exists(sess, function_name=function_name):
            print(
                f"!!! Alarm {alarm['AlarmArn']} is based on non-existent Lambda function {function_name}"
            )
        continue {% endhighlight %}